上云无忧 > 文档中心 > 腾讯云消息队列 CKafka 实战教程 - PostgreSQL 到 Elasticsearch 实时数据同步
消息队列 CKafka
腾讯云消息队列 CKafka 实战教程 - PostgreSQL 到 Elasticsearch 实时数据同步

文档简介:
操作场景: 本文档满足如下场景:将 PostgreSQL 表的数据(存量+增量)实时同步数据同步到目标ES索引。实时同步需要同步新增,修改和删除操作。即当 PostgreSQL 源表出现新增、修改、删除时,目标 ES 中的数据也需要发生相应的增删改。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

场景说明

本文档满足如下场景:将 PostgreSQL 表的数据(存量+增量)实时同步数据同步到目标ES索引。实时同步需要同步新增,修改和删除操作。即当 PostgreSQL 源表出现新增、修改、删除时,目标 ES 中的数据也需要发生相应的增删改。

使用限制

本种实时同步增删改的任务。一个订阅任务只能订阅一张表的数据,一个 Topic 里面只能存一个表的订阅数据。即订阅一张表的数据到 ES 需要创建一个订阅任务,一个 Topic,一个数据流出任务。同时为了保证数据同步的有序性,一个 Topic 只支持一个分区。
说明
如果需要同步多张表的数据到 ES 里面。则需要创建多个 Topic,同时创建多个订阅任务和流出任务。

操作步骤

步骤1:创建连接

1. 创建 PostgreSQL 连接

1. 单击连接器中的 连接列表,单击新建连接,选择 TDSQL-C 数据库。

2. 填写需要同步的 PostgreSQL 数据库的相关信息。

2. 创建 Elasticsearch 连接

1. 单击连接器中的 连接列表,单击新建连接,选择 Elasticsearch Service

2. 配置 ES 相关的参数:

步骤2:创建 Topic

创建 Topic 有两种方式,如果已购买 kafka 实例,则直接在实例中新建 Topic 即可。否则,可以新建按量付费的 Topic(无需购买实例)。购买按量付费 Topic 操作步骤如下:
进入 Ckafka 控制台,选择弹性 Topic,单击新建 Topic。(若计划将数据同步至 Ckafka 实例中的 topic 则可跳过此步骤

步骤3:创建数据订阅任务

1. 单击连接器中的 任务管理,单击任务列表,单击新建任务

2. 单击下一步,填写数据源配置信息,数据源为 步骤一 中创建的 Postgresql 订阅连接:

3. 继续单击下一步,选择数据目标信息,即同步 PostgreSQL 数据的 topic,根据实际情况选择弹性 TopicCKafka 实例内 Topic 即可,此处选择 步骤二 中创建的弹性 topic:

4. 任务创建成功后,在任务详情 > 查看消息可以看到订阅的数据信息:

注意:
只有源表有数据存在的时候,才会订阅到消息。当源表没有数据时,可执行如下类似的 insert 语句,触发订阅行为,即可查询到订阅的数据:
			
insert into test values('testname',25);

步骤4:创建数据流出任务

1. 新建连接后,单击任务管理 > 任务列表,单击新建任务,任务类型选择数据流出,选择 Elasticsearch Service

2. 配置数据源,选择同步了 MySQL 数据的 topic,这里选择 步骤1 中的 topic,选择 从最开始位置开始消费

3. 下一步中数据处理可根据实际情况进行配置,这里不进行相关配置,使用原始消息数据。最后进行 ES 相关配置,其中主键为数据库表的主键名称。
注意
此模式需要开启 数据库同步模式,并填写表的主键的列名,如此处主键列名为 id

4. 当数据任务运行后,即可在 ES 对应的索引中查询到相应的消息。
相似文档
  • 背景: Canal 是阿里巴巴开源的一款通过解析 MySQL 数据库增量日志,达到增量数据订阅和消费目的 CDC 工具。Canal 解析 MySQL 的 binlog 后可投递到 kafka 一类的消息中间件,供下游系统进行分析处理。
  • 操作场景: 在使用 CKafka 连接器在进行数据流入流出服务时,可能会遇到如下情况: 需要对消息中的特定字段进行解析,得到相关信息。 需要多次迭代处理消息中某一字段。 需要处理为未经过结构化的原始消息后,才能继续使用。 需要处理多层嵌套格式的消息。
  • 操作场景: 在使用 TKE 容器服务 时,先前通常使用如下方法,来获取并查询部署的组件服务日志: 1. 从 TKE 控制台 登录容器节点,切换到日志所在文件夹并查看日志。 2. 从 TKE 控制台 查询重定向至标准输出的容器日志。 3. SSH 终端 登录集群节点,查询挂载在宿主机上的日志,或使用 Kubectl 登录容器查询日志。 4. 使用 CLS 采集器 采集日志,并在 日志服务 的控制台查询。
  • Flink 简介: Apache Flink 是一个可以处理流数据的实时处理框架,用于在无界和有界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
  • 无论是使用传统的 Avro API 自定义序列化类与反序列化类,还是使用 Twitter 的 Bijection 类库实现 Avro 的序列化与反序列化,两种方法有相同的缺点:在每条 Kafka 记录里都嵌入了 Schema,从而导致记录的大小成倍增加。但是不管怎样,在读取记录时仍然需要用到整个 Schema,所以要先找到 Schema。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部