上云无忧 > 文档中心 > 百度智能云函数计算 CFC 百度消息服务触发器
函数计算CFC
百度智能云函数计算 CFC 百度消息服务触发器

文档简介:
百度消息服务是全兼容 Apache Kafka 的分布式、高可扩展、高通量的消息托管服务。 如果在百度消息服务的 Topic 上检测到记录,您可以使用 CFC 函数从此 Topic 读取成批的数据来处理,并且 CFC 会定期轮询(每秒一次)Topic 中的新记录。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

百度消息服务触发器说明

百度消息服务是全兼容 Apache Kafka 的分布式、高可扩展、高通量的消息托管服务。

如果在百度消息服务的 Topic 上检测到记录,您可以使用 CFC 函数从此 Topic 读取成批的数据来处理,并且 CFC 会定期轮询(每秒一次)Topic 中的新记录。

百度消息服务触发器创建

用户可以为新建的函数或已有函数配置百度消息服务触发器,创建函数的流程可以具体参考 从头创建函数,这里不再赘述。

这里假设您已经使用Python2.7创建了一个名为 kafkatrigger 的函数,以下内容以此为前提,将引导您在 CFC 控制台在函数管理页面中为函数配置百度消息服务触发器。接下来,我们将通过以下步骤来完成一个触发器的设置。

编写处理函数

  1. 登录管理控制台,选择“产品服务>云函数计算 CFC”,进入“函数列表”页面
  2. 在“函数列表”页面。点击名称为 kafkatrigger 的函数,进入函数详情页面。

在函数详情页中编写百度消息服务触发器对应的 handler,以对触发操作返回适当的响应,之后点击右下角保存按钮完成函数的修改操作。

# -*- coding: utf-8 -*-
import base64
import json
def handler(event, context): 
	for record in event['Records']:
    	# kafka value is base64 encoded so decode here
    	payload = base64.b64decode(record['Kafka']['Value'])
		print("Decoded payload: " + payload)
	return 'Successfully processed {} records.'.format(len(event['Records']))

注:百度消息服务触发器的event消息体

{
  "Records": [
    {
      "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
      "Kafka": {
        "Key": "",          //base64后的key
        "Value": "MA==",    //base64后的value
        "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
        "Partition": 0,
        "Offset": 43110,
        "Timestamp": 1553151704.6529999
      },
      "EventName": "bce:kafka:record",
      "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
      "EventSource": "bce:kafka"
    },
    {
      "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
      "Kafka": {
        "Key": "",
        "Value": "MQ==",
        "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
        "Partition": 0,
        "Offset": 43111,
        "Timestamp": 1553151704.6529999
      },
      "EventName": "bce:kafka:record",
      "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
      "EventSource": "bce:kafka"
    }
  ]
}

配置百度消息服务触发器

  1. 登录管理控制台,选择“产品服务> 函数计算 CFC”,进入“函数列表”页面
  2. 点击需要添加百度消息服务触发器的函数名称(即 kafkatrigger),进入函数详情页面。
  3. 点击左侧导航栏中的“触发器”,进入函数配置页面。 
  4. 在函数配置页面中最下方点击“新增触发器”。 
  5. 在弹出框中,点击下拉框“选择事件源进行添加”,选择百度消息服务触发器。 
  6. 之后在弹出框中配置好选项,并点击确认,完成触发器的创建。 - Topic 选择您将要监听的百度消息服务 Topic - 批处理大小:从 Topic 中一次读取的最大记录数,1-1000 - 起始位置:在 Topic 中开始读取的位置,最新记录对应 kafka 的 OffsetNewest,最老记录对应 kafka 的 OffsetOldest - 启用触发器:是否直接启动触发器,建议先关闭触发器以便测试 
  7. 在函数配置页面“触发器”一栏中,可以看到刚刚创建好的百度消息服务触发器及其信息 

测试触发器

模拟测试

  1. 点击测试按钮 
  2. 输入测试事件,并点击执行

    测试内容如下

    {
      "Records": [
        {
          "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
          "Kafka": {
            "Key": "",
            "Value": "MA==",
            "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
            "Partition": 0,
            "Offset": 43110,
            "Timestamp": 1553151704.6529999
          },
          "EventName": "bce:kafka:record",
          "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
          "EventSource": "bce:kafka"
        },
        {
          "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
          "Kafka": {
            "Key": "",
            "Value": "MQ==",
            "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
            "Partition": 0,
            "Offset": 43111,
            "Timestamp": 1553151704.6529999
          },
          "EventName": "bce:kafka:record",
          "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
          "EventSource": "bce:kafka"
        }
      ]
    }

 3. 查看返回的执行结果 

真实测试

在添加好触发器之后,需要设置日志存储,之后向 kafka topic 中发送消息,在日志中可以看到执行结果。

并且在触发器的界面您也可以看到最后的执行结果

相似文档
  • 函数计算支持定时触发器,即在指定的时间点触发函数的执行。触发的时间使用 UTC 时区,计划的最小精度为 1 分钟。 登录管理控制台,选择“产品服务> 函数计算 CFC”,进入“函数列表”页面。 点击需要添加定时触发器的函数名称(即 crontabtrigger),进入函数详情页面。 点击左侧导航栏中的“触发器”,进入函数配置页面。
  • 用户可以使用 cron 或 rate 表达式在函数计算中创建按计划自行触发的规则。所有表达式都使用 UTC 时区,计划的最小精度为 1 分钟。 CFC提供支持 cron 表达式和 rate 表达式。cron 表达式支持的具体的执行您可以定义具体的那一天的那一分钟。 而Rate 表达式更容易定义,它以一定的间隔触发规则,例如每小时一次或每天一次。
  • HTTP触发器实现了将某个函数关联到一个 URL 上(包含相应的 CRUD 操作),它可以接收 HTTP 请求,根据 HTTP 方法、URL,找到匹配的函数将 HTTP 相关信息传入并执行函数,获取执行结果,将函数执行结果包装为 HTTP 返回响应。创建HTTP触发器的核心步骤包括HTTP触发器配置和用户代码配置,以下将为您分别介绍。
  • 用户可以为新建的函数或已有函数配置HTTP触发器,创建函数的流程可以具体参考 创建函数 这里不再赘述。 这里假设用户已完成一个名为httptrigger-helloworld的Nodejs函数的创建,以下内容以此为前提,指引用户在CFC控制台在函数管理页面中为函数配置触发器。接下来,我们将通过三步来完成一个触发器的设置。
  • 用户可以为新建的函数或已有函数配置 BOS 触发器,创建函数的流程可以具体参考 创建函数 。 这里假设用户已完成一个名为 bostrigger 的 Python 函数的创建,以下内容以此为前提,指引用户在 CFC 控制台在函数管理页面中为函数配置触发器。接下来,我们将通过三步来完成一个触发器的设置。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部