Change Streams
As of version 3.6 of the MongoDB server, a new $changeStream pipeline stage is supported in the aggregation framework. Specifying this stage first in an aggregation pipeline allows users to request that notifications be sent for all changes to a particular collection. As of MongoDB 4.0, change streams are supported on databases and clusters in addition to collections.

The Ruby driver provides an API for receiving notifications of changes to a particular collection, database, or cluster using this pipeline stage. Although you can create a change stream using the pipeline operator and the aggregation framework directly, it is recommended to use the driver API described below: the driver resumes the change stream once when it encounters a timeout, a network error, a server error indicating that a failover is taking place, or another type of resumable error.
Change streams on the server require a "majority" read concern or no read concern.
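For example, a client options hash with an explicit "majority" read concern is compatible with this requirement. The snippet below is a sketch; the address and database name are placeholders, and the client creation is shown commented out so the fragment stands alone:

```ruby
# Client options (a sketch): an explicit "majority" read concern
# satisfies the change stream requirement on the server.
client_options = {
  database: 'test',
  read_concern: { level: :majority }
}
# client = Mongo::Client.new([ '127.0.0.1:27017' ], client_options)
# stream = client[:test].watch
```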
Change streams do not work properly with JRuby because of the issue documented here. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread; calling #next on the change stream therefore causes getMores to be issued in a loop in the background.
Watching for Changes on a Collection
A collection change stream is created by calling the #watch method on a collection:
```ruby
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
collection = client[:test]
stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)
```
You can also receive the notifications as they become available:
```ruby
stream = collection.watch
enum = stream.to_enum
while doc = enum.next
  process(doc)
end
```
The next method blocks and polls the cluster until a change is available. Use the try_next method to iterate a change stream without blocking: this method waits up to max_await_time_ms milliseconds for changes from the server and returns nil if no changes are received. If there is a non-resumable error, both next and try_next raise an exception.

See the Resuming a Change Stream section below for an example that reads changes from a collection indefinitely.
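The non-blocking shape of a try_next loop can be sketched with a stand-in object. StubStream below is hypothetical and exists only so the example runs without a server; a real change stream's try_next behaves analogously, returning nil when no change arrives within the wait window:

```ruby
# Stand-in for a change stream; not part of the driver.
class StubStream
  def initialize(changes)
    @changes = changes
  end

  # Mimics the change stream's try_next: returns the next change,
  # or nil when no change is currently available.
  def try_next
    @changes.shift
  end
end

stream = StubStream.new([{ 'operationType' => 'insert' }])
processed = []
3.times do
  doc = stream.try_next
  processed << doc if doc # nil simply means "no change yet"; keep polling
end
# processed now holds the single insert notification
```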
The change stream can take filters in the aggregation framework pipeline operator format:
```ruby
stream = collection.watch([
  { '$match' => { 'operationType' => { '$in' => ['insert', 'replace'] } } },
  { '$match' => { 'fullDocument.n' => { '$gte' => 1 } } },
])
enum = stream.to_enum
while doc = enum.next
  process(doc)
end
```
Watching for Changes on a Database
A database change stream notifies of changes to any collection within the database, as well as of database-wide events such as the database being dropped.

A database change stream is created by calling the #watch method on a database object:
```ruby
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
database = client.database
stream = database.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)
```
Watching for Changes on a Cluster
A cluster change stream notifies of changes to any collection and any database within the cluster, as well as of cluster-wide events.

A cluster change stream is created by calling the #watch method on a client object (not the cluster object):
```ruby
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
stream = client.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)
```
Closing a Change Stream
You can close a change stream by calling its #close method:

```ruby
stream.close
```
Resuming a Change Stream
A change stream consists of two types of operations: the initial aggregation and getMore requests to receive the next batch of changes.

The driver automatically retries each getMore operation once on network errors and when the server returns an error indicating that it changed state (for example, it is no longer the primary). The driver does not retry the initial aggregation.
In practical terms this means that, for example:

- Calling collection.watch will fail if the cluster does not have enough available nodes to satisfy the "majority" read concern.
- Once collection.watch successfully returns, if the cluster subsequently experiences an election or loses a node but heals quickly enough, change stream reads via the next or each methods will continue transparently to the application.
To indefinitely and reliably watch for changes without losing any changes or processing a change more than once, the application must track the resume token for the change stream and restart the change stream when it experiences extended error conditions that cause the driver's automatic resume to fail as well. The following code snippet shows an example of iterating a change stream indefinitely, retrieving the resume token via the resume_token change stream method and restarting the change stream with the :resume_after option on all MongoDB or network errors:
```ruby
token = nil
loop do
  begin
    stream = collection.watch([], resume_after: token)
    enum = stream.to_enum
    while doc = enum.next
      process(doc)
      token = stream.resume_token
    end
  rescue Mongo::Error
    sleep 1
  end
end
```
The above iteration blocks at the enum.next call and does not permit resuming processing if the Ruby process running this code is terminated. The driver also provides the try_next method, which returns nil (after a small waiting period) instead of blocking indefinitely when there are no changes in the change stream. Using the try_next method, the resume token may be persisted after each getMore request, even when a particular request returns no changes, so that the resume token remains at the top of the oplog and the application has an opportunity to persist it should the process handling changes terminate:
```ruby
token = nil
loop do
  begin
    stream = collection.watch([], resume_after: token)
    enum = stream.to_enum
    doc = enum.try_next
    if doc
      process(doc)
    end
    token = stream.resume_token
    # Persist +token+ to support resuming processing upon process restart
  rescue Mongo::Error
    sleep 1
  end
end
```
Note that the resume token should be retrieved from the change stream after every try_next call, even if the call returned no document.

The resume token is also provided in the _id field of each change stream document. Reading the _id field is not recommended, because it may be projected out by the application and because using only the _id field would not advance the resume token when a getMore returns no documents.