Docs Menu
Docs Home
/ / /
C#/.NET
/ / /

データの変更を監視

項目一覧

  • Overview
  • サンプル データ
  • 変更ストリームを開く
  • 変更ストリーム出力の変更
  • Watch()の動作を変更
  • 変更前と変更後のイメージを含めます
  • 詳細情報
  • API ドキュメント

このガイドでは、変更ストリームを使用してデータに対するリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。

このガイドの例では、 Atlas サンプル データセット sample_restaurants.restaurantsコレクションを使用します。 無料の MongoDB Atlas クラスターを作成し、サンプル データセットをロードする方法については、 クイック スタートを参照してください。

このページの例では、次の Restaurant クラス、Address クラス、GradeEntry クラスをモデルとして使用します。

public class Restaurant
{
public ObjectId Id { get; set; }
public string Name { get; set; }
[BsonElement("restaurant_id")]
public string RestaurantId { get; set; }
public string Cuisine { get; set; }
public Address Address { get; set; }
public string Borough { get; set; }
public List<GradeEntry> Grades { get; set; }
}
public class Address
{
public string Building { get; set; }
[BsonElement("coord")]
public double[] Coordinates { get; set; }
public string Street { get; set; }
[BsonElement("zipcode")]
public string ZipCode { get; set; }
}
public class GradeEntry
{
public DateTime Date { get; set; }
public string Grade { get; set; }
public float? Score { get; set; }
}

注意

restaurantsコレクションのドキュメントは、スニペット ケースの命名規則を使用します。このガイドの例では、ConventionPack を使用してコレクション内のフィールドをパスカル ケースに逆シリアル化し、Restaurantクラスのプロパティにマップします。

カスタム直列化について詳しくは、「カスタム直列化」を参照してください。

変更ストリームを開くには、 メソッドまたはWatch() WatchAsync()メソッドを呼び出します。メソッドを呼び出す インスタンスによって、変更ストリームがリッスンするイベントの範囲が決まります。 Watch()WatchAsync()次のクラスで メソッドまたは メソッドを呼び出すことができます。

  • MongoClient: MongoDB 配置のすべての変更を監視

  • Database: データベース内のすべてのコレクションの変更を監視するには

  • Collection: コレクションの変更をモニターするには

次の例では、 restaurantsコレクションの変更ストリームを開き、変更が発生に応じて出力します。 AsynchronousSynchronous対応するコードを表示するには、 タブまたは タブを選択します。

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change streams and print the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
}
}

変更の監視を開始するには、アプリケーションを実行します。 次に、別のアプリケーションまたは shell で、 restaurantsコレクションを変更します。 "name"の値が"Blarney Castle"であるドキュメントを更新すると、次の変更ストリーム出力が生成されます。

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

変更ストリーム出力を変更するには、 パラメータを メソッドと メソッドに渡します。pipelineWatch()WatchAsync()このパラメーターを使用すると、指定された変更イベントのみを監視できます。 EmptyPipelineDefinitionクラスを使用し、関連する集計ステージ メソッドを追加して、パイプラインを作成します。

pipelineパラメーターでは次の集計ステージを指定できます。

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

PipelineDefinitionBuilderクラスを使用して集計パイプラインを構築する方法については、「 ビルダによる操作の集計パイプラインの構築 」を参照してください。

次の例では、 pipelineパラメータを使用して、アップデート操作のみを記録する変更ストリームを開きます。 AsynchronousSynchronous対応するコードを表示するには、 タブまたは タブを選択します。

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change stream and prints the changes as they're received
using (var cursor = await _restaurantsCollection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following change: " + change);
});
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change streams and print the changes as they're received
using (var cursor = _restaurantsCollection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following change: " + change);
}
}

変更ストリーム出力の変更の詳細については、MongoDB Server マニュアルの「 変更ストリーム出力 の変更 」セクションを参照してください。

Watch()メソッドとWatchAsync()メソッドは、操作を構成するために使用できるオプションを表す任意のパラメーターを受け入れます。 オプションを指定しない場合、ドライバーは操作をカスタマイズしません。

次の表では、 Watch()WatchAsync()の動作をカスタマイズするために設定できるオプションについて説明します。

オプション
説明

FullDocument

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

FullDocumentBeforeChange

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

ResumeAfter

Directs Watch() or WatchAsync() to resume returning changes after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
ResumeAfter is mutually exclusive with StartAfter and StartAtOperationTime.

StartAfter

Directs Watch() or WatchAsync() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
StartAfter is mutually exclusive with ResumeAfter and StartAtOperationTime.

StartAtOperationTime

Directs Watch() or WatchAsync() to return only events that occur after the specified timestamp.
StartAtOperationTime is mutually exclusive with ResumeAfter and StartAfter.

MaxAwaitTime

Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.

ShowExpandedEvents

Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor and set this parameter to True.

BatchSize

Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.

Collation

Specifies the collation to use for the change stream cursor.

Comment

Attaches a comment to the operation.

重要

配置で MongoDB v 6.0以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。

デフォルトでは、コレクションに対して操作を実行すると、対応する変更イベントにはその操作によって変更されたフィールドのデルタのみが含まれます。 変更前または変更後の完全なドキュメントを表示するには、 ChangeStreamOptionsオブジェクトを作成し、 FullDocumentBeforeChangeまたはFullDocumentオプションを指定します。 次に、 ChangeStreamOptionsオブジェクトをWatch()またはWatchAsync()メソッドに渡します。

変更前のイメージは、変更のドキュメントの完全なバージョンです。 変更ストリーム イベントに変更前のイメージを含めるには、 FullDocumentBeforeChangeオプションを次のいずれかの値に設定します。

  • ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable: 変更イベントには、変更前のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更前のイメージが含まれます。

  • ChangeStreamFullDocumentBeforeChangeOption.Required: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。

変更後のイメージとは、変更のドキュメントの完全なバージョンです。 変更ストリーム イベントに変更後のイメージを含めるには、 FullDocumentオプションを次のいずれかの値に設定します。

  • ChangeStreamFullDocumentOption.UpdateLookup: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。

  • ChangeStreamFullDocumentOption.WhenAvailable: 変更イベントには、変更後のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更後のイメージが含まれます。

  • ChangeStreamFullDocumentOption.Required: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、ドライバーはエラーを発生させます。

次の例では、コレクションの変更ストリームを開き、 FullDocumentオプションを指定して更新されたドキュメントの変更後のイメージを含めます。 AsynchronousSynchronous対応するコードを表示するには、 タブまたは タブを選択します。

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
});
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = _restaurantsCollection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
}
}

上記のコード例を実行し、 "name"値が"Blarney Castle"であるドキュメントを更新すると、次の変更ストリーム出力が生成されます。

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。

Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。

このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。

戻る

個別のフィールド値の取得