上云无忧 > 文档中心 > 腾讯云消息队列 CKafka - MySQL 订阅消息官方格式说明
消息队列 CKafka
腾讯云消息队列 CKafka - MySQL 订阅消息官方格式说明

文档简介:
概述: 使用 CKafka 连接器订阅 MySQL 的变更操作时,可选多种消息格式,默认采用 debezium 格式,同时提供了兼容其他消息格式的能力。本文介绍兼容 官方自定义格式 的消息格式说明。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

概述

使用 CKafka 连接器订阅 MySQL 的变更操作时,可选多种消息格式,默认采用 debezium 格式,同时提供了兼容其他消息格式的能力。本文介绍兼容 官方自定义格式 的消息格式说明。

“官方格式一”说明

“官方格式一”目前仅支持 DML 消息,DDL 消息格式与 canal 格式一致。
字段名称
字段说明
BINLOG_NAME
binlog 日志文件名称
BINLOG_POS
binlog 日志的 pos 位置
DATABASE
数据库名称
EVENT_SERVER_ID
暂时默认为 null
GLOBAL_ID
如果开启 GTID,则为 GTID 信息
GROUP_ID
暂时默认为 null
NEW_VALUES
type = U,则为更新后的行信息,格式为 json
type = D,则为 null
type = I,则为新插入的行信息,格式为 json
OLD_VALUES
type = U,则为更新前的行信息,格式为 json
type = D,则为删除的行信息,格式为 json
type = I,则为 null
TABLE
表名
TIME
日志生成时间
TYPE
日志类型:
U:update
D:delete
l:insert

DDL 格式

create database

		
{
"data": null,
"database": "dip_test",
"es": 1655812326,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "CREATE DATABASE `dip_test` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
"sqlType": null,
"table": "",
"ts": 1655812326,
"type": "QUERY"
}

drop database

		
{
"data": null,
"database": "dip_test",
"es": 1655812326,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "DROP DATABASE IF EXISTS `dip_test`",
"sqlType": null,
"table": "",
"ts": 1655812326,
"type": "QUERY"
}

create table

		
{
"data": null,
"database": "dip_test",
"es": 1655812326,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "CREATE TABLE `customers` (
`id` int NOT NULL AUTO_INCREMENT,
`first_name` varchar(255) NOT NULL,
`last_name` varchar(255) NOT NULL,
`email` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `email` (`email`),
KEY `ix_id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1041 DEFAULT CHARSET=utf8",
"sqlType": null,
"table": "customers",
"ts": 1655812326,
"type": "CREATE"
}

alter table

		
{
"data": null,
"database": "test",
"es": 1655782153,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "ALTER TABLE `user` ADD COLUMN `createtime` datetime NULL DEFAULT CURRENT_TIMESTAMP",
"sqlType": null,
"table": "user",
"ts": 1655782153,
"type": "ALTER"
}

drop table

		
{
"data": null,
"database": "dip_test",
"es": 1655812326,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "DROP TABLE IF EXISTS `dip_test`.`customers`",
"sqlType": null,
"table": "customers",
"ts": 1655812326,
"type": "ERASE"
}

rename table

		
{
"data": null,
"database": "testDB",
"es": 1656300979748,
"id": 0,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "rename table test to t_test",
"sqlType": null,
"table": "t_test",
"ts": 1656300979748,
"type": "RENAME"
}

DML 格式

insert

		
{
"BINLOG_NAME": "mysql-bin.000003",
"BINLOG_POS": 154,
"DATABASE": "inventory",
"EVENT_SERVER_ID": null,
"GLOBAL_ID": null,
"GROUP_ID": null,
"NEW_VALUES": {
"last_name": "Kretchmar",
"id": "1004",
"first_name": "Anne",
"email": "annek@noanswer.org"
},
"OLD_VALUES": null,
"TABLE": "customers",
"TIME": "19700101080000",
"TYPE": "I"
}

update

		
{
"BINLOG_NAME": "mysql-bin.000003",
"BINLOG_POS": 484,
"DATABASE": "inventory",
"EVENT_SERVER_ID": null,
"GLOBAL_ID": null,
"GROUP_ID": null,
"NEW_VALUES": {
"last_name": "Kretchmar",
"id": "1004",
"first_name": "Anne Marie",
"email": "annek@noanswer.org"
},
"OLD_VALUES": {
"last_name": "Kretchmar",
"id": "1004",
"first_name": "Anne",
"email": "annek@noanswer.org"
},
"TABLE": "customers",
"TIME": "20160611015029",
"TYPE": "U"
}

delete

		
{
"BINLOG_NAME": "mysql-bin.000003",
"BINLOG_POS": 805,
"DATABASE": "inventory",
"EVENT_SERVER_ID": null,
"GLOBAL_ID": null,
"GROUP_ID": null,
"NEW_VALUES": null,
"OLD_VALUES": {
"last_name": "Kretchmar",
"id": "1004",
"first_name": "Anne Marie",
"email": "annek@noanswer.org"
},
"TABLE": "customers",
"TIME": "20160611020502",
"TYPE": "D"
}
相似文档
  • 概述: 使用连接器订阅 MySQL 的变更操作时,可选多种消息格式,默认采用 debezium 格式,同时提供了兼容开源 canal 等格式的能力。本文介绍兼容 canal 的消息格式说明。
  • 背景: CKafka 连接器支持将订阅的多个 Mysql 数据库表的变更消息推送到 Kafka 的 Topic,有两种推送形式: 1. 支持将多个表的消息推送到同一个 Topic。 2. 支持将多个表的消息推送到不同的 Topic。
  • 背景: 在使用 CKafka 连接器订阅 Postgresql 数据库时,需要给 连接管理 中配置的 PostgreSQL 用户分配相应的权限。只有拥有相应权限的用户从被允许的主机访问数据库时,才能够进行消息的同步。
  • 概览: 在通过 CKafka 连接器处理数据流入流出任务时,通常需要对数据进行简易的清洗操作,如格式化原始数据、解析特定字段、数据格式转换等等。开发者往往需要自己搭建一套数据清洗的服务(ETL)。
  • Ckafka 连接器的数据处理功能提供了根据正则表达式提取消息内容的能力,正则提取采用的是开源的正则提取包 re2 。 Java 的标准正则表达式包 java.util.regex 以及其他被广泛使用的正则表达式包如 PCRE、Perlre和 Python(re),都使用回溯实现策略,即当一个 pattern 出现两个替代方案a|b 的时候,引擎将首先尝试匹配子模式a,如果匹配失败,它将重置输入流并尝试匹配子模式 b。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部