Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/

深入了解 MongoDB 变更流

在此页面上

  • 探索Change Streams
  • 总结
  • 了解详情

按照本教程学习如何在 MongoDB 集合上创建变更流并观察其创建的变更事件。

1

完成 Kafka Connector教程设置中的步骤,启动 Confluence Kafka Connect 和MongoDB环境。

2

在教程 容器上创建两个交互式shellDocker 会话,每个会话在一个单独的窗口中。

从终端运行以下命令,启动交互式 Shell。

docker exec -it mongo1 /bin/bash

在本教程中,我们将把这个交互式shell称为ChangeStreamShell1

在第二个终端运行以下命令,启动交互式 Shell:

docker exec -it mongo1 /bin/bash

在本教程中,我们将把这个交互式shell称为ChangeStreamShell2

3

ChangeStreamShell 1中,创建 Python 脚本以使用 PyMongo 驱动程序打开变更流。

nano openchangestream.py

将以下代码粘贴到文件中并保存更改:

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
with db.orders.watch() as stream:
print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n')
for change in stream:
print(dumps(change, indent = 2))

运行 Python 脚本:

python3 openchangestream.py

成功启动后,脚本输出以下消息:

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
4

ChangeStreamShell2中,使用 mongosh (MongoDB Shell)通过以下命令连接到 MongoDB:

mongosh "mongodb://mongo1"

连接成功后,您应看到以下 MongoDB shell 提示:

rs0 [direct: primary] test>

在提示符下,键入以下命令:

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

输入上述命令后,切换到ChangeStreamShell 1以查看变更流输出,该输出应类似于以下内容:

{
"_id": {
"_data": "826264..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650754657,
"i": 1
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "<_id value of document>"
},
"test": 1
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "<_id value of document>"
}
}
}

要停止脚本,请按 Ctrl+C

在此步骤结束时,您已成功触发并观察到变更流事件。

5

您可以通过向变更流传递聚合管道来应用过滤应用于变更流。

ChangeStreamShell 1中,创建一个新的 Python 脚本以使用 PyMongo 驱动程序打开筛选后的变更流。

nano pipeline.py

将以下代码粘贴到文件中并保存更改:

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
with db.sensors.watch(pipeline=pipeline) as stream:
print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
for change in stream:
print(dumps(change, indent = 2))

运行 Python 脚本:

python3 pipeline.py

成功启动后,脚本输出以下消息:

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
6

返回ChangeStreamShell 2会话,该会话应使用mongosh连接到MongoDB 。

在提示符下,键入以下命令:

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

如脚本输出所示,变更流会创建变更事件,因为它与以下管道匹配:

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

尝试在ChangeStreamShell 2中插入以下文档,以验证变更流仅在文档与过滤匹配时才生成事件:

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
7

完成本教程后,通过停止或删除 Docker 资产来释放计算机上的资源。您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们以重新启动 MongoDB Kafka Connector 开发环境,该环境大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。

提示

更多教程

如果您计划完成更多 MongoDB Kafka Connector 教程,请考虑仅删除容器。如果您不打算完成更多 MongoDB Kafka Connector 教程,请考虑删除容器和映像。

选择与要运行的删除任务对应的选项卡。

运行以下 shell 命令删除开发环境中的 Docker 容器和镜像:

docker-compose -p mongo-kafka down --rmi all

运行以下 shell 命令以删除 Docker 容器,但保留开发环境的映像:

docker-compose -p mongo-kafka down

要重新启动容器,请按照教程设置中启动容器所需的相同步骤进行操作。

在本教程中,您在 MongoDB 上创建了变更流并观察了输出。 MongoDB Kafka Source connector从您配置的变更流中读取变更事件,并将其写入Kafka主题。

要了解如何为源 配置变更流和Kafka connector主题,请继续 学习MongoDBKafka 源 入门connector 教程。

阅读以下资源,详细了解本教程中提到的概念:

  • Change Streams和 Source connector

  • 修改变更流输出

  • MongoDB Shell (mongosh)

后退

教程设置