MongoDB\Collection::watch()
New in version 1.3.
Definition
MongoDB\Collection::watch()
Executes a change stream operation on the collection. The change stream can be watched for collection-level changes.
function watch( array $pipeline = [], array $options = [] ): MongoDB\ChangeStream
Parameters
$pipeline : array|object
The pipeline of stages to append to an initial $changeStream stage.
$options : array
An array specifying the desired options.
Options are listed below by name, type, and description.
batchSize
integer
Specifies the batch size for the cursor, which will apply to both the initial aggregate command and any subsequent getMore commands. This determines the maximum number of change events to return in each response from the server.
Irrespective of the batchSize option, the initial aggregate command response for a change stream generally does not include any documents unless another option is used to configure its starting point (e.g. startAfter).
codec
MongoDB\Codec\DocumentCodec
The codec to use for encoding or decoding documents. This option is mutually exclusive with the typeMap option.
Defaults to the collection's codec. Inheritance for a default codec option takes precedence over that of the typeMap option.
New in version 1.17.
collation
array|object
Collation allows users to specify language-specific rules for string comparison, such as rules for lettercase and accent marks. When specifying collation, the locale field is mandatory; all other collation fields are optional. For descriptions of the fields, see Collation Document.
Starting in MongoDB 4.2, defaults to simple binary comparison if omitted. In earlier versions, change streams opened on a single collection would inherit the collection's default collation.
comment
mixed
Enables users to specify an arbitrary comment to help trace the operation through the database profiler, currentOp output, and logs.
The comment can be any valid BSON type since MongoDB 4.4. Earlier server versions only support string values.
New in version 1.13.
fullDocument
string
Determines how the fullDocument response field will be populated for update operations.
By default, change streams only return the delta of fields (via an updateDescription field) for update operations and fullDocument is omitted. Insert and replace operations always include the fullDocument field. Delete operations omit the field as the document no longer exists.
Specify "updateLookup" to return the current majority-committed version of the updated document.
MongoDB 6.0+ allows returning the post-image of the modified document if the collection has changeStreamPreAndPostImages enabled. Specify "whenAvailable" to return the post-image if available or a null value if not. Specify "required" to return the post-image if available or raise an error if not.
The following values are supported:
MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP
MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE
MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED
This is an option of the $changeStream pipeline stage.
fullDocumentBeforeChange
string
Determines how the fullDocumentBeforeChange response field will be populated. By default, the field is omitted.
MongoDB 6.0+ allows returning the pre-image of the modified document if the collection has changeStreamPreAndPostImages enabled. Specify "whenAvailable" to return the pre-image if available or a null value if not. Specify "required" to return the pre-image if available or raise an error if not.
The following values are supported:
MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE
MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED
This is an option of the $changeStream pipeline stage.
New in version 1.13.
maxAwaitTimeMS
integer
Positive integer denoting the time limit in milliseconds for the server to block a getMore operation if no data is available.
readConcern
MongoDB\Driver\ReadConcern
Read concern to use for the operation. Defaults to the collection's read concern.
readPreference
MongoDB\Driver\ReadPreference
Read preference to use for the operation. Defaults to the collection's read preference.
This is used for both the initial change stream aggregation and for server selection during an automatic resume.
resumeAfter
array|object
Specifies the logical starting point for the new change stream. The _id field in documents returned by the change stream may be used here.
Using this option in conjunction with startAfter and/or startAtOperationTime will result in a server error. The options are mutually exclusive.
This is an option of the $changeStream pipeline stage.
session
MongoDB\Driver\Session
Client session to associate with the operation.
showExpandedEvents
boolean
If true, instructs the server to include additional DDL events in the change stream. The additional events that may be included are:
createIndexes
dropIndexes
modify
create
shardCollection
reshardCollection (server 6.1+)
refineCollectionShardKey (server 6.1+)
This is not supported for server versions prior to 6.0 and will result in an exception at execution time if used.
This is an option of the $changeStream pipeline stage.
New in version 1.13.
startAfter
array|object
Specifies the logical starting point for the new change stream. The _id field in documents returned by the change stream may be used here. Unlike resumeAfter, this option can be used with a resume token from an "invalidate" event.
Using this option in conjunction with resumeAfter and/or startAtOperationTime will result in a server error. The options are mutually exclusive.
This is not supported for server versions prior to 4.2 and will result in an exception at execution time if used.
This is an option of the $changeStream pipeline stage.
New in version 1.5.
startAtOperationTime
MongoDB\BSON\TimestampInterface
If specified, the change stream will only provide changes that occurred at or after the specified timestamp. Command responses from a MongoDB 4.0+ server include an operationTime that can be used here. By default, the operationTime returned by the initial aggregate command will be used if available.
Using this option in conjunction with resumeAfter and/or startAfter will result in a server error. The options are mutually exclusive.
This is not supported for server versions prior to 4.0 and will result in an exception at execution time if used.
This is an option of the $changeStream pipeline stage.
typeMap
array
The type map to apply to cursors, which determines how BSON documents are converted to PHP values. Defaults to the collection's type map.
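The $pipeline and $options parameters described above can be combined in a single call. The following is a minimal sketch, not the library's own example: the helper names insertAndUpdatePipeline(), lookupOptions(), and watchInsertsAndUpdates() are hypothetical, and the 1000 ms default wait is an arbitrary illustrative value.

```php
<?php

// Hypothetical helper: a pipeline that keeps only insert and update events.
// These stages are appended after the initial $changeStream stage.
function insertAndUpdatePipeline(): array
{
    return [
        ['$match' => ['operationType' => ['$in' => ['insert', 'update']]]],
    ];
}

// Hypothetical helper: options that request the current majority-committed
// document for update events and cap how long each getMore may block.
function lookupOptions(int $maxAwaitTimeMS = 1000): array
{
    return [
        'fullDocument' => 'updateLookup',
        'maxAwaitTimeMS' => $maxAwaitTimeMS,
    ];
}

// Opens the change stream on an existing collection handle.
function watchInsertsAndUpdates(MongoDB\Collection $collection): MongoDB\ChangeStream
{
    return $collection->watch(insertAndUpdatePipeline(), lookupOptions());
}
```

Given a collection handle, watchInsertsAndUpdates($collection) returns a MongoDB\ChangeStream whose update events should carry a fullDocument field in addition to updateDescription.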
Return Values
A MongoDB\ChangeStream object, which allows for iteration of events in the change stream via the Iterator interface.
Errors/Exceptions
MongoDB\Exception\UnexpectedValueException if the command response from the server was malformed.
MongoDB\Exception\UnsupportedException if options are used and not supported by the selected server (e.g. collation, readConcern, writeConcern).
MongoDB\Exception\InvalidArgumentException for errors related to the parsing of parameters or options.
MongoDB\Driver\Exception\RuntimeException for other errors at the extension level (e.g. connection errors).
Examples
This example reports events while iterating a change stream.
    <?php

    $uri = 'mongodb://rs1.example.com,rs2.example.com/?replicaSet=myReplicaSet';

    $collection = (new MongoDB\Client($uri))->test->inventory;

    $changeStream = $collection->watch();

    for ($changeStream->rewind(); true; $changeStream->next()) {
        // No event is available yet; advance the stream and check again.
        if ( ! $changeStream->valid()) {
            continue;
        }

        $event = $changeStream->current();

        // Stop iterating once the stream has been invalidated.
        if ($event['operationType'] === 'invalidate') {
            break;
        }

        $ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']);
        $id = json_encode($event['documentKey']['_id']);

        switch ($event['operationType']) {
            case 'delete':
                printf("Deleted document in %s with _id: %s\n\n", $ns, $id);
                break;

            case 'insert':
                printf("Inserted new document in %s\n", $ns);
                echo json_encode($event['fullDocument']), "\n\n";
                break;

            case 'replace':
                printf("Replaced new document in %s with _id: %s\n", $ns, $id);
                echo json_encode($event['fullDocument']), "\n\n";
                break;

            case 'update':
                printf("Updated document in %s with _id: %s\n", $ns, $id);
                echo json_encode($event['updateDescription']), "\n\n";
                break;
        }
    }
Assuming that two documents were inserted and another was updated in the watched collection while the above script was iterating the change stream, the output would then resemble:
    Inserted new document in test.inventory
    {"_id":{"$oid":"5b329c4874083047cc05e60a"},"username":"bob"}

    Inserted new document in test.inventory
    {"_id":{"$oid":"5b329c4d74083047cc05e60b"},"name":"Widget","quantity":5}

    Updated document in test.inventory with _id: {"$oid":"5b329a4f74083047cc05e603"}
    {"updatedFields":{"username":"robert"},"removedFields":[]}
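A script that stops and restarts can continue from the last event it processed by saving that event's _id and passing it back through resumeAfter or startAfter. The sketch below assumes PHP 8.0+ (for union types); resumeOptions() and reopenStream() are hypothetical helper names, not part of the library.

```php
<?php

// Hypothetical helper: turns a saved resume token into watch() options.
// Set $afterInvalidate to true when the token came from an "invalidate"
// event, since only startAfter accepts such tokens. The two options are
// mutually exclusive, so exactly one of them is returned.
function resumeOptions(array|object|null $token, bool $afterInvalidate = false): array
{
    if ($token === null) {
        // No saved token: open the stream at the current point in time.
        return [];
    }

    $option = $afterInvalidate ? 'startAfter' : 'resumeAfter';

    return [$option => $token];
}

// Reopens a change stream on an existing collection handle.
function reopenStream(
    MongoDB\Collection $collection,
    array|object|null $token,
    bool $afterInvalidate = false
): MongoDB\ChangeStream {
    return $collection->watch([], resumeOptions($token, $afterInvalidate));
}
```

In the iteration loop above, the token would be captured with something like $token = $event['_id'] after each event is handled. How the token is persisted between runs is left to the application.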
See Also
Aggregation Pipeline documentation in the MongoDB Manual
Change Streams documentation in the MongoDB Manual
Change Events documentation in the MongoDB Manual