上云无忧 > 文档中心 > 腾讯云消息队列 CKafka - MySQL 订阅消息 canal 格式说明
消息队列 CKafka
腾讯云消息队列 CKafka - MySQL 订阅消息 canal 格式说明

文档简介:
概述: 使用连接器订阅 MySQL 的变更操作时,可选多种消息格式,默认采用 debezium 格式,同时提供了兼容开源 canal 等格式的能力。本文介绍兼容 canal 的消息格式说明。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

概述

使用连接器订阅 MySQL 的变更操作时,可选多种消息格式,默认采用 debezium 格式,同时提供了兼容开源 canal 等格式的能力。本文介绍兼容 canal 的消息格式说明。

canal 格式说明

兼容字段
canal原生字段
是否支持
id(默认值0)
id(canal 生成的消息 ID)
database(数据库名)
database(数据库名)
table(表名)
table(表名)
pkNames(默认值null)
pkNames(主键字段名)
isDdl(变更是否属于DDL)
isDdl(变更是否属于 DDL)
type(变更类型)
type(变更类型)
es(binlog时间戳)
es(binlog 时间戳)
ts(connector 同步时间,仅 DML 支持,DDL 中与 ts 一致)
ts(connector 同步时间)
sql(DDL 执行语句)
sql(DDL 执行语句)
sqlType(暂不支持,默认值 null)
sqlType(与 mysqlType 对应的数据类型编号)
mysqlType(暂不支持,默认值 null)
mysqlType(mysql 中的字段类型)
data(变更后的记录-全字段)
data(变更后的记录-全字段)
old(变更前的记录--全字段)
old(变更前的记录--仅变更字段)

DDL 格式

create database

		
{
"data": null,
"database": "dip_test",
"es": 1655812326,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "CREATE DATABASE `dip_test` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
"sqlType": null,
"table": "",
"ts": 1655812326,
"type": "QUERY"
}

drop database

		
{
"data": null,
"database": "dip_test",
"es": 1655812326,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "DROP DATABASE IF EXISTS `dip_test`",
"sqlType": null,
"table": "",
"ts": 1655812326,
"type": "QUERY"
}

create table

		
{
"data": null,
"database": "dip_test",
"es": 1655812326,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "CREATE TABLE `customers` (
`id` int NOT NULL AUTO_INCREMENT,
`first_name` varchar(255) NOT NULL,
`last_name` varchar(255) NOT NULL,
`email` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `email` (`email`),
KEY `ix_id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1041 DEFAULT CHARSET=utf8",
"sqlType": null,
"table": "customers",
"ts": 1655812326,
"type": "CREATE"
}

alter table

		
{
"data": null,
"database": "test",
"es": 1655782153,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "ALTER TABLE `user` ADD COLUMN `createtime` datetime NULL DEFAULT CURRENT_TIMESTAMP",
"sqlType": null,
"table": "user",
"ts": 1655782153,
"type": "ALTER"
}

drop table

		
{
"data": null,
"database": "dip_test",
"es": 1655812326,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "DROP TABLE IF EXISTS `dip_test`.`customers`",
"sqlType": null,
"table": "customers",
"ts": 1655812326,
"type": "ERASE"
}

rename table

		
{
"data": null,
"database": "testDB",
"es": 1656300979748,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "rename table test to t_test",
"sqlType": null,
"table": "t_test",
"ts": 1656300979748,
"type": "RENAME"
}

DML 格式

insert

		
{
"data": [
{
"last_name": "Kretchmar",
"id": 1004,
"first_name": "Anne",
"email": "annek@noanswer.org"
}
],
"database": "inventory",
"es": 0,
"id": 0,
"isDdl": false,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "",
"sqlType": null,
"table": "customers",
"ts": 1465491411815,
"type": "INSERT"
}

update

		
{
"data": [
{
"last_name": "Kretchmar",
"id": 1004,
"first_name": "Anne Marie",
"email": "annek@noanswer.org"
}
],
"database": "inventory",
"es": 1465581029100,
"id": 0,
"isDdl": false,
"mysqlType": null,
"old": [
{
"last_name": "Kretchmar",
"id": 1004,
"first_name": "Anne",
"email": "annek@noanswer.org"
}
],
"pkNames": null,
"sql": "",
"sqlType": null,
"table": "customers",
"ts": 1465581029523,
"type": "UPDATE"
}

delete

		
{
"data": null,
"database": "inventory",
"es": 1465581902300,
"id": 0,
"isDdl": false,
"mysqlType": null,
"old": [
{
"last_name": "Kretchmar",
"id": 1004,
"first_name": "Anne Marie",
"email": "annek@noanswer.org"
}
相似文档
  • 背景: CKafka 连接器支持将订阅的多个 Mysql 数据库表的变更消息推送到 Kafka 的 Topic,有两种推送形式: 1. 支持将多个表的消息推送到同一个 Topic。 2. 支持将多个表的消息推送到不同的 Topic。
  • 背景: 在使用 CKafka 连接器订阅 Postgresql 数据库时,需要给 连接管理 中配置的 PostgreSQL 用户分配相应的权限。只有拥有相应权限的用户从被允许的主机访问数据库时,才能够进行消息的同步。
  • 概览: 在通过 CKafka 连接器处理数据流入流出任务时,通常需要对数据进行简易的清洗操作,如格式化原始数据、解析特定字段、数据格式转换等等。开发者往往需要自己搭建一套数据清洗的服务(ETL)。
  • Ckafka 连接器的数据处理功能提供了根据正则表达式提取消息内容的能力,正则提取采用的是开源的正则提取包 re2 。 Java 的标准正则表达式包 java.util.regex 以及其他被广泛使用的正则表达式包如 PCRE、Perlre和 Python(re),都使用回溯实现策略,即当一个 pattern 出现两个替代方案a|b 的时候,引擎将首先尝试匹配子模式a,如果匹配失败,它将重置输入流并尝试匹配子模式 b。
  • 操作场景: JSON 目前是互联网信息传递中最通用的格式协议之一。目前数据处理也主要围绕 JSON 数据格式进行解析处理。 JSONPath 是针对 JSON 格式推出的消息查询语法规范。在数据处理中,不仅能够使用简单的 JSONPath 语法,快速获取复杂嵌套 JSON 结构体的某一成员的值;还能使用 JayWay 库的扩展函数,聚合或操作某一类型的成员字段。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部