腾讯云消息队列 CKafka 新建数据流出任务 - Elasticsearch Service
文档简介:
操作场景:
CKafka 连接器提供数据流出能力,您可以将 CKafka 数据分发至 Elasticsearch Service(ES)便于海量数据存储搜索、实时日志分析等操作。
操作场景
CKafka 连接器提供数据流出能力,您可以将 CKafka 数据分发至 Elasticsearch Service(ES)便于海量数据存储搜索、实时日志分析等操作。
说明
只支持7.0以上版本的 Elasticsearch Service。
前提条件
该功能目前依赖 Elasticsearch Service 服务,使用时需开通相关产品功能。
已创建好数据流出目标 Elasticsearch Service 连接。
操作步骤
1. 登录 CKafka 控制台。
2. 在左侧导航栏单击连接器 > 任务列表,选择好地域后,单击新建任务。
3. 填写任务名称,任务类型选择数据流出,数据目标类型选择 Elasticsearch Service,单击下一步。
4. 配置数据源信息。

Topic 类型:选择数据源 Topic。
弹性 Topic:选择提前创建好的弹性 Topic,详情参见 Topic 管理。
CKafka 实例内 Topic:选择在 CKafka 创建好的实例和 Topic,若实例设置了ACL 策略,请确保选中的 topic 有读写权限,详情参见 创建 Topic。
起始位置:选择转储时历史消息的处理方式,topic offset 设置。
5. 设置上述信息后,单击下一步,单击预览 Topic 消息,将会选取源 Topic 中的第一条消息进行解析。
说明
目前解析消息需要满足以下条件:
消息为 JSON 字符串结构。
源数据必须为单层 JSON 格式,嵌套 JSON 格式可使用 数据处理 进行简单的消息格式转换。
6. (可选)开启对源数据进行数据处理按钮,具体配置方法请参见 简单数据处理。
7. 单击下一步,配置数据目标信息。

源数据:单击拉取源 Topic 数据。若源Topic暂无数据,也可以自定义数据。
数据目标:选择提前创建好的数据流出的目标 Elasticsearch Service 连接。
索引名称:填写索引名称,索引名称必须全部为小写,支持 jsonpath 语法。
按日期拆分索引名称:可选,开启后需选择日期格式,写入 ES 的索引为%(索引名称)_%(日期)。
失败消息处理:选择投递失败的消息的处理方式,支持丢弃、保留和投递至 CLS (需指定投递到的日志集和日志主题并授权访问日志服务 CLS)三种方式。
保留:适合用于测试环境,任务运行失败时将会终止任务不会重试,并且在事件中心中记录失败原因。
丢弃:适合用于生产环境,任务运行失败时将会忽略当前失败消息。建议使用 "保留" 模式测试无误后,再将任务编辑成 "丢弃" 模式用于生产。
投递至 CLS:适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因上传到指定 CLS 主题中。
死信队列:适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因投递到指定的 CKafak Topic中。
数据源类型

索引时间:可以指定源数据中某一字段作为索引时间。默认为消息的投递时间。
ES文档 ID 字段:可以指定该字段的值作为 ES 文档 ID 的值。默认为topic+kafkaPartition+kafkaOffset。
保留非 JSON 数据:若开启,则对于非 JSON 数据,会指定 KEY 进行组装投递。若关闭,则会丢弃非 JSON 数据。
本选项仅用于连接器订阅关系型数据库到 Topic 里面的数据(增删改)同步更新到 ES 。会识别数据库的增删改,保持 ES 的数据与源表的数据一致。

同步模式:若选择字段逐一匹配,则可以自定义消息字段名和目标索引字段的映射关系。若选择默认字段匹配,则会在 ES 的索引中,mapping 使用消息的 key 作为 field 名称。
目标索引类型:可以选择新建索引或者从现有的 ES 索引中选择。
主键:指定数据库表的主键作为 ES 文档 ID 的值。
索引时间:可以指定源数据中的某一字段作为索引时间。默认为消息的投递时间。
8. 单击提交,可以在任务列表看到刚刚创建的任务,在状态栏可以查看任务创建进度。