上云无忧 > 文档中心 > 腾讯云消息队列 CKafka - MongoDB 数据订阅
消息队列 CKafka
腾讯云消息队列 CKafka - MongoDB 数据订阅

文档简介:
简介: MongoDB Kafka Connector 允许监控一个 Mongo 实例内的所有数据库(database)或单个数据库,也允许监控某个数据库内的所有集合(collection)或单个集合。将 Mongo 的修改信息生成修改事件消息,以消息流的方式提交给 kafka 的 topic。客户端应用可以通过消费对应 topic 中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

简介

MongoDB Kafka Connector 允许监控一个 Mongo 实例内的所有数据库(database)或单个数据库,也允许监控某个数据库内的所有集合(collection)或单个集合。将 Mongo 的修改信息生成修改事件消息,以消息流的方式提交给 kafka 的 topic。客户端应用可以通过消费对应 topic 中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
本文档是对 Mongo 官方文档的归纳和整理,详情参见 MongoDB Change Events

事件格式

以下 JSON 框架展示了所有修改事件消息中可能出现的字段:
		
{
_id : { <BSON Object> },
"operationType" : "<operation>",
"fullDocument" : { <document> },
"ns" : {
"db" : "<database>",
"coll" : "<collection>"
},
"to" : {
"db" : "<database>",
"coll" : "<collection>"
},
"documentKey" : { "_id" : <value> },
"updateDescription" : {
"updatedFields" : { <document> },
"removedFields" : [ "<field>", ... ],
"truncatedArrays" : [
{ "field" : <field>, "newSize" : <integer> },
...
]
},
"clusterTime" : <Timestamp>,
"txnNumber" : <NumberLong>,
"lsid" : {
"id" : <UUID>,
"uid" : <BinData>
}
}
其中部分字段可能只在特定的事件类型中才会出现,下表对相应字段及其含义进行了描述。
Field
Type
Description
_id
document
一个用来唯一标识事件的 BSON 对象。 _id 对象的格式如下:{ "_data" : <BinData|hex string>} 。_data 的类型取决于 MongoDB 的版本 ,可通过 Resume Tokens 查看完整的_data类型介绍。
operationType
string
触发修改事件的操作类型,具体包括以下 8 种:insertdeletereplaceupdatedroprenamedropDatabaseinvalidate
fullDocument
document
表示被新增( insert), 替换(replace), 删除(delete), 更新(update )操作所影响的文档。对于 insert 和 replace 操作,该字段表示新增的文档。对于 delete 操作,该字段缺省表示文档已经不存在。对于 update 操作,只有配置了 fullDocument 为 updateLookup 时才会显示。
ns
document
命名空间(namespace),由 database 和 collection 构成。
ns.db
string
数据库名称。
ns.coll
string
集合名称。对于 dropDatabase 操作,该字段缺省。
to
document
当操作类型为 rename 时,表示新的集合名称。该字段对其他操作是缺省的。
to.db
string
新的数据库的名称。
to.coll
string
新的集合名称。
documentKey
document
操作修改的文档的 ID。
updateDescription
document
一个用来描述被更新操作(update operation)修改的字段的文档。该字段仅当事件对应的操作为 update 时才有。
updateDescription.updatedFields
document
包含被更新操作修改的字段,字段的 value 值为更新后的值。
updateDescription.removedFields
array
包含被更新操作删除的字段。
updateDescription.truncatedArrays
array
其中记录了使用以下一个或多个基于 pipeline 的更新执行的数组截断:$addFields $set $replaceRoot $replaceWith
updateDescription.truncatedArrays.field
string
被删除的字段。
updateDescription.truncatedArrays.newSize
integer
truncated array 中的元素个数。
clusterTime
Timestamp
oplog 与事件关联的时间戳。对于涉及 多文档事务, 关联的事件的 clusterTime 值是相同的。
txnNumber
NumberLong
事务 ID。仅当操作是 多文档事务 时出现。
lsid
Document
与事务关联的 session 的 ID,仅当操作是 多文档事务 时出现。

事件列表

新增事件(insert event)

		
{
_id: { < Resume Token > },
operationType: 'insert',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
userName: 'alice123',
_id: ObjectId("599af247bb69cd8996xxxxxx")
},
fullDocument: {
_id: ObjectId("599af247bb69cd8996xxxxxx"),
userName: 'alice123',
name: 'Alice'
}
}
其中 documentKey 字段同时包含了 _id 和 username 字段。表示 engineering.users 集合是分片的,shard key 为 username 和 _id。

