変更ストリーム
MongoDB サーバーのバージョン 3.6 以降では、集計フレームワークで新しい $changeStream
パイプライン ステージがサポートされています。 このステージを集計パイプラインの最初に指定すると、ユーザーは特定のコレクションに対するすべての変更に対して通知を送信するようリクエストできます。 MongoDB 4.0 以降、変更ストリームはコレクションに加えてデータベースとクラスターでサポートされています。
Ruby ドライバーは、この新しいパイプライン ステージを使用して特定のコレクション、データベース、またはクラスターに対する変更の通知を受信するための API を提供します。 パイプライン演算子と集計フレームワークを直接使用して変更ストリームを直接作成できますが、タイムアウト、ネットワークエラー、サーバーエラーが発生した場合にドライバーが変更ストリームを一度に再開するため、以下で説明するドライバー API を使用することをお勧めします。フェイルオーバーが発生しているか、別のタイプの再開可能なエラーが発生している。
サーバー上の変更ストリームには、 "majority"
の読み取り保証が必要であるか、読み取り保証がありません。
変更ストリームが JRuby では正しく動作しないのは、 ここで 説明されている問題が原因です 。つまり、JRuby はバックグラウンドの緑色のスレッドの列挙型で#next
を早期に評価するため、変更ストリームで#next
を呼び出すと、getMores がバックグラウンドのループで呼び出されます。
コレクションの変更の監視
コレクション変更ストリームは、コレクションで#watch
メソッドを呼び出すことで作成されます。
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') collection = client[:test] stream = collection.watch collection.insert_one(a: 1) doc = stream.to_enum.next process(doc)
通知は利用可能になったときに受け取ることもできます。
stream = collection.watch enum = stream.to_enum while doc = enum.next process(doc) end
next
メソッドは、変更が利用可能になるまでクラスターをブロックし、ポーリングします。 ブロックせずに変更ストリームを反復処理するには、 try_next
メソッドを使用します。このメソッドは最大_await_time_ms ミリ秒まで待機し、変更が受信されない場合は nil を返します。 再開できないエラーが発生した場合、 next
とtry_next
の両方で例外が発生します。 コレクションから変更を無期限に読み取る例については、以下の「 変更ストリームの再開 」セクションを参照してください。
変更ストリームは、 集計フレームワークパイプライン演算子 形式のフィルターを取ることができます。
stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } }, {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } } ]) enum = stream.to_enum while doc = enum.next process(doc) end
データベースの変更の監視
データベース変更ストリームは、データベース内のコレクションの変更や、データベースの削除などのデータベース全体のイベントを通知します。
データベース変更ストリームは、データベース オブジェクトで#watch
メソッドを呼び出すことで作成されます。
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') database = client.database stream = database.watch client[:test].insert_one(a: 1) doc = stream.to_enum.next process(doc)
クラスターでの変更の監視
クラスター変更ストリームは、任意のコレクション、クラスター内の任意のデータベース、およびクラスター全体のイベントの変更を通知します。
クラスター変更ストリームは、クライアント オブジェクト(クラスター オブジェクトではない)で#watch
メソッドを呼び出すことで作成されます。
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') stream = client.watch client[:test].insert_one(a: 1) doc = stream.to_enum.next process(doc)
変更ストリームを閉じる
変更ストリームを閉じるには、その#close
メソッドを呼び出します。
stream.close
変更ストリームの再開
変更ストリームは、初期集計と次の変更バッチを受信するためのgetMore
リクエストの 2 種類の操作で構成されています。
ドライバーは、ネットワークエラーが発生した場合と、状態が変更されたことを示すエラーを返すと、各getMore
操作を自動的に再試行します(たとえば、プライマリでなくなった場合)。 ドライバーは最初の集計を再試行しません。
実際には、これはたとえば次のことを意味します。
クラスターに
"majority"
読み込み設定(read preference)を満たすのに十分な使用可能なノードがない場合、collection.watch
の呼び出しは失敗します。collection.watch
が正常に返された以降、クラスターで選挙が行われたり、ノードが失われた場合でも、十分な迅速に回復した場合でも、next
またはeach
メソッドによる変更ストリームの読み取りは、アプリケーションに対して透過的に続行されます。
変更を失ったり変更を複数回処理したりすることなく、変更を無期限かつ確実に監視するには、アプリケーションは変更ストリームの再開トークンを追跡し、ドライバーの自動再開も失敗する拡張エラーが発生したときに変更ストリームを再起動する必要があります。 次のコード スニペットは、変更ストリームを無期限に反復処理する方法、 resume_token
変更ストリーム メソッドを使用して再開トークンを取得する方法、すべての MongoDB またはネットワーク エラーで:resume_after
オプションを使用して変更ストリームを再起動する方法を示しています。
token = nil loop do begin stream = collection.watch([], resume_after: token) enum = stream.to_enum while doc = enum.next process(doc) token = stream.resume_token end rescue Mongo::Error sleep 1 end end
上記の反復はenum.next
呼び出しでブロックされており、このコードを実行している Ruby プロセスが終了した場合には処理の再開は許可されません。 このドライバーは、変更ストリームに変更がない場合に無期限にブロックする代わりにnil
(待機期間後に)を返すtry_next
メソッドも提供します。 try_next
メソッドを使用すると、特定のリクエストが変更を返さない場合でも、 getMore
リクエストごとに再開トークンが保持されます。これにより、再開トークンは oplog の先頭に残り、アプリケーションは次の機会を利用できます:変更を処理するプロセスが終了した場合に永続化されます。
token = nil loop do begin stream = collection.watch([], resume_after: token) enum = stream.to_enum doc = enum.try_next if doc process(doc) end token = stream.resume_token # Persist +token+ to support resuming processing upon process restart rescue Mongo::Error sleep 1 end end
再開トークンは、呼び出しでドキュメントが返されない場合でも、 try_next
を呼び出すごとに変更ストリームから取得される必要があることに注意してください。
再開トークンは、各変更ストリーム ドキュメントの_id
フィールドにも提供されます。 _id
フィールドの読み取りは推奨されません。このフィールドはアプリケーションによってプロジェクションされる可能性があり、 _id
フィールドのみを使用すると、 getMore
がドキュメントを返さない場合に再開トークンが進みません。