Docs Menu
Docs Home
/ / /
Ruby MongoDB ドライバー
/

変更ストリーム

項目一覧

  • コレクションの変更の監視
  • データベースの変更の監視
  • クラスターでの変更の監視
  • 変更ストリームを閉じる
  • 変更ストリームの再開

MongoDB サーバーのバージョン 3.6 以降では、集計フレームワークで新しい $changeStreamパイプライン ステージがサポートされています。 このステージを集計パイプラインの最初に指定すると、ユーザーは特定のコレクションに対するすべての変更に対して通知を送信するようリクエストできます。 MongoDB 4.0 以降、変更ストリームはコレクションに加えてデータベースとクラスターでサポートされています。

Ruby ドライバーは、この新しいパイプライン ステージを使用して特定のコレクション、データベース、またはクラスターに対する変更の通知を受信するための API を提供します。 パイプライン演算子と集計フレームワークを直接使用して変更ストリームを直接作成できますが、タイムアウト、ネットワークエラー、サーバーエラーが発生した場合にドライバーが変更ストリームを一度に再開するため、以下で説明するドライバー API を使用することをお勧めします。フェイルオーバーが発生しているか、別のタイプの再開可能なエラーが発生している。

サーバー上の変更ストリームには、 "majority"の読み取り保証が必要であるか、読み取り保証がありません。

変更ストリームが JRuby では正しく動作しないのは、 ここで 説明されている問題が原因です 。つまり、JRuby はバックグラウンドの緑色のスレッドの列挙型で#nextを早期に評価するため、変更ストリームで#nextを呼び出すと、getMores がバックグラウンドのループで呼び出されます。

コレクション変更ストリームは、コレクションで#watchメソッドを呼び出すことで作成されます。

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
collection = client[:test]
stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

通知は利用可能になったときに受け取ることもできます。

stream = collection.watch
enum = stream.to_enum
while doc = enum.next
process(doc)
end

nextメソッドは、変更が利用可能になるまでクラスターをブロックし、ポーリングします。 ブロックせずに変更ストリームを反復処理するには、 try_nextメソッドを使用します。このメソッドは最大_await_time_ms ミリ秒まで待機し、変更が受信されない場合は nil を返します。 再開できないエラーが発生した場合、 nexttry_nextの両方で例外が発生します。 コレクションから変更を無期限に読み取る例については、以下の「 変更ストリームの再開 」セクションを参照してください。

変更ストリームは、 集計フレームワークパイプライン演算子 形式のフィルターを取ることができます。

stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } },
{'$match' => { 'fullDocument.n' => { '$gte' => 1 } } }
])
enum = stream.to_enum
while doc = enum.next
process(doc)
end

データベース変更ストリームは、データベース内のコレクションの変更や、データベースの削除などのデータベース全体のイベントを通知します。

データベース変更ストリームは、データベース オブジェクトで#watchメソッドを呼び出すことで作成されます。

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
database = client.database
stream = database.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

クラスター変更ストリームは、任意のコレクション、クラスター内の任意のデータベース、およびクラスター全体のイベントの変更を通知します。

クラスター変更ストリームは、クライアント オブジェクト(クラスター オブジェクトではない)で#watchメソッドを呼び出すことで作成されます。

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
stream = client.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

変更ストリームを閉じるには、その#closeメソッドを呼び出します。

stream.close

変更ストリームは、初期集計と次の変更バッチを受信するためのgetMoreリクエストの 2 種類の操作で構成されています。

ドライバーは、ネットワークエラーが発生した場合と、状態が変更されたことを示すエラーを返すと、各getMore操作を自動的に再試行します(たとえば、プライマリでなくなった場合)。 ドライバーは最初の集計を再試行しません。

実際には、これはたとえば次のことを意味します。

  • クラスターに"majority"読み込み設定(read preference)を満たすのに十分な使用可能なノードがない場合、 collection.watchの呼び出しは失敗します。

  • collection.watchが正常に返された以降、クラスターで選挙が行われたり、ノードが失われた場合でも、十分な迅速に回復した場合でも、 nextまたはeachメソッドによる変更ストリームの読み取りは、アプリケーションに対して透過的に続行されます。

変更を失ったり変更を複数回処理したりすることなく、変更を無期限かつ確実に監視するには、アプリケーションは変更ストリームの再開トークンを追跡し、ドライバーの自動再開も失敗する拡張エラーが発生したときに変更ストリームを再起動する必要があります。 次のコード スニペットは、変更ストリームを無期限に反復処理する方法、 resume_token変更ストリーム メソッドを使用して再開トークンを取得する方法、すべての MongoDB またはネットワーク エラーで:resume_afterオプションを使用して変更ストリームを再起動する方法を示しています。

token = nil
loop do
begin
stream = collection.watch([], resume_after: token)
enum = stream.to_enum
while doc = enum.next
process(doc)
token = stream.resume_token
end
rescue Mongo::Error
sleep 1
end
end

上記の反復はenum.next呼び出しでブロックされており、このコードを実行している Ruby プロセスが終了した場合には処理の再開は許可されません。 このドライバーは、変更ストリームに変更がない場合に無期限にブロックする代わりにnil (待機期間後に)を返すtry_nextメソッドも提供します。 try_nextメソッドを使用すると、特定のリクエストが変更を返さない場合でも、 getMoreリクエストごとに再開トークンが保持されます。これにより、再開トークンは oplog の先頭に残り、アプリケーションは次の機会を利用できます:変更を処理するプロセスが終了した場合に永続化されます。

token = nil
loop do
begin
stream = collection.watch([], resume_after: token)
enum = stream.to_enum
doc = enum.try_next
if doc
process(doc)
end
token = stream.resume_token
# Persist +token+ to support resuming processing upon process restart
rescue Mongo::Error
sleep 1
end
end

再開トークンは、呼び出しでドキュメントが返されない場合でも、 try_nextを呼び出すごとに変更ストリームから取得される必要があることに注意してください。

再開トークンは、各変更ストリーム ドキュメントの_idフィールドにも提供されます。 _idフィールドの読み取りは推奨されません。このフィールドはアプリケーションによってプロジェクションされる可能性があり、 _idフィールドのみを使用すると、 getMoreがドキュメントを返さない場合に再開トークンが進みません。

戻る

GridFS