Docs Menu
Docs Home
/ / /
Kotlin Sync ドライバー
/

データの変更を監視

項目一覧

  • Overview
  • サンプル データ
  • 変更ストリームを開く
  • 変更ストリームを開く例
  • 変更ストリーム出力の変更
  • 特定のイベントの例との一致
  • watch() の動作を変更する
  • 変更前と変更後のイメージを含めます
  • 詳細情報
  • API ドキュメント

このガイドでは、 Kotlin Sync ドライバーを使用して変更ストリームを監視し、データベースに対するリアルタイムの変更を表示できるようにする方法を説明します。 変更ストリームは、コレクション、データベース、または配置のデータ変更を公開する MongoDB Server の機能です。 アプリケーションは 変更ストリーム をサブスクライブし、イベントを使用して他のアクションを実行することができます。

このガイドの例では、 Atlas サンプル データセットsample_restaurantsデータベース内の restaurantsコレクションを使用します。 MongoDB Atlas クラスターを無料で作成して、サンプル データセットをロードする方法については、 「 Atlas を使い始める 」ガイドを参照してください。

次の Kotlin データ クラスは、このコレクション内のドキュメントをモデル化します。

data class Restaurant(
val name: String,
val cuisine: String,
)

変更ストリームを開くには、 watch()メソッドを呼び出します。 watch()メソッドを呼び出す インスタンスによって、変更ストリームがリッスンするイベントの範囲が決まります。 次のクラスのインスタンスでwatch()メソッドを呼び出すことができます。

  • MongoClient: MongoDB 配置のすべての変更を監視

  • MongoDatabase: データベース内のすべてのコレクションの変更を監視するには

  • MongoCollection: コレクションの変更をモニターするには

次の例では、 restaurantsコレクションの変更ストリームを開き、変更が発生に応じて出力します。

collection.watch().forEach { change ->
println(change)
}

変更の監視を開始するには、アプリケーションを実行します。 次に、別のアプリケーションまたは shell で、 restaurantsコレクションに対して書込み操作を実行します。 次の例えでは、 nameの値が"Blarney Castle"であるドキュメントを更新します。

val filter = Filters.eq(Restaurant::name.name, "Blarney Castle")
val update = Updates.set(Restaurant::cuisine.name, "Irish")
val result = collection.updateOne(filter, update)

コレクションを更新すると、変更ストリーム アプリケーションは変更が発生に応じて出力します。 出力される変更イベントは、次のようになります。

{
"_id": { ... },
"operationType": "update",
"clusterTime": { ... },
"ns": {
"db": "sample_restaurants",
"coll": "restaurants"
},
"updateDescription": {
"updatedFields": {
"cuisine": "Irish"
},
"removedFields": [],
"truncatedArrays": []
}
...
}

pipelineパラメータをwatch()メソッドに渡して、変更ストリーム出力を変更できます。 このパラメーターを使用すると、指定された変更イベントのみを監視できます。 パラメーターを、それぞれが集計ステージを表すオブジェクトのリストとして形式します。

pipelineパラメーターでは、次のステージを指定できます。

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

次の例では、 $matchを含むpipelineパラメーターを使用して、アップデート操作のみを記録する変更ストリームを開きます。

val pipeline = listOf(
Aggregates.match(Filters.eq("operationType", "update"))
)
collection.watch(pipeline).forEach { change ->
println(change)
}

変更ストリーム出力の変更の詳細については、MongoDB Server マニュアルの「 変更ストリーム出力 の変更 」セクションを参照してください。

watch()メソッド呼び出しによって返されるChangeStreamIterableオブジェクトにメソッドを連鎖させることで、 watch()を変更できます。 オプションを指定しない場合、ドライバーは操作をカスタマイズしません。

次の表では、 watch()の動作をカスタマイズするために使用できる方法について説明します。

方式
説明
batchSize()
Sets the number of documents to return per batch.
collation()
Specifies the kind of language collation to use when sorting results. For more information, see Collation in the MongoDB Server manual.
comment()
Specifies a comment to attach to the operation.
fullDocument()
Sets the fullDocument value. To learn more, see the Include Pre-Images and Post-Images section of this document.
fullDocumentBeforeChange()
Sets the fullDocumentBeforeChange value. To learn more, see the Include Pre-Images and Post-Images section of this document.
maxAwaitTime()
Sets the maximum await execution time on the server for this operation, in milliseconds.

watch()メソッドを構成するのに使用できるメソッドの完全なリストについては、 ChangeStreamIterable を参照してください API ドキュメント。

重要

配置で MongoDB v 6.0以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。

デフォルトでは、コレクションに対して操作を実行すると、対応する変更イベントにはその操作によって変更されたフィールドのデルタのみが含まれます。 変更前または変更後の完全なドキュメントを表示するには、 fullDocumentBeforeChange()メソッドまたはfullDocument()メソッドをwatch()メソッドに連鎖させます。

変更前のイメージは、変更のドキュメントの完全なバージョンです。 変更ストリーム イベントに変更前のイメージを含めるには、次のいずれかのオプションをfullDocumentBeforeChange()メソッドに渡します。

  • FullDocumentBeforeChange.WHEN_AVAILABLE: 変更イベントには、変更前のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更前のイメージが含まれます。

  • FullDocumentBeforeChange.REQUIRED: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。

変更後のイメージとは、変更のドキュメントの完全なバージョンです。 変更ストリーム イベントに変更後のイメージを含めるには、次のいずれかのオプションをfullDocument()メソッドに渡します。

  • FullDocument.UPDATE_LOOKUP: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。

  • FullDocument.WHEN_AVAILABLE: 変更イベントには、変更後のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更後のイメージが含まれます。

  • FullDocument.REQUIRED: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、ドライバーはエラーを発生させます。

次の例では、コレクションでwatch()メソッドを呼び出し、 fullDocumentパラメータを指定して更新されたドキュメントの変更後のイメージを結果に含めます。

collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change ->
println("Received a change: $change")
}

変更ストリーム アプリケーションが実行されている場合、前述の更新例を使用してrestaurantsコレクション内のドキュメントを更新すると、次のような変更イベントが出力されます。

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish),
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}},
clusterTime=Timestamp{value=..., seconds=..., inc=...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null,
wallTime=BsonDateTime{value=...}}

変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。

Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。

このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。

戻る

カーソルからデータにアクセスする