自定义管道以筛选变更事件
此用法示例演示了如何配置管道以自定义 MongoDB Kafka connector 使用的数据。管道是 MongoDB 聚合管道,由发送给数据库的用于过滤或转换数据的指令组成。
MongoDB会通知Connector与变更流上的聚合管道相匹配的数据变更。 变更流是描述客户端实时MongoDB 部署所做的数据更改的一系列事件。 有关更多信息,请参阅MongoDB服务器手册中有关 Change Streams的条目。
例子
假设您正在协调一项事件,并希望收集参加特定事件的每位来宾的姓名和到达时间。 每当访客签入事件时,应用程序都会插入一份包含以下详细信息的新文档:
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
您可以定义您的connector pipeline
设置,以指示change stream筛选事件信息,如下所示:
为插入操作创建变更事件,并为所有其他类型的操作创建省略事件。
仅为与
fullDocument.eventId
值“321”匹配的文档创建更改事件,并忽略所有其他文档。使用投影省略
fullDocument
对象中的_id
和eventId
字段。
要应用这些转换,请将以下聚合管道分配给您的pipeline
设置:
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
重要
确保管道的结果包含payload
对象的顶级_id
和ns
字段。 MongoDB使用id
作为恢复令牌的值,并使用ns
生成Kafka输出主题名称。
当应用程序插入样本文档时,配置的connector会将以下记录发布到你的Kafka主题:
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
有关使用源Connector管理变更流的更多信息,请参阅有关Change Streams 的Connector文档。