深入了解 MongoDB 变更流
按照本教程学习如何在 MongoDB 集合上创建变更流并观察其创建的变更事件。
探索Change Streams
完成教程设置
完成 Kafka Connector教程设置中的步骤,启动 Confluence Kafka Connect 和MongoDB环境。
打开变更流
在ChangeStreamShell 1中,创建 Python 脚本以使用 PyMongo 驱动程序打开变更流。
nano openchangestream.py
将以下代码粘贴到文件中并保存更改:
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
运行 Python 脚本:
python3 openchangestream.py
成功启动后,脚本输出以下消息:
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
trigger Change Event
在ChangeStreamShell2中,使用 mongosh
(MongoDB Shell)通过以下命令连接到 MongoDB:
mongosh "mongodb://mongo1"
连接成功后,您应看到以下 MongoDB shell 提示:
rs0 [direct: primary] test>
在提示符下,键入以下命令:
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
输入上述命令后,切换到ChangeStreamShell 1以查看变更流输出,该输出应类似于以下内容:
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
要停止脚本,请按 Ctrl+C
。
在此步骤结束时,您已成功触发并观察到变更流事件。
打开已筛选的变更流
您可以通过向变更流传递聚合管道来应用过滤应用于变更流。
在ChangeStreamShell 1中,创建一个新的 Python 脚本以使用 PyMongo 驱动程序打开筛选后的变更流。
nano pipeline.py
将以下代码粘贴到文件中并保存更改:
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
运行 Python 脚本:
python3 pipeline.py
成功启动后,脚本输出以下消息:
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
观察筛选的变更流
返回ChangeStreamShell 2会话,该会话应使用mongosh
连接到MongoDB 。
在提示符下,键入以下命令:
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
如脚本输出所示,变更流会创建变更事件,因为它与以下管道匹配:
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
尝试在ChangeStreamShell 2中插入以下文档,以验证变更流仅在文档与过滤匹配时才生成事件:
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(可选)停止 Docker 容器
完成本教程后,通过停止或删除 Docker 资产来释放计算机上的资源。您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们以重新启动 MongoDB Kafka Connector 开发环境,该环境大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。
提示
更多教程
如果您计划完成更多 MongoDB Kafka Connector 教程,请考虑仅删除容器。如果您不打算完成更多 MongoDB Kafka Connector 教程,请考虑删除容器和映像。
选择与要运行的删除任务对应的选项卡。
运行以下 shell 命令删除开发环境中的 Docker 容器和镜像:
docker-compose -p mongo-kafka down --rmi all
运行以下 shell 命令以删除 Docker 容器,但保留开发环境的映像:
docker-compose -p mongo-kafka down
要重新启动容器,请按照教程设置中启动容器所需的相同步骤进行操作。
总结
在本教程中,您在 MongoDB 上创建了变更流并观察了输出。 MongoDB Kafka Source connector从您配置的变更流中读取变更事件,并将其写入Kafka主题。
要了解如何为源 配置变更流和Kafka connector主题,请继续 学习MongoDBKafka 源 入门connector 教程。
了解详情
阅读以下资源,详细了解本教程中提到的概念: