上云无忧 > 文档中心 > 腾讯云消息队列 CKafka 实战教程 - 替换 Canal 实现 MySQL 数据库订阅
消息队列 CKafka
腾讯云消息队列 CKafka 实战教程 - 替换 Canal 实现 MySQL 数据库订阅

文档简介:
背景: Canal 是阿里巴巴开源的一款通过解析 MySQL 数据库增量日志,达到增量数据订阅和消费目的 CDC 工具。Canal 解析 MySQL 的 binlog 后可投递到 kafka 一类的消息中间件,供下游系统进行分析处理。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

背景

Canal 是阿里巴巴开源的一款通过解析 MySQL 数据库增量日志,达到增量数据订阅和消费目的 CDC 工具。Canal 解析 MySQL 的 binlog 后可投递到 kafka 一类的消息中间件,供下游系统进行分析处理。 如果您正在或考虑使用 Canal 同步 MySQL 的增量变更记录到 kafka,腾讯云 CKafka 连接器提供了完美兼容此场景的能力。 本文介绍 Canal 拉取 MySQL 变更记录并投递到 kafka 的简易使用场景以及 CKafka 连接器相应的替换处理方式。

CKafka 连接器订阅 MySQL 功能列表

允许选择多库多表。
允许指定根据队列消息进行分区同步。
允许通过正则匹配选择多表。
支持存量数据的同步。
同时支持 DDL、DML 类型的变更。
兼容 Canal、debezium 等消息格式。

案例说明

前提:已有 MySQL 实例和 kafka 实例

Canal 订阅 MySQL 变更记录到 kafka

1. 下载 canal server: canal.deployer-1.1.x.tar.gz 并解压。
		
tar -zxvf canal.deployer-1.1.x.tar.gz
2. 进入解压目录,配置 conf/example/instance.properties 文件,下面展示必须配置的 mysql 实例信息以及消息中间件的 topic、partition 等信息:
		
canal.instance.master.address=xx.xx.xx.xx:3306
canal.instance.dbUsername=user
canal.instance.dbPassword=password
canal.mq.topic=canal-test-1
canal.mq.partition=0
3. 进入解压目录,配置 conf/canal.properties 文件,下面展示必须配置的消息中间件实例信息:
		
canal.serverMode = kafka
kafka.bootstrap.servers = xx.xx.xx.xx:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
4. 启动 canal server:
		
sh bin/startup.sh
启动 canal server 后,Canal 开始根据配置获取 MySQL 的增量变更消息并推送到 kafka 集群。

CKafka 连接器 订阅 MySQL 变更记录到 kafka

1. 新建连接,选择 MySQL 实例,CKafka 连接器同时支持腾讯云 MySQL 实例和云上自建 MySQL 实例:

2. 新建数据接入任务,选择 MySQL 数据订阅(Binlog):

3. 数据源选择第一步中配置的连接,CKafka 连接器支持多种订阅库表的方式,包括全部库表、批量选择、正则匹配:

4. CKafka 连接器提供了高级配置选项,您可以根据业务需求自行进行选择配置,其中数据格式选择 Canal 格式:

5. 最后配置 Canal 的数据目标,CKafka 连接器支持将消息推送到 Ckafka 实例中的 Topic 或单独创建的弹性 Topic。

6. 可在控制台查看 Topic 的落盘消息情况:

更多功能

多种上游数据源的订阅

除了订阅 MySQL 数据源,CKafka 连接器同时支持订阅其他多种上游数据源,如 Postgresql、MongoDB、MariaDB、SQL Server 等。

可视化实时监控面板

CKafka 连接器提供了可视化的实时监控面板,可以一键更改配置信息、一键查看各种性能指标、一键查看 Topic 中落盘的消息内容,能够做到高可靠、免运维。

对标 logstash 的数据处理

CKafka 连接器提供了对标 logstash 的数据处理能力,仅需通过界面进行编辑即可创建多种数据处理规则。详情参见 数据处理规则说明

多种下游数据源的投递

CKafka 连接器支持多种下游数据源的消息推送功能,包括 Elastic Search、ClickHouse 等。如果您正在或考虑使用 Canal 将消息同步到 kafka 以外的其他数据源,CKafka 连接器同样提供了相应的能力。
相似文档
  • 操作场景: 在使用 CKafka 连接器在进行数据流入流出服务时,可能会遇到如下情况: 需要对消息中的特定字段进行解析,得到相关信息。 需要多次迭代处理消息中某一字段。 需要处理为未经过结构化的原始消息后,才能继续使用。 需要处理多层嵌套格式的消息。
  • 操作场景: 在使用 TKE 容器服务 时,先前通常使用如下方法,来获取并查询部署的组件服务日志: 1. 从 TKE 控制台 登录容器节点,切换到日志所在文件夹并查看日志。 2. 从 TKE 控制台 查询重定向至标准输出的容器日志。 3. SSH 终端 登录集群节点,查询挂载在宿主机上的日志,或使用 Kubectl 登录容器查询日志。 4. 使用 CLS 采集器 采集日志,并在 日志服务 的控制台查询。
  • Flink 简介: Apache Flink 是一个可以处理流数据的实时处理框架,用于在无界和有界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
  • 无论是使用传统的 Avro API 自定义序列化类与反序列化类,还是使用 Twitter 的 Bijection 类库实现 Avro 的序列化与反序列化,两种方法有相同的缺点:在每条 Kafka 记录里都嵌入了 Schema,从而导致记录的大小成倍增加。但是不管怎样,在读取记录时仍然需要用到整个 Schema,所以要先找到 Schema。
  • Spark Streaming 是 Spark Core 的一个扩展,用于高吞吐且容错地处理持续性的数据,目前支持的外部输入有 Kafka、Flume、HDFS/S3、Kinesis、Twitter 和 TCP socket。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部