腾讯云消息队列 CKafka - MongoDB 数据订阅
文档简介:
简介:
MongoDB Kafka Connector 允许监控一个 Mongo 实例内的所有数据库(database)或单个数据库,也允许监控某个数据库内的所有集合(collection)或单个集合。将 Mongo 的修改信息生成修改事件消息,以消息流的方式提交给 kafka 的 topic。客户端应用可以通过消费对应 topic 中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
简介
MongoDB Kafka Connector 允许监控一个 Mongo 实例内的所有数据库(database)或单个数据库,也允许监控某个数据库内的所有集合(collection)或单个集合。将 Mongo 的修改信息生成修改事件消息,以消息流的方式提交给 kafka 的 topic。客户端应用可以通过消费对应 topic 中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
本文档是对 Mongo 官方文档的归纳和整理,详情参见 MongoDB Change Events。
事件格式
以下 JSON 框架展示了所有修改事件消息中可能出现的字段:
{_id : { <BSON Object> },"operationType" : "<operation>","fullDocument" : { <document> },"ns" : {"db" : "<database>","coll" : "<collection>"},"to" : {"db" : "<database>","coll" : "<collection>"},"documentKey" : { "_id" : <value> },"updateDescription" : {"updatedFields" : { <document> },"removedFields" : [ "<field>", ... ],"truncatedArrays" : [{ "field" : <field>, "newSize" : <integer> },...]},"clusterTime" : <Timestamp>,"txnNumber" : <NumberLong>,"lsid" : {"id" : <UUID>,"uid" : <BinData>}}
其中部分字段可能只在特定的事件类型中才会出现,下表对相应字段及其含义进行了描述。
Field
|
Type
|
Description
|
_id
|
document
|
一个用来唯一标识事件的 BSON 对象。 _id 对象的格式如下:{ "_data" : <BinData|hex string>} 。_data 的类型取决于 MongoDB 的版本 ,可通过 Resume Tokens 查看完整的_data类型介绍。
|
operationType
|
string
|
触发修改事件的操作类型,具体包括以下 8 种:insertdeletereplaceupdatedroprenamedropDatabaseinvalidate
|
fullDocument
|
document
|
表示被新增( insert), 替换(replace), 删除(delete), 更新(update )操作所影响的文档。对于 insert 和 replace 操作,该字段表示新增的文档。对于 delete 操作,该字段缺省表示文档已经不存在。对于 update 操作,只有配置了 fullDocument 为 updateLookup 时才会显示。
|
ns
|
document
|
命名空间(namespace),由 database 和 collection 构成。
|
ns.db
|
string
|
数据库名称。
|
ns.coll
|
string
|
集合名称。对于 dropDatabase 操作,该字段缺省。
|
to
|
document
|
当操作类型为 rename 时,表示新的集合名称。该字段对其他操作是缺省的。
|
to.db
|
string
|
新的数据库的名称。
|
to.coll
|
string
|
新的集合名称。
|
documentKey
|
document
|
操作修改的文档的 ID。
|
updateDescription
|
document
|
一个用来描述被更新操作(update operation)修改的字段的文档。该字段仅当事件对应的操作为 update 时才有。
|
updateDescription.updatedFields
|
document
|
包含被更新操作修改的字段,字段的 value 值为更新后的值。
|
updateDescription.removedFields
|
array
|
包含被更新操作删除的字段。
|
updateDescription.truncatedArrays
|
array
|
其中记录了使用以下一个或多个基于 pipeline 的更新执行的数组截断:$addFields $set $replaceRoot $replaceWith
|
updateDescription.truncatedArrays.field
|
string
|
被删除的字段。
|
updateDescription.truncatedArrays.newSize
|
integer
|
truncated array 中的元素个数。
|
clusterTime
|
Timestamp
|
oplog 与事件关联的时间戳。对于涉及 多文档事务, 关联的事件的 clusterTime 值是相同的。
|
txnNumber
|
NumberLong
|
事务 ID。仅当操作是 多文档事务 时出现。
|
lsid
|
Document
|
与事务关联的 session 的 ID,仅当操作是 多文档事务 时出现。
|
事件列表
新增事件(insert event)
{_id: { < Resume Token > },operationType: 'insert',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {userName: 'alice123',_id: ObjectId("599af247bb69cd8996xxxxxx")},fullDocument: {_id: ObjectId("599af247bb69cd8996xxxxxx"),userName: 'alice123',name: 'Alice'}}
其中 documentKey 字段同时包含了 _id 和 username 字段。表示 engineering.users 集合是分片的,shard key 为 username 和 _id。
更新事件(update event)
{_id: { < Resume Token > },operationType: 'update',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {_id: ObjectId("58a4eb4a30c75625e0xxxxxx")},updateDescription: {updatedFields: {email: 'alice@10gen.com'},removedFields: ['phoneNumber'],truncatedArrays: [ {"field" : "vacation_time","newSize" : 36} ]}}
以下例子展示了 update event 配置了 fullDocument : updateLookup 选项的消息内容:
{_id: { < Resume Token > },operationType: 'update',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {_id: ObjectId("58a4eb4a30c75625e0xxxxxx")},updateDescription: {updatedFields: {email: 'alice@10gen.com'},removedFields: ['phoneNumber'],truncatedArrays: [ {"field" : "vacation_time","newSize" : 36} ]},fullDocument: {_id: ObjectId("58a4eb4a30c75625e0xxxxxx"),name: 'Alice',userName: 'alice123',email: 'alice@10gen.com',team: 'replication'}}
替换事件(replace event)
{_id: { < Resume Token > },operationType: 'replace',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {_id: ObjectId("599af247bb69cd8996xxxxxx")},fullDocument: {_id: ObjectId("599af247bb69cd8996xxxxxx"),userName: 'alice123',name: 'Alice'}}
replace 操作是通过两步操作实现的:
删除原 documentKey 对应的文档
根据一样的 documentkey插入新的文档
基于 replace 事件的 fullDocument 字段表示的是插入后的新文档。
删文档事件(delete event)
{_id: { < Resume Token > },operationType: 'delete',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {_id: ObjectId("599af247bb69cd8996xxxxxx")}}
对于删除文档事件的消息,fullDocument 字段缺省。
删集合事件(drop event)
{_id: { < Resume Token > },operationType: 'drop',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'}}
当一个集合被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
改名事件(rename event)
{_id: { < Resume Token > },operationType: 'rename',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},to: {db: 'engineering',coll: 'people'}}
当一个集合名称被更改时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
删库事件(drop database event)
{_id: { < Resume Token > },operationType: 'dropDatabase',clusterTime: <Timestamp>,ns: {db: 'engineering'}}
当一个数据库被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
在生成数据库删除事件(dropDatabase)之前,会为数据库中的每一个集合生成一个集合删除事件(drop event)。
无效事件(invalidate event)
{_id: { < Resume Token > },operationType: 'invalidate',clusterTime: <Timestamp>}
对于订阅了一个集合(collection)的 connector,drop event,rename event 或 dropDatabase event 这类会对该集合产生影响的事件都会产生一个无效事件。
对于订阅了一个数据库(database)的 connector,dropDatabase event 会产生一个无效事件。