Docs Menu
Docs Home
/ / /
Go
/ / /

データの変更を監視

項目一覧

  • Overview
  • サンプル データ
  • 変更ストリームを開く
  • 変更ストリーム出力の変更
  • の動作を変更する Watch()
  • 変更前と変更後のイメージ
  • 詳細情報
  • API ドキュメント

このガイドでは、変更ストリームを使用してドキュメントの変更を監視する方法を学習できます。

変更ストリームは新しい変更イベントを出力し、リアルタイムのデータ変更にアクセスできるようにします。 コレクション、データベース、またはクライアント オブジェクトに対して変更ストリームを開くことができます。

このガイドの例では、 coursesコレクション内のドキュメントのモデルとして、次の Course構造体を使用します。

type Course struct {
Title string
Enrollment int32
}

このガイドの例を実行するには、次のスニペットを使用して、これらのドキュメントをdbデータベースのcoursesコレクションにロードします。

coll := client.Database("db").Collection("courses")
docs := []interface{}{
Course{Title: "World Fiction", Enrollment: 35},
Course{Title: "Abstract Algebra", Enrollment: 60},
}
result, err := coll.InsertMany(context.TODO(), docs)

Tip

存在しないデータベースとコレクション

書き込み操作を実行するときに必要なデータベースとコレクションが存在しない場合は、サーバーが暗黙的にそれらを作成します。

各ドキュメントには、各ドキュメントの フィールドとtitle enrollmentフィールドに対応する、コース名と最大登録者数を含む大学コースの説明が含まれています。

注意

各出力例では、切り捨てられた_dataclusterTimeObjectIDの値が表示されます。これらの値はドライバーが独自に生成するためです。

変更ストリームを開くには、 Watch()メソッドを使用します。 Watch()メソッドにはコンテキスト パラメータとパイプライン パラメータが必要です。 すべての変更を返すには、空のPipelineオブジェクトを渡します。

次の例では、 coursesコレクションの変更ストリームを開き、すべての変更を出力します。

changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

別のプログラムまたは shell でcoursesコレクションを変更すると、このコードは変更が発生に応じて出力します。 title値が"Advanced Screenwriting"で、かつenrollment値が20であるドキュメントを挿入すると、次の変更イベントが発生します。

map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")]
fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns:
map[coll:courses db:db] operationType:insert]

変更ストリーム出力を変更するには、 パイプライン パラメーターを使用します。 このパラメーターを使用すると、特定の変更イベントのみを監視できます。 パイプライン パラメータをドキュメントの配列として形式し、各ドキュメントは集計ステージを表します。

このパラメーターでは、次のパイプライン ステージを使用できます。

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

次の例では、 dbデータベースで変更ストリームを開きますが、新しい削除操作のみを監視します。

db := client.Database("db")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the delete operation change events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

注意

Watch()メソッドはdbデータベースで呼び出されたため、コードはこのデータベース内の任意のコレクションに対する新しい削除操作を出力します。

Watch()メソッドの動作を変更するには、 optionsパラメーターを使用します。

Watch()メソッドでは次のオプションを指定できます。

  • ResumeAfter

  • StartAfter

  • FullDocument

  • FullDocumentBeforeChange

  • BatchSize

  • MaxAwaitTime

  • Collation

  • StartAtOperationTime

  • Comment

  • ShowExpandedEvents

  • StartAtOperationTime

  • Custom

  • CustomPipeline

これらのオプションの詳細については、 MongoDB Server マニュアルをご覧ください。

コレクションに対して CRUD 操作を実行すると、デフォルトでは、対応する変更イベント ドキュメントには、操作によって変更されたフィールドのデルタのみが含まれます。 Watch()メソッドのoptionsパラメータで 設定を指定すると、デルタに加えて、変更の前と変更後の完全なドキュメントを表示できます。

ドキュメントの変更後のイメージ、変更後のドキュメントの完全なバージョンを表示するには、 optionsパラメータのFullDocumentフィールドを次のいずれかの値に設定します。

  • UpdateLookup: 変更イベント ドキュメントには、変更されたドキュメント全体のコピーが含まれます。

  • WhenAvailable: 変更イベント ドキュメントには、変更イベント用に変更されたドキュメントの変更後のイメージが利用可能な場合、その変更が含まれます。

  • Required: 出力はWhenAvailableの と同じですが、変更後のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。

変更前のドキュメントの変更前のイメージ、つまりドキュメントの完全なバージョンを表示するには、 optionsパラメータのFullDocumentBeforeChangeフィールドを次のいずれかの値に設定します。

  • WhenAvailable: 変更イベント ドキュメントには、変更前のイメージが使用可能な場合、変更されたドキュメントの変更前のイメージが含まれます。

  • Required: 出力はWhenAvailableの と同じですが、変更前のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。

重要

ドキュメントの変更前と変更後のイメージにアクセスするには、コレクションに対してchangeStreamPreAndPostImagesを有効にする必要があります。 手順と詳細については、 MongoDB Server マニュアルを参照してください。

注意

挿入されたドキュメントには変更前のイメージはなく、削除されたドキュメントには変更後のイメージはありません。

次の例では、 coursesコレクションでWatch()メソッドを呼び出します。 optionsパラメータのFullDocumentフィールドに値を指定すると、変更されたフィールドのみではなく、変更されたドキュメント全体のコピーが出力されます。

opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

"World Fiction"titleを使用してドキュメントのenrollment値を35から30に更新すると、次の変更イベントが発生します。

{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp":
{"t":"...","i":"..."}},"fullDocument": {"_id":
{"$oid":"..."},"title": "World Fiction","enrollment":
{"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id":
{"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}

FullDocumentオプションを指定しない場合、同じアップデート操作では変更イベント ドキュメントに"fullDocument"値が出力されなくなります。

変更ストリームの実行可能な例については、「 データ変更のモニタリング 」を参照してください。

変更ストリームの詳細については、「 Change Streamsストリーム 」を参照してください。

Watch()メソッドの詳細については、次の API ドキュメント リンクをご覧ください。

戻る

検索テキスト