Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

MongoDB Change Streams を探索

項目一覧

  • Change Streams探索
  • 概要
  • 詳細

このチュートリアルに従って、MongoDB コレクションに変更ストリームを作成し、それが作成する変更イベントを観察する方法を学びます。

1

Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。

2

チュートリアルの Docker コンテナに 2 つの対話型 shell セッションを、それぞれ別のウィンドウで作成します。

対話型 shell を起動するには、ターミナルから次のコマンドを実行します。

docker exec -it mongo1 /bin/bash

このチュートリアル全体では、このインタラクティブ shell をChangeStreamShell 1として参照します。

対話型 shell を起動するには、2 つ目のターミナルで次のコマンドを実行します。

docker exec -it mongo1 /bin/bash

このチュートリアル全体では、このインタラクティブ shell をChangeStreamShell 2として参照します。

3

ChangeStreamShell 1では、PyMongo ドライバーを使用して変更ストリームを開くための Python スクリプトを作成します。

nano openchangestream.py

次のコードを ファイルに貼り付け、変更を保存します。

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
with db.orders.watch() as stream:
print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Python スクリプトを実行します。

python3 openchangestream.py

スクリプトは、正常に起動されると、次のメッセージを出力します。

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
4

ChangeStreamShell2で、次のコマンドを使用して MongoDB に接続します: mongosh (MongoDB shell)を使用して MongoDB に接続します。

mongosh "mongodb://mongo1"

正常に接続すると、次の MongoDB shell プロンプトが表示されます。

rs0 [direct: primary] test>

プロンプトで、次のコマンドを入力します。

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

上記のコマンドを入力した後、 ChangeStreamShell 1に切り替えて変更ストリーム出力を表示します。出力は次のようになります。

{
"_id": {
"_data": "826264..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650754657,
"i": 1
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "<_id value of document>"
},
"test": 1
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "<_id value of document>"
}
}
}

スクリプトを停止するには、 Ctrl+Cを押します。

この手順の最後で、変更ストリーム イベントを正常にトリガーして監視することができます。

5

集計パイプラインを渡すことで、変更ストリームにフィルターを適用できます。

ChangeStreamShell 1では、PyMongo ドライバーを使用してフィルタリングされた変更ストリームを開くための新しい Python スクリプトを作成します。

nano pipeline.py

次のコードを ファイルに貼り付け、変更を保存します。

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
with db.sensors.watch(pipeline=pipeline) as stream:
print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Python スクリプトを実行します。

python3 pipeline.py

スクリプトは、正常に起動されると、次のメッセージを出力します。

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
6

mongoshを使用して MongoDB に接続する必要があるChangeStreamShell 2セッションに戻ります。

プロンプトで、次のコマンドを入力します。

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

スクリプト出力が示すように、変更ストリームは次のパイプラインと一致するため変更イベントを作成します。

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

ChangeStreamShell 2の に次のドキュメントを挿入して、ドキュメントがフィルターに一致する場合にのみ変更ストリームが生成することを確認します。

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
7

このチュートリアルを完了したら、Docker アセットを停止または削除して、コンピューター上のリソースを解放します。 Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB の MongoDB Kafka Connector 開発環境を再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。

Tip

その他のチュートリアル

MongoDB Kafka Connector の追加チュートリアルを完了する予定の場合は、コンテナのみを削除することを検討してください。 MongoDB Kafka Connector の追加チュートリアルを完了する予定がない場合は、コンテナとイメージの削除を検討してください。

実行する削除タスクに対応するタブを選択します。

次の shell コマンドを実行して、開発環境の Docker コンテナとイメージを削除します。

docker-compose -p mongo-kafka down --rmi all

次の shell コマンドを実行して、Docker コンテナを削除しますが、開発環境用のイメージは保持します。

docker-compose -p mongo-kafka down

コンテナを再起動するには、チュートリアル設定 でコンテナを起動するのに必要な手順と同じ手順に従います。

このチュートリアルでは、MongoDB に変更ストリームを作成し、その出力を確認しました。 MongoDB Kafka ソース コネクタは、構成した変更ストリームから変更イベントを読み取り、 Kafka トピックに書き込みます。

ソース コネクタの変更ストリームとKafkaトピックを構成する方法については、「 MongoDB KafkaソースConnectorを使い始める 」チュートリアルに進んでください。

このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。

  • Change StreamsとソースConnector

  • 変更ストリーム出力の修正

  • MongoDB Shell(mongosh)

戻る

チュートリアル設定