Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/ /

自定义管道以筛选变更事件

此用法示例演示了如何配置管道以自定义 MongoDB Kafka connector 使用的数据。管道是 MongoDB 聚合管道,由发送给数据库的用于过滤或转换数据的指令组成。

MongoDB会通知Connector与变更流上的聚合管道相匹配的数据变更。 变更流是描述客户端实时MongoDB 部署所做的数据更改的一系列事件。 有关更多信息,请参阅MongoDB服务器手册中有关 Change Streams的条目。

假设您正在协调一项事件,并希望收集参加特定事件的每位来宾的姓名和到达时间。 每当访客签入事件时,应用程序都会插入一份包含以下详细信息的新文档:

{
"_id": ObjectId(...),
"eventId": 321,
"name": "Dorothy Gale",
"arrivalTime": 2021-10-31T20:30:00.245Z
}

您可以定义您的connector pipeline设置,以指示change stream筛选事件信息,如下所示:

  • 为插入操作创建变更事件,并为所有其他类型的操作创建省略事件。

  • 仅为与fullDocument.eventId值“321”匹配的文档创建更改事件,并忽略所有其他文档。

  • 使用投影省略fullDocument对象中的_ideventId字段。

要应用这些转换,请将以下聚合管道分配给您的pipeline设置:

pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]

重要

确保管道的结果包含payload对象的顶级_idns字段。 MongoDB使用id作为恢复令牌的值,并使用ns生成Kafka输出主题名称。

当应用程序插入样本文档时,配置的connector会将以下记录发布到你的Kafka主题:

{
...
"payload": {
_id: { _data: ... },
"operationType": "insert",
"fullDocument": {
"name": "Dorothy Gale",
"arrivalTime": "2021-10-31T20:30:00.245Z",
},
"ns": { ... },
"documentKey": {
_id: {"$oid": ... }
}
}
}

有关使用源Connector管理变更流的更多信息,请参阅有关Change Streams 的Connector文档。

后退

使用示例