MongoDB Change Streams を探索
このチュートリアルに従って、MongoDB コレクションに変更ストリームを作成し、それが作成する変更イベントを観察する方法を学びます。
Change Streams探索
チュートリアル設定を完了する
Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。
Docker コンテナへの接続
チュートリアルの Docker コンテナに 2 つの対話型 shell セッションを、それぞれ別のウィンドウで作成します。
対話型 shell を起動するには、ターミナルから次のコマンドを実行します。
docker exec -it mongo1 /bin/bash
このチュートリアル全体では、このインタラクティブ shell をChangeStreamShell 1として参照します。
対話型 shell を起動するには、2 つ目のターミナルで次のコマンドを実行します。
docker exec -it mongo1 /bin/bash
このチュートリアル全体では、このインタラクティブ shell をChangeStreamShell 2として参照します。
変更ストリームを開く
ChangeStreamShell 1では、PyMongo ドライバーを使用して変更ストリームを開くための Python スクリプトを作成します。
nano openchangestream.py
次のコードを ファイルに貼り付け、変更を保存します。
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
Python スクリプトを実行します。
python3 openchangestream.py
スクリプトは、正常に起動されると、次のメッセージを出力します。
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
変更イベントのtrigger
ChangeStreamShell2で、次のコマンドを使用して MongoDB に接続します: mongosh
(MongoDB shell)を使用して MongoDB に接続します。
mongosh "mongodb://mongo1"
正常に接続すると、次の MongoDB shell プロンプトが表示されます。
rs0 [direct: primary] test>
プロンプトで、次のコマンドを入力します。
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
上記のコマンドを入力した後、 ChangeStreamShell 1に切り替えて変更ストリーム出力を表示します。出力は次のようになります。
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
スクリプトを停止するには、 Ctrl+C
を押します。
この手順の最後で、変更ストリーム イベントを正常にトリガーして監視することができます。
フィルタリングされた変更ストリームを開く
集計パイプラインを渡すことで、変更ストリームにフィルターを適用できます。
ChangeStreamShell 1では、PyMongo ドライバーを使用してフィルタリングされた変更ストリームを開くための新しい Python スクリプトを作成します。
nano pipeline.py
次のコードを ファイルに貼り付け、変更を保存します。
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
Python スクリプトを実行します。
python3 pipeline.py
スクリプトは、正常に起動されると、次のメッセージを出力します。
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
フィルタリングされた変更ストリームの観察
mongosh
を使用して MongoDB に接続する必要があるChangeStreamShell 2セッションに戻ります。
プロンプトで、次のコマンドを入力します。
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
スクリプト出力が示すように、変更ストリームは次のパイプラインと一致するため変更イベントを作成します。
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
ChangeStreamShell 2の に次のドキュメントを挿入して、ドキュメントがフィルターに一致する場合にのみ変更ストリームが生成することを確認します。
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(任意)Docker コンテナを停止します
このチュートリアルを完了したら、Docker アセットを停止または削除して、コンピューター上のリソースを解放します。 Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB の MongoDB Kafka Connector 開発環境を再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。
Tip
その他のチュートリアル
MongoDB Kafka Connector の追加チュートリアルを完了する予定の場合は、コンテナのみを削除することを検討してください。 MongoDB Kafka Connector の追加チュートリアルを完了する予定がない場合は、コンテナとイメージの削除を検討してください。
実行する削除タスクに対応するタブを選択します。
次の shell コマンドを実行して、開発環境の Docker コンテナとイメージを削除します。
docker-compose -p mongo-kafka down --rmi all
次の shell コマンドを実行して、Docker コンテナを削除しますが、開発環境用のイメージは保持します。
docker-compose -p mongo-kafka down
コンテナを再起動するには、チュートリアル設定 でコンテナを起動するのに必要な手順と同じ手順に従います。
概要
このチュートリアルでは、MongoDB に変更ストリームを作成し、その出力を確認しました。 MongoDB Kafka ソース コネクタは、構成した変更ストリームから変更イベントを読み取り、 Kafka トピックに書き込みます。
ソース コネクタの変更ストリームとKafkaトピックを構成する方法については、「 MongoDB KafkaソースConnectorを使い始める 」チュートリアルに進んでください。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。