オープンChange Streams
項目一覧
Overview
このガイドでは、変更ストリームを使用してデータに対するリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションが単一のコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
変更ストリームを開くと、次のアクションの実行に役立ちます。
動作を検出するデバイスなど、イベントの追跡とReactを有効にする
株価の変化を監視するアプリケーションを作成する
特定のジョブ 位置の従業員のアクティビティのログを生成
アプリケーションが受信したデータをフィルタリングして変換するための集計演算子のセットを指定できます。 MongoDB 配置 v6.0 以降に接続すると、変更前と変更後のドキュメント データを含むようにイベントを構成することもできます。
変更ストリームを開いて構成する方法については、次のセクションに移動します。
サンプル データ
このガイドの例では、 directors
コレクションの変更を監視します。 このコレクションには、 構造体としてモデル化された次のドキュメントが含まれているとします。
let docs = vec! [ Director { name: "Todd Haynes".to_string(), movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()], oscar_noms: 1, }, Director { name: "Jane Campion".to_string(), movies: vec! ["The Piano".to_string(), "Bright Star".to_string()], oscar_noms: 5, } ];
Tip
コレクションにドキュメントを挿入する方法については、「 ドキュメントの挿入 」ガイドを参照してください。
変更ストリームを開く
変更ストリームを開いて、特定のタイプのデータ変更をサブスクライブし、アプリケーション内で変更イベントを生成できます。
変更ストリームを開くには、 Collection
、 Database
、またはClient
インスタンスでwatch()
メソッドを呼び出します。
重要
スタンドアロンの MongoDB 配置では、この機能にはレプリカセットの oplog が必要なため、変更ストリームはサポートされていません。 oplogの詳細については、サーバー マニュアルの 「レプリカセットoplog 」 ページ を参照してください。
watch()
メソッドを呼び出す構造体によって、変更ストリームがリッスンするイベントの範囲が決まります。 次の表は、呼び出しオブジェクトに基づくwatch()
メソッドの動作を説明したものです。
構造体タイプ | の動作 watch() |
---|---|
| Monitors changes to the individual collection |
| Monitors changes to all collections in the database |
| Monitors all changes in the connected MongoDB deployment |
例
次の例では、 directors
コレクションの変更ストリームを開きます。 このコードは、各 インスタンスの フィールドと フィールドにアクセスすることで、各変更イベントの操作タイプと変更されたドキュメントを出力します。operation_type
full_document
ChangeStreamEvent
let mut change_stream = my_coll.watch().await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Document: {:?}", event.full_document); }
Tip
ChangeStreamEvent
構造体フィールドの完全なリストについては、 APIドキュメントの「 ChangeStreamEvent 」を参照してください。
name
値が"Wes Anderson"
であるドキュメントをdirectors
コレクションに挿入すると、上記のコードによって次の出力が生成されます。
Operation performed: Insert Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })
変更ストリームへの集計演算子の適用
pipeline()
メソッドとwatch()
メソッドを連鎖させて、変更ストリームが受信する変更イベントを指定できます。 集計パイプラインをパラメータとしてpipeline()
に渡します。
MongoDB Server のバージョンがサポートする集計演算子については、サーバー マニュアルの「変更ストリーム出力の変更 」を参照してください。
例
次の例では、アップデート操作でフィルタリングするための集計パイプラインを作成します。 次に、コードはパイプラインをpipeline()
メソッドに渡し、アップデート操作用の変更イベントのみを受信して出力するように変更ストリームを構成します。
let mut update_change_stream = my_coll.watch() .pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }]) .await?; while let Some(event) = update_change_stream.next().await.transpose()? { println!("Update performed: {:?}", event.update_description); }
oscar_noms
フィールドの値を増やして、 name
値が"Todd Haynes"
であるドキュメントを更新すると、上記のコードによって次の出力が生成されます。
Update performed: Some(UpdateDescription { updated_fields: Document({ "oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })
Tip
アップデート操作の実行方法については、「ドキュメントの修正 」ガイドを参照してください。
変更前と変更後のイメージを含めます
変更イベントは、以下のデータを含めるか省略するように構成できます。
変更前のイメージ: 操作前のドキュメントのバージョンを表すドキュメント(存在する場合)
変更後イメージ: 操作後のドキュメントのバージョンを表すドキュメント(存在する場合)
重要
配置で MongoDB v6.0 以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。
変更前イメージまたは変更後イメージを含む変更ストリーム イベントを受信するには、次のアクションを実行する必要があります。
MongoDB 配置のコレクションの変更前と変更後のイメージを有効にします。
Tip
配置で変更前と変更後のイメージを有効にする方法については、サーバー マニュアルの「 Change Streams とドキュメントの変更前イメージおよび変更後イメージ 」を参照してください。
変更前イメージと変更後イメージが有効になっているコレクションを作成するようにドライバーに指示する方法については、このページの「 変更前イメージと変更後イメージが有効になっているコレクションの作成」セクションを参照してください。
変更ストリームを設定して、変更前のイメージと変更後のイメージのどちらかを取得します。 この構成中に、変更前と変更後のイメージを要求するようにドライバーに指示したり、利用可能な場合にのみ含めるようにドライバーに指示したりできます。
Tip
変更イベントに変更前のイメージを記録するように変更ストリームを構成する方法について詳しくは、このページの「 変更前のイメージの構成例」を参照してください。
変更イベントで変更後のイメージを記録するように変更ストリームを構成するには、このページの「 変更後のイメージの構成例」を参照してください。
変更前と変更後のイメージが有効になっているコレクションの作成
コレクションでイメージ前ドキュメントとイメージ後ドキュメントを有効にするには、 change_stream_pre_and_post_images()
オプション ビルダー メソッドを使用します。 次の例では、このビルダ メソッドを使用してコレクション オプションを指定し、変更前と変更後のイメージが利用可能なコレクションを作成します。
let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build(); let result = my_db.create_collection("directors") .change_stream_pre_and_post_images(enable) .await?;
MongoDB Shell またはアプリケーションからcollMod
コマンドを実行することで、既存のコレクション内の変更前と変更後のイメージのオプションを変更できます。 この操作を実行する方法については、サーバー マニュアルの 「コマンドの実行」 ガイドとcollModに関する エントリを参照してください。
警告
コレクションで変更前イメージまたは変更後イメージを有効にした場合、 collMod
を使用してこれらの設定を変更すると、そのコレクションの既存の変更ストリームが終了する可能性があります。
変更前のイメージ構成例
変更前イメージを含む変更イベントを返す変更ストリームを構成するには、 full_document_before_change()
オプション ビルダー メソッドを使用します。 次の例では、変更ストリーム オプションを指定し、イメージ前ドキュメントを返す変更ストリームを作成します。
let pre_image = FullDocumentBeforeChangeType::Required; let mut change_stream = my_coll.watch() .full_document_before_change(pre_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Pre-image: {:?}", event.full_document_before_change); }
上記の例では、 FullDocumentBeforeChangeType::Required
の値をfull_document_before_change()
オプション ビルダ メソッドに渡します。 この方法では、変更イベントの置換、アップデート、削除に変更前のイメージが必要になるように変更ストリームが構成されます。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。
name
の値が"Jane Campion"
であるドキュメントを更新すると、変更イベントによって次の出力が生成されます。
Operation performed: Update Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano", "Bright Star"], oscar_noms: 5 })
イメージ後構成の例
変更後イメージを含む変更イベントを返す変更ストリームを構成するには、 full_document()
オプション ビルダー メソッドを使用します。 次の例では、ストリーム オプションを指定し、イメージ後ドキュメントを返す変更ストリームを作成します。
let post_image = FullDocumentType::WhenAvailable; let mut change_stream = my_coll.watch() .full_document(post_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Post-image: {:?}", event.full_document); }
上記の例では、 FullDocument::WhenAvailable
の値をfull_document()
オプション ビルダ メソッドに渡します。 このメソッドは、変更後のイメージが利用可能な場合、置換、アップデート、削除の変更イベントの変更後のイメージを返すように変更ストリームを構成します。
name
の値が"Todd Haynes"
であるドキュメントを削除すると、変更イベントによって次の出力が生成されます。
Operation performed: Delete Post-image: None
詳細情報
API ドキュメント
このガイドで言及されているメソッドや型の詳細については、以下のAPIドキュメントを参照してください。