データの変更を監視
Overview
このガイドでは、変更ストリームを使用してドキュメントの変更を監視する方法を学習できます。
変更ストリームは新しい変更イベントを出力し、リアルタイムのデータ変更にアクセスできるようにします。 コレクション、データベース、またはクライアント オブジェクトに対して変更ストリームを開くことができます。
サンプル データ
このガイドの例では、 courses
コレクション内のドキュメントのモデルとして、次の Course
構造体を使用します。
type Course struct { Title string Enrollment int32 }
このガイドの例を実行するには、次のスニペットを使用して、これらのドキュメントをdb
データベースのcourses
コレクションにロードします。
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
Tip
存在しないデータベースとコレクション
書き込み操作を実行するときに必要なデータベースとコレクションが存在しない場合は、サーバーが暗黙的にそれらを作成します。
各ドキュメントには、各ドキュメントの フィールドとtitle
enrollment
フィールドに対応する、コース名と最大登録者数を含む大学コースの説明が含まれています。
注意
各出力例では、切り捨てられた_data
、 clusterTime
、 ObjectID
の値が表示されます。これらの値はドライバーが独自に生成するためです。
変更ストリームを開く
変更ストリームを開くには、 Watch()
メソッドを使用します。 Watch()
メソッドにはコンテキスト パラメータとパイプライン パラメータが必要です。 すべての変更を返すには、空のPipeline
オブジェクトを渡します。
例
次の例では、 courses
コレクションの変更ストリームを開き、すべての変更を出力します。
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
別のプログラムまたは shell でcourses
コレクションを変更すると、このコードは変更が発生に応じて出力します。 title
値が"Advanced Screenwriting"
で、かつenrollment
値が20
であるドキュメントを挿入すると、次の変更イベントが発生します。
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
変更ストリーム出力の変更
変更ストリーム出力を変更するには、 パイプライン パラメーターを使用します。 このパラメーターを使用すると、特定の変更イベントのみを監視できます。 パイプライン パラメータをドキュメントの配列として形式し、各ドキュメントは集計ステージを表します。
このパラメーターでは、次のパイプライン ステージを使用できます。
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
例
次の例では、 db
データベースで変更ストリームを開きますが、新しい削除操作のみを監視します。
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
注意
Watch()
メソッドはdb
データベースで呼び出されたため、コードはこのデータベース内の任意のコレクションに対する新しい削除操作を出力します。
の動作を変更する Watch()
Watch()
メソッドの動作を変更するには、 options
パラメーターを使用します。
Watch()
メソッドでは次のオプションを指定できます。
ResumeAfter
StartAfter
FullDocument
FullDocumentBeforeChange
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
Comment
ShowExpandedEvents
StartAtOperationTime
Custom
CustomPipeline
これらのオプションの詳細については、 MongoDB Server マニュアルをご覧ください。
変更前と変更後のイメージ
コレクションに対して CRUD 操作を実行すると、デフォルトでは、対応する変更イベント ドキュメントには、操作によって変更されたフィールドのデルタのみが含まれます。 Watch()
メソッドのoptions
パラメータで 設定を指定すると、デルタに加えて、変更の前と変更後の完全なドキュメントを表示できます。
ドキュメントの変更後のイメージ、変更後のドキュメントの完全なバージョンを表示するには、 options
パラメータのFullDocument
フィールドを次のいずれかの値に設定します。
UpdateLookup
: 変更イベント ドキュメントには、変更されたドキュメント全体のコピーが含まれます。WhenAvailable
: 変更イベント ドキュメントには、変更イベント用に変更されたドキュメントの変更後のイメージが利用可能な場合、その変更が含まれます。Required
: 出力はWhenAvailable
の と同じですが、変更後のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。
変更前のドキュメントの変更前のイメージ、つまりドキュメントの完全なバージョンを表示するには、 options
パラメータのFullDocumentBeforeChange
フィールドを次のいずれかの値に設定します。
WhenAvailable
: 変更イベント ドキュメントには、変更前のイメージが使用可能な場合、変更されたドキュメントの変更前のイメージが含まれます。Required
: 出力はWhenAvailable
の と同じですが、変更前のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。
重要
ドキュメントの変更前と変更後のイメージにアクセスするには、コレクションに対してchangeStreamPreAndPostImages
を有効にする必要があります。 手順と詳細については、 MongoDB Server マニュアルを参照してください。
注意
挿入されたドキュメントには変更前のイメージはなく、削除されたドキュメントには変更後のイメージはありません。
例
次の例では、 courses
コレクションでWatch()
メソッドを呼び出します。 options
パラメータのFullDocument
フィールドに値を指定すると、変更されたフィールドのみではなく、変更されたドキュメント全体のコピーが出力されます。
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
"World Fiction"
のtitle
を使用してドキュメントのenrollment
値を35
から30
に更新すると、次の変更イベントが発生します。
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
FullDocument
オプションを指定しない場合、同じアップデート操作では変更イベント ドキュメントに"fullDocument"
値が出力されなくなります。
詳細情報
変更ストリームの実行可能な例については、「 データ変更のモニタリング 」を参照してください。
変更ストリームの詳細については、「 Change Streamsストリーム 」を参照してください。
API ドキュメント
Watch()
メソッドの詳細については、次の API ドキュメント リンクをご覧ください。