更新事件(update event)

		
{
_id: { < Resume Token > },
operationType: 'update',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
_id: ObjectId("58a4eb4a30c75625e0xxxxxx")
},
updateDescription: {
updatedFields: {
email: 'alice@10gen.com'
},
removedFields: ['phoneNumber'],
truncatedArrays: [ {
"field" : "vacation_time",
"newSize" : 36
} ]
}
}
以下例子展示了 update event 配置了 fullDocument : updateLookup 选项的消息内容:
		
{
_id: { < Resume Token > },
operationType: 'update',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
_id: ObjectId("58a4eb4a30c75625e0xxxxxx")
},
updateDescription: {
updatedFields: {
email: 'alice@10gen.com'
},
removedFields: ['phoneNumber'],
truncatedArrays: [ {
"field" : "vacation_time",
"newSize" : 36
} ]
},
fullDocument: {
_id: ObjectId("58a4eb4a30c75625e0xxxxxx"),
name: 'Alice',
userName: 'alice123',
email: 'alice@10gen.com',
team: 'replication'
}
}

替换事件(replace event)

		
{
_id: { < Resume Token > },
operationType: 'replace',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
_id: ObjectId("599af247bb69cd8996xxxxxx")
},
fullDocument: {
_id: ObjectId("599af247bb69cd8996xxxxxx"),
userName: 'alice123',
name: 'Alice'
}
}
replace 操作是通过两步操作实现的:
删除原 documentKey 对应的文档
根据一样的 documentkey插入新的文档
基于 replace 事件的 fullDocument 字段表示的是插入后的新文档。

删文档事件(delete event)

		
{
_id: { < Resume Token > },
operationType: 'delete',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
_id: ObjectId("599af247bb69cd8996xxxxxx")
}
}
对于删除文档事件的消息,fullDocument 字段缺省。

删集合事件(drop event)

		
{
_id: { < Resume Token > },
operationType: 'drop',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
}
}
当一个集合被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。

改名事件(rename event)

		
{
_id: { < Resume Token > },
operationType: 'rename',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
to: {
db: 'engineering',
coll: 'people'
}
}
当一个集合名称被更改时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。

删库事件(drop database event)

		
{
_id: { < Resume Token > },
operationType: 'dropDatabase',
clusterTime: <Timestamp>,
ns: {
db: 'engineering'
}
}
当一个数据库被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
在生成数据库删除事件(dropDatabase)之前,会为数据库中的每一个集合生成一个集合删除事件(drop event)。

无效事件(invalidate event)

		
{
_id: { < Resume Token > },
operationType: 'invalidate',
clusterTime: <Timestamp>
}
对于订阅了一个集合(collection)的 connector,drop event,rename event 或 dropDatabase event 这类会对该集合产生影响的事件都会产生一个无效事件。
对于订阅了一个数据库(database)的 connector,dropDatabase event 会产生一个无效事件。
相似文档
  • 简介: MySQL 通过一种二进制日志(binlog)来按序记录所有提交给数据库的操作,包括对表结构的修改以及对表中数据的修改。MySQL 通过 binlog 来进行备份或恢复数据。
  • 简介: Debezium PostgreSQL connector 能够抓取 PostgreSQL 数据库中的行级(row-level)修改操作,并生成相应的修改事件。Debezium PostgreSQL connector 第一次连接 PostgreSQL 服务器时,会对所有数据库生成一个快照(snapshot),然后会持续的抓取提交给数据库的包括新增(insert)、更新(update)、删除(delete)在内的行级修改操作,并生成数据修改事件,将其作为消息提交给 Kafka 的相应 topic 。客户端应用可以通过消费对应 topic中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
  • 概述: 使用 CKafka 连接器订阅 MySQL 的变更操作时,可选多种消息格式,默认采用 debezium 格式,同时提供了兼容其他消息格式的能力。本文介绍兼容 官方自定义格式 的消息格式说明。
  • 概述: 使用连接器订阅 MySQL 的变更操作时,可选多种消息格式,默认采用 debezium 格式,同时提供了兼容开源 canal 等格式的能力。本文介绍兼容 canal 的消息格式说明。
  • 背景: CKafka 连接器支持将订阅的多个 Mysql 数据库表的变更消息推送到 Kafka 的 Topic,有两种推送形式: 1. 支持将多个表的消息推送到同一个 Topic。 2. 支持将多个表的消息推送到不同的 Topic。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部