Change Streams
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
Starting in MongoDB 5.1, change streams are optimized, providing more efficient resource utilization and faster execution of some aggregation pipeline stages.
Availability
Change streams are available for replica sets and sharded clusters:
Storage Engine.
The replica sets and sharded clusters must use the WiredTiger storage engine. Change streams can also be used on deployments that employ MongoDB's encryption-at-rest feature.
Replica Set Protocol Version.
The replica sets and sharded clusters must use replica set protocol version 1 (pv1).
Read Concern "majority" Enablement.
Starting in MongoDB 4.2, change streams are available regardless of "majority" read concern support; that is, read concern "majority" support can be either enabled (default) or disabled to use change streams.
In MongoDB 4.0 and earlier, change streams are available only if "majority" read concern support is enabled (default).
Connect
Connections for a change stream can either use DNS seed lists with the +srv connection option or list the servers individually in the connection string.
If the driver loses the connection to a change stream or the connection goes down, it attempts to reestablish a connection to the change stream through another node in the cluster that has a matching read preference. If the driver cannot find a node with the correct read preference, it throws an exception.
For more information, see Connection String URI Format.
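As a rough illustration of the two connection styles described above, the following Python sketch builds both forms of connection string. The host names, the replica set name rs0, and the helper function names are placeholders invented for this example; they do not come from this page.

```python
from urllib.parse import urlencode


def srv_uri(cluster_host: str, **options: str) -> str:
    """Build a DNS seed list (+srv) connection string."""
    query = f"?{urlencode(options)}" if options else ""
    return f"mongodb+srv://{cluster_host}/{query}"


def seed_list_uri(hosts: list, **options: str) -> str:
    """Build a standard connection string that lists each server."""
    query = f"?{urlencode(options)}" if options else ""
    return f"mongodb://{','.join(hosts)}/{query}"


# Hypothetical hosts; replace them with your own deployment.
print(srv_uri("cluster0.example.net"))
print(seed_list_uri(["db0.example.net:27017", "db1.example.net:27017"],
                    replicaSet="rs0"))
```

Either string can then be handed to a driver's client constructor; which form is preferable depends on how the deployment publishes its hosts.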
Watch a Collection, Database, or Deployment
You can open change streams against:
Target | Description |
---|---|
A collection | You can open a change stream cursor for a single collection (except system collections, or any collections in the admin, local, and config databases). The examples on this page use the MongoDB drivers to open and work with a change stream cursor for a single collection. See also the mongosh method db.collection.watch(). |
A database | Starting in MongoDB 4.0, you can open a change stream cursor for a single database (excluding the admin, local, and config databases) to watch for changes to all its non-system collections. For the MongoDB driver method, refer to your driver documentation. See also the mongosh method db.watch(). |
A deployment | Starting in MongoDB 4.0, you can open a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases except for admin, local, and config. For the MongoDB driver method, refer to your driver documentation. See also the mongosh method Mongo.watch(). |
Note
Change Stream Examples
The examples on this page use the MongoDB drivers to illustrate how to open a change stream cursor for a collection and work with the change stream cursor.
Change Stream Performance Considerations
If the number of active change streams opened against a database exceeds the connection pool size, you may experience notification latency. Each change stream uses a connection and a getMore operation on the change stream for the period of time that it waits for the next event. To avoid latency issues, ensure that the pool size is greater than the number of opened change streams. For details, see the maxPoolSize setting.
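The sizing rule above can be sketched as a small Python helper. The function name and the allowance of ten connections for ordinary reads and writes are assumptions made for illustration, not recommended values.

```python
def min_pool_size(open_change_streams: int, other_concurrent_ops: int = 10) -> int:
    """Return a maxPoolSize that exceeds the number of open change streams.

    other_concurrent_ops is an assumed allowance for the application's
    regular queries and writes; it must be at least 1 so that the result
    is strictly greater than the number of change streams.
    """
    if open_change_streams < 0 or other_concurrent_ops < 1:
        raise ValueError("counts must be non-negative, with at least one spare connection")
    return open_change_streams + other_concurrent_ops


pool_size = min_pool_size(open_change_streams=150)
print(pool_size)
```

With PyMongo, the resulting value could be passed as the maxPoolSize keyword argument of MongoClient, or set as the maxPoolSize connection string option.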
Open A Change Stream
To open a change stream:
For a replica set, you can issue the open change stream operation from any of the data-bearing members.
For a sharded cluster, you must issue the open change stream operation from the mongos.
The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. [1]
The C examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;

collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);
The C# examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();
The Go examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)

ok := cs.Next(ctx)
next := cs.Current
The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();
The Motor examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
cursor = db.inventory.watch()
document = await cursor.next()
The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
The following example uses a stream to process the change events.
const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
  // process next document
});
Alternatively, you can use an iterator to process the change events:
const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();
The PHP examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
$changeStream = $db->inventory->watch();
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$secondChange = $changeStream->current();
The Python examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
cursor = db.inventory.watch()
next(cursor)
The Ruby examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
cursor = inventory.watch.to_enum
next_change = cursor.next
The Swift (Async) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
let inventory = db.collection("inventory")

// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
    cursor.next()
}

// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
    cursor.forEach { event in
        // process event
        print(event)
    }
}
The Swift (Sync) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()
To retrieve the data change event from the cursor, iterate the change stream cursor. For information on the change stream event, see Change Events.
While the connection to the MongoDB deployment remains open, the cursor remains open until one of the following occurs:
- The cursor is explicitly closed.
- An invalidate event occurs.
- If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
Note
The lifecycle of an unclosed cursor is language-dependent.
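Because the cursor lifecycle depends on the language, closing the cursor explicitly is usually the safer choice. The Python sketch below assumes a PyMongo-style collection whose watch() method returns a change stream that works both as a context manager and as an iterator; the consume function and its limit parameter are hypothetical names used only for illustration.

```python
from typing import Any, Callable


def consume(collection: Any, handle: Callable[[dict], None], limit: int) -> int:
    """Process up to `limit` change events, then close the cursor.

    `collection` is expected to behave like a PyMongo Collection: watch()
    returns an object usable in a `with` block and in a `for` loop.
    """
    if limit <= 0:
        return 0
    seen = 0
    # Leaving the `with` block closes the change stream cursor, even if
    # `handle` raises an exception.
    with collection.watch() as stream:
        for change in stream:  # blocks until the next event arrives
            handle(change)
            seen += 1
            if seen >= limit:
                break
    return seen
```

A call such as consume(db.inventory, print, 10) would print the next ten events and then release the cursor and its connection.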
[1] Starting in MongoDB 4.0, you can specify a startAtOperationTime to open the cursor at a particular point in time. If the specified starting point is in the past, it must be in the time range of the oplog.
Modify Change Stream Output
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
- $addFields
- $match
- $project
- $replaceRoot
- $replaceWith (Available starting in MongoDB 4.2)
- $redact
- $set (Available starting in MongoDB 4.2)
- $unset (Available starting in MongoDB 4.2)
C example:
pipeline = BCON_NEW ("pipeline",
                     "[",
                     "{",
                     "$match",
                     "{",
                     "fullDocument.username",
                     BCON_UTF8 ("alice"),
                     "}",
                     "}",
                     "{",
                     "$addFields",
                     "{",
                     "newField",
                     BCON_UTF8 ("this is an added field!"),
                     "}",
                     "}",
                     "]");

stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);
C# example:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(change =>
        change.FullDocument["username"] == "alice" ||
        change.OperationType == ChangeStreamOperationType.Delete)
    .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
        "{ $addFields : { newField : 'this is an added field!' } }");

var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
    while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
    var next = cursor.Current.First();
}
Go example:
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
	bson.A{
		bson.D{{"fullDocument.username", "alice"}},
		bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)

ok := cs.Next(ctx)
next := cs.Current
Java example:
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");

// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");

// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
    Document.parse("{'fullDocument.username': 'alice'}"),
    Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch(pipeline).iterator();
The pipeline list includes a single $match stage that matches operations where the username is alice, or operations where the operationType is delete.
Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.
Motor example:
pipeline = [
    {"$match": {"fullDocument.username": "alice"}},
    {"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()
Node.js example:
The following example uses a stream to process the change events.
const pipeline = [
  { $match: { 'fullDocument.username': 'alice' } },
  { $addFields: { newField: 'this is an added field!' } }
];

const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
  // process next document
});
Alternatively, you can use an iterator to process the change events:
const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();
PHP example:
$pipeline = [
    ['$match' => ['fullDocument.username' => 'alice']],
    ['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$secondChange = $changeStream->current();
Python example:
pipeline = [
    {"$match": {"fullDocument.username": "alice"}},
    {"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)
Swift (Async) example:
let pipeline: [BSONDocument] = [
    ["$match": ["fullDocument.username": "alice"]],
    ["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")

// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
    changeStream.next()
}

// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
    changeStream.forEach { event in
        // process event
        print(event)
    }
}
Swift (Sync) example:
let pipeline: [BSONDocument] = [
    ["$match": ["fullDocument.username": "alice"]],
    ["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()
Tip
The _id field of the change stream event document acts as the resume token. Do not use the pipeline to modify or remove the change stream event's _id field.
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.
See Change Events for more information on the change stream response document format.
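One way to guard against accidentally touching the resume token is to inspect a pipeline before passing it to watch(). The Python sketch below is a heuristic written for this page, not a driver feature: the function names are hypothetical, and it only inspects $addFields, $set, $project, and $unset, so stages such as $replaceRoot or $replaceWith still need a manual review.

```python
def _is_id_path(field: str) -> bool:
    """True for `_id` itself and for any sub-field such as `_id._data`."""
    return field == "_id" or field.startswith("_id.")


def touches_event_id(pipeline: list) -> bool:
    """Heuristically report whether a pipeline modifies or removes `_id`."""
    for stage in pipeline:
        for name, spec in stage.items():
            if name == "$unset":
                # $unset accepts a single field name or a list of names.
                fields = [spec] if isinstance(spec, str) else list(spec)
                if any(_is_id_path(f) for f in fields):
                    return True
            elif name in ("$addFields", "$set"):
                if any(_is_id_path(f) for f in spec):
                    return True
            elif name == "$project":
                # {"_id": 1} keeps the token; any other value changes or drops it.
                for field, value in spec.items():
                    if _is_id_path(field) and value not in (1, True):
                        return True
    return False


pipeline = [
    {"$match": {"fullDocument.username": "alice"}},
    {"$addFields": {"newField": "this is an added field!"}},
]
print(touches_event_id(pipeline))
```

Running the check in tests or at start-up surfaces the problem earlier than the server-side exception described above.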
Lookup Full Document for Update Operations
By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.
To return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to the mongoc_collection_watch method.
In the example below, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);
To return the most current majority-committed version of the updated document, pass "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" to the db.collection.watch() method.
In the example below, all update operation notifications include a FullDocument field that represents the current version of the document affected by the update operation.
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();
To return the most current majority-committed version of the updated document, use the SetFullDocument(options.UpdateLookup) change stream option.
cs, err := coll.Watch(
	ctx,
	mongo.Pipeline{},
	options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)

ok := cs.Next(ctx)
next := cs.Current
To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the db.collection.watch.fullDocument() method.
In the example below, all update operation notifications include a FullDocument field that represents the current version of the document affected by the update operation.
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();
To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.
In the example below, all update operation notifications include a full_document field that represents the current version of the document affected by the update operation.
cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()
To return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' } to the db.collection.watch() method.
In the example below, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
The following example uses a stream to process the change events.
const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
  // process next document
});
Alternatively, you can use an iterator to process the change events:
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();
To return the most current majority-committed version of the updated document, pass ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP] to the watch() method.
In the example below, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$secondChange = $changeStream->current();
To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.
In the example below, all update operation notifications include a full_document field that represents the current version of the document affected by the update operation.
cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)
To return the most current majority-committed version of the updated document, pass full_document: 'updateLookup' to the db.watch() method.
In the example below, all update operation notifications include a full_document field that represents the current version of the document affected by the update operation.
cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next
To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.
let inventory = db.collection("inventory")

// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
    .flatMap { changeStream in
        changeStream.next()
    }

// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
    .flatMap { changeStream in
        changeStream.forEach { event in
            // process event
            print(event)
        }
    }
To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
Note
If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.
However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.
See Change Events for more information on the change stream response document format.
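To make the distinction between the delta and the looked-up document concrete, the following Python sketch separates the two parts of an update event. The sample event is hand-written for illustration (it is not captured server output), and summarize_update is a hypothetical helper name; the updateDescription, updatedFields, removedFields, and fullDocument field names follow the change event format.

```python
def summarize_update(event: dict) -> dict:
    """Split an update event into its delta and the looked-up document."""
    if event.get("operationType") != "update":
        raise ValueError("expected an update event")
    delta = event.get("updateDescription", {})
    return {
        # What this specific update changed.
        "changed": delta.get("updatedFields", {}),
        "removed": delta.get("removedFields", []),
        # Present only when the stream was opened with updateLookup; it may
        # already include later majority-committed writes.
        "current": event.get("fullDocument"),
    }


# Illustrative event: the update set qty to 5, but a later write raised it
# to 7 before the lookup ran.
event = {
    "_id": {"_data": "<resume token>"},
    "operationType": "update",
    "updateDescription": {"updatedFields": {"qty": 5}, "removedFields": ["note"]},
    "fullDocument": {"_id": 1, "item": "pen", "qty": 7},
}
summary = summarize_update(event)
print(summary["changed"], summary["current"])
```

Code that needs the exact effect of an update should read the delta, while code that only needs the latest state can rely on the looked-up document.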
Resume a Change Stream
Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.
resumeAfter for Change Streams
You can resume a change stream after a specific event by passing a resume token to resumeAfter when opening the cursor.
See Resume Tokens for more information on the resume token.
Important
The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.
You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.
In the example below, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting after the operation specified.
stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
   resume_token = mongoc_change_stream_get_resume_token (stream);
   BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);

   mongoc_change_stream_destroy (stream);
   stream = mongoc_collection_watch (collection, pipeline, &opts);
   mongoc_change_stream_next (stream, &change);
   mongoc_change_stream_destroy (stream);
} else {
   if (mongoc_change_stream_error_document (stream, &error, NULL)) {
      MONGOC_ERROR ("%s\n", error.message);
   }

   mongoc_change_stream_destroy (stream);
}
In the example below, the resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. Passing the resumeToken to the Watch() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();
You can use ChangeStreamOptions.SetResumeAfter to specify the resume token for the change stream. If the resumeAfter option is set, the change stream resumes notifications after the operation specified in the resume token. SetResumeAfter takes a value that must resolve to a resume token, e.g. resumeToken in the example below.
resumeToken := original.ResumeToken()

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)

ok = cs.Next(ctx)
result := cs.Current
You can use the resumeAfter() method to resume notifications after the operation specified in the resume token. The resumeAfter() method takes a value that must resolve to a resume token, e.g. resumeToken in the example below.
BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();
You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.
resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()
You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.
const collection = db.collection('inventory');
const changeStream = collection.watch();

let newChangeStream;
changeStream.once('change', next => {
  const resumeToken = changeStream.resumeToken;
  changeStream.close();

  newChangeStream = collection.watch([], { resumeAfter: resumeToken });
  newChangeStream.on('change', next => {
    processChange(next);
  });
});
You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. $resumeToken in the example below.
$resumeToken = $changeStream->getResumeToken();

if ($resumeToken === null) {
    throw new \Exception('Resume token was not found');
}

$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();

$firstChange = $changeStream->current();
You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.
resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)
You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.
change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token

new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next
You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.
let inventory = db.collection("inventory")

inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
    .flatMap { changeStream in
        changeStream.next().map { _ in
            changeStream.resumeToken
        }.always { _ in
            _ = changeStream.kill()
        }
    }.flatMap { resumeToken in
        inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
            newStream.forEach { event in
                // process event
                print(event)
            }
        }
    }
You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()
startAfter for Change Streams
New in version 4.2.
You can start a new change stream after a specific event by passing a resume token to startAfter when opening the cursor. Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.
See Resume Tokens for more information on the resume token.
Important
The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.
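The choice between the two options can be sketched in Python as follows. This is a minimal illustration assuming PyMongo's resume_after and start_after keyword arguments to watch(); the resume_options function is a hypothetical helper, and it takes the resume token from the _id field of the last event the application processed.

```python
from typing import Optional


def resume_options(last_event: Optional[dict]) -> dict:
    """Choose keyword arguments for reopening a change stream.

    Returns an empty dict when there is no previous event, which opens a
    fresh stream that starts from the current point in time.
    """
    if last_event is None:
        return {}
    token = last_event["_id"]  # the event's _id acts as the resume token
    if last_event.get("operationType") == "invalidate":
        # resumeAfter cannot be used after an invalidate event.
        return {"start_after": token}
    return {"resume_after": token}


# Illustrative events with placeholder tokens.
print(resume_options({"_id": {"_data": "AA"}, "operationType": "insert"}))
print(resume_options({"_id": {"_data": "BB"}, "operationType": "invalidate"}))
```

An application would then reopen the stream with something like collection.watch(**resume_options(last_event)), provided the oplog still contains the referenced operation.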
Resume Tokens
The resume token is available from multiple sources:
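Source | Description |
---|---|
Change events | Each change event notification includes a resume token on the _id field. |
aggregate command | The $changeStream aggregation stage includes a resume token on the cursor.postBatchResumeToken field. This field only appears when using the aggregate command. |
getMore command | The getMore command includes a resume token on the cursor.postBatchResumeToken field. |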
Changed in version 4.2: Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.
Tip
MongoDB provides a "snippet", an extension to mongosh, that decodes hex-encoded resume tokens.
You can install and run the resumetoken snippet from mongosh:
snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')
You can also run resumetoken from the command line (without using mongosh) if npm is installed on your system:
npx mongodb-resumetoken-decoder <RESUME TOKEN>
See the following for more details on:
- Resume Tokens from Change Events
- Resume Tokens from aggregate
- Resume Tokens from getMore
Resume Tokens from Change Events
Change event notifications include a resume token on the _id field:
{
   "_id": {
      "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
   },
   "operationType": "insert",
   "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
   "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
   "wallTime": ISODate("2022-10-19T15:37:04.604Z"),
   "fullDocument": {
      "_id": ObjectId("635019a078be67426d7cf4d2"),
      "name": "Giovanni Verga"
   },
   "ns": {
      "db": "test",
      "coll": "names"
   },
   "documentKey": {
      "_id": ObjectId("635019a078be67426d7cf4d2")
   }
}
Resume Tokens from aggregate
When using the aggregate command, the $changeStream aggregation stage includes a resume token on the cursor.postBatchResumeToken field:
{
   "cursor": {
      "firstBatch": [],
      "postBatchResumeToken": {
         "_data": "8263515EAC000000022B0429296E1404"
      },
      "id": Long("4309380460777152828"),
      "ns": "test.names"
   },
   "ok": 1,
   "$clusterTime": {
      "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
      "signature": {
         "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
         "keyId": Long("0")
      }
   },
   "operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}
Resume Tokens from getMore
The getMore command also includes a resume token on the cursor.postBatchResumeToken field:
{
  "cursor": {
    "nextBatch": [],
    "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" },
    "id": Long("7049907285270685005"),
    "ns": "test.names"
  },
  "ok": 1,
  "$clusterTime": {
    "clusterTime": Timestamp({ "t": 1666275705, "i": 1 }),
    "signature": {
      "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
      "keyId": Long("0")
    }
  },
  "operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}
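The two reply shapes above can be handled by one small routine. The following is a minimal sketch, assuming an application that reads raw aggregate and getMore replies and wants to know which token to persist after processing part or all of a batch. The function name latestResumeToken and the processedCount parameter are hypothetical; drivers normally do this bookkeeping for you.

```javascript
// Hypothetical helper: decide which resume token to persist after handling
// `processedCount` events from an aggregate or getMore reply.
function latestResumeToken(reply, processedCount) {
  const cursor = reply.cursor;
  // aggregate replies carry firstBatch, getMore replies carry nextBatch.
  const batch = cursor.firstBatch ?? cursor.nextBatch ?? [];
  if (processedCount >= batch.length) {
    // Whole batch consumed (or batch empty): use the batch-level token.
    return cursor.postBatchResumeToken ?? null;
  }
  if (processedCount > 0) {
    // Stopped part-way: use the _id of the last event actually processed.
    return batch[processedCount - 1]._id;
  }
  // Nothing processed yet: the caller keeps its previous token.
  return null;
}

// Reply trimmed to the fields used here, taken from the getMore example.
const reply = {
  cursor: {
    nextBatch: [],
    postBatchResumeToken: { _data: "8263515979000000022B0429296E1404" }
  }
};
const token = latestResumeToken(reply, 0);
```

Using the batch-level token for an empty batch lets the application advance its saved position even when no events matched.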
Use Cases
Change streams can benefit architectures with reliant business systems, informing downstream systems once data changes are durable. For example, change streams can save time for developers when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.
Access Control
For deployments enforcing Authentication and authorization:
To open a change stream against a specific collection, applications must have privileges that grant the changeStream and find actions on the corresponding collection.
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
To open a change stream on a single database, applications must have privileges that grant the changeStream and find actions on all non-system collections in the database.
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
To open a change stream on an entire deployment, applications must have privileges that grant the changeStream and find actions on all non-system collections for all databases in the deployment.
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
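The three privilege documents differ only in which resource fields are left empty, so they can be generated from one function. This is a sketch under assumptions: the helper changeStreamPrivilege and the role name namesWatcher are hypothetical, and the createRole call in the trailing comment is one possible way to use the result in mongosh.

```javascript
// Hypothetical helper: build the privilege document for a change stream scope.
// No arguments -> entire deployment; dbName only -> one database;
// dbName and collName -> one collection.
function changeStreamPrivilege(dbName = "", collName = "") {
  if (collName !== "" && dbName === "") {
    throw new Error("a collection scope also needs a database name");
  }
  return {
    resource: { db: dbName, collection: collName },
    actions: ["find", "changeStream"]
  };
}

const privileges = [changeStreamPrivilege("test", "names")];
// In mongosh, a custom role could then be created with, for example:
// db.getSiblingDB("admin").createRole({ role: "namesWatcher", privileges, roles: [] })
```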
Event Notification
Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.
For example, consider a 3-member replica set with a change stream cursor opened against the primary. If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.
If an operation is associated with a transaction, the change event document includes the txnNumber and the lsid.
Collation
Starting in MongoDB 4.2, change streams use simple binary comparisons unless an explicit collation is provided. In earlier versions, change streams opened on a single collection (db.collection.watch()) would inherit that collection's default collation.
Change Streams and Orphan Documents
Starting in MongoDB 5.3, during range migration, change stream events are not generated for updates to orphaned documents.
Change Streams with Document Pre- and Post-Images
Starting in MongoDB 6.0, you can use change stream events to output the version of a document before and after changes (the document pre- and post-images):
The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.
The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.
Enable changeStreamPreAndPostImages for a collection using db.createCollection(), create, or collMod.
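As an illustration, the sketch below builds the collMod command document that turns pre- and post-images on or off for an existing collection. The helper name preAndPostImagesCommand and the collection name names are assumptions made for the example; the comments show how the result might be used in mongosh.

```javascript
// Hypothetical helper: build a collMod command that enables or disables
// change stream pre- and post-images for one collection.
function preAndPostImagesCommand(collName, enabled) {
  return {
    collMod: collName,
    changeStreamPreAndPostImages: { enabled: enabled }
  };
}

const enableCmd = preAndPostImagesCommand("names", true);
// In mongosh: db.runCommand(enableCmd)
// For a new collection, the same option can be passed at creation time:
// db.createCollection("names", { changeStreamPreAndPostImages: { enabled: true } })
```

Calling the helper with false produces the command for turning the feature off again.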
Pre- and post-images are not available for a change stream event if the images were:
Not enabled on the collection at the time of a document update or delete operation.
Removed after the pre- and post-image retention time set in expireAfterSeconds.
The following example sets expireAfterSeconds to 100 seconds:
use admin
db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } )
The following example returns the current changeStreamOptions settings, including expireAfterSeconds:
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
Setting expireAfterSeconds to off uses the default retention policy: pre- and post-images are retained until the corresponding change stream events are removed from the oplog.
If a change stream event is removed from the oplog, then the corresponding pre- and post-images are also deleted regardless of the expireAfterSeconds pre- and post-image retention time.
Additional considerations:
Enabling pre- and post-images consumes storage space and adds processing time. Only enable pre- and post-images if you need them.
Limit the change stream event size to less than 16 megabytes. To limit the event size, you can:
Limit the document size to 8 megabytes. You can request pre- and post-images simultaneously in the change stream output if other change stream event fields like updateDescription are not large.
Request only post-images in the change stream output for documents up to 16 megabytes if other change stream event fields like updateDescription are not large.
Request only pre-images in the change stream output for documents up to 16 megabytes if:
document updates affect only a small fraction of the document structure or content, and
do not cause a replace change event. A replace event always includes the post-image.
To request a pre-image, you set fullDocumentBeforeChange to required or whenAvailable in db.collection.watch(). To request a post-image, you set fullDocument using the same method.
Pre-images are written to the config.system.preimages collection.
The config.system.preimages collection may become large. To limit the collection size, you can set the expireAfterSeconds time for the pre-images as shown earlier.
Pre-images are removed asynchronously by a background process.
Important
Backward-Incompatible Feature
Starting in MongoDB 6.0, if you are using document pre- and post-images for change streams, you must disable changeStreamPreAndPostImages for each collection using the collMod command before you can downgrade to an earlier MongoDB version.
Tip
See also:
For change stream events and output, see Change Events.
To watch a collection for changes, see db.collection.watch().
For complete examples with the change stream output, see Change Streams with Document Pre- and Post-Images.