Change Streams
Overview
In this guide, you can learn about change streams and how they are used in a MongoDB Kafka source connector.
Change Streams
Change streams are a MongoDB feature that allows you to receive real-time updates on data changes. Change streams return change event documents.
A change event document contains idempotent instructions to describe a change that occurred in your MongoDB deployment and metadata related to that change. Change event documents are generated from data in the oplog.
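As a rough illustration, an insert change event might look like the Python dictionary below. The field names (_id, operationType, ns, documentKey, fullDocument) follow the change event description in the MongoDB manual. The values and the describe_event helper are invented for this sketch.

```python
# Hypothetical insert change event. All values are illustrative only.
sample_event = {
    "_id": {"_data": "EXAMPLE-RESUME-TOKEN"},  # opaque resume token
    "operationType": "insert",
    "ns": {"db": "inventory", "coll": "orders"},
    "documentKey": {"_id": 1},
    "fullDocument": {"_id": 1, "item": "notebook", "qty": 5},
}


def describe_event(event):
    """Return a one-line summary of a change event (hypothetical helper)."""
    ns = event["ns"]
    key = event["documentKey"]["_id"]
    return f'{event["operationType"]} on {ns["db"]}.{ns["coll"]} key={key}'


print(describe_event(sample_event))
```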
Important
Change streams only run on MongoDB replica sets and sharded clusters
A standalone MongoDB instance cannot produce a change stream.
To view a list of all configuration options for change streams, see the Change Stream Properties page.
To learn more about change streams, see the following resources:
Change Streams in the MongoDB manual
An Introduction to Change Streams blog post
To learn more about the oplog, see the MongoDB manual entry on the Replica Set Oplog.
Aggregation
Use an aggregation pipeline to configure your source connector's change stream. With a pipeline, you can perform tasks such as the following:
Filter change events by operation type
Project specific fields
Update the value of fields
Add fields
Trim the amount of data generated by the change stream
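The operations above could be expressed in a connector configuration roughly as follows. This is a minimal sketch that assumes the pipeline is passed as a JSON string through the connector's pipeline property. The connection URI, database, collection, and connector name are placeholders, and build_source_config is a hypothetical helper.

```python
import json

# Keep only insert events, then project two fields to trim the output.
# $project keeps _id (the resume token) by default.
pipeline = [
    {"$match": {"operationType": "insert"}},
    {"$project": {"operationType": 1, "fullDocument": 1}},
]


def build_source_config(uri, database, collection, pipeline):
    """Assemble source connector settings as a dict (hypothetical helper)."""
    return {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": uri,
        "database": database,
        "collection": collection,
        # The pipeline is serialized to a JSON array string.
        "pipeline": json.dumps(pipeline),
    }


# Placeholder values; replace them with your own deployment details.
config = build_source_config(
    "mongodb://localhost:27017/?replicaSet=rs0", "inventory", "orders", pipeline
)
print(json.dumps({"name": "mongo-source", "config": config}, indent=2))
```

The printed JSON has the general shape of a connector definition that you could adapt for the Kafka Connect REST API.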
To learn which aggregation operators you can use with a change stream, see the Modify Change Stream Output guide in the MongoDB manual.
To view examples that use an aggregation pipeline to modify a change stream, see the following pages:
Customize a Pipeline to Filter Change Events Usage Example
Copy Existing Data Usage Example
Change Event Structure
Find the complete structure of change event documents, including descriptions of all fields, in the MongoDB manual.
Note
The Full Document Option
If you want Kafka Connect to receive just the document created or modified from your change operation, use the publish.full.document.only=true option. For more information, see the Change Stream Properties page.
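The effect of this option can be pictured with the simplified model below. It is a sketch for intuition only, not the connector's implementation. The value_to_publish helper and the sample event are hypothetical.

```python
def value_to_publish(event, full_document_only):
    """Approximate which value reaches Kafka Connect (simplified model)."""
    if full_document_only:
        # Only the created or modified document is passed on.
        return event.get("fullDocument")
    # Otherwise the whole change event document is passed on.
    return event


# Hypothetical insert event with illustrative values.
event = {
    "_id": {"_data": "EXAMPLE-RESUME-TOKEN"},
    "operationType": "insert",
    "fullDocument": {"_id": 1, "item": "notebook"},
}

print(value_to_publish(event, full_document_only=True))
print(value_to_publish(event, full_document_only=False))
```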
Performance
The oplog is a special capped collection which cannot use indexes. For more information on this limitation, see Change Streams Production Recommendations.
If you want to improve change stream performance, use a faster disk for your MongoDB cluster and increase the size of your WiredTiger cache. To learn how to set your WiredTiger cache, see the guide on the WiredTiger Storage Engine.
Source Connectors
The source connector works by opening a single change stream with MongoDB and sending data from that change stream to Kafka Connect. Your source connector maintains its change stream for the duration of its runtime, and your connector closes its change stream when you stop it.
To view the available options to configure your source connector's change stream, see the Change Stream Properties page.
Resume Tokens
Your connector uses a resume token as its offset. An offset is a value your connector stores in an Apache Kafka topic to keep track of what source data it has processed. Your connector uses its offset value when it must recover from a restart or crash. A resume token is a piece of data that references the _id field of a change event document in your MongoDB oplog.
If your source connector does not have an offset, such as when you start the connector for the first time, your connector starts a new change stream. Once your connector receives its first change event document and publishes that document to Apache Kafka, your connector stores the resume token of that document as its offset.
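The start-up behavior described above can be sketched as follows. This is a simplified model under stated assumptions, not the connector's code. OffsetTracker and watch_kwargs are hypothetical names, and resume_after is borrowed from the keyword argument of PyMongo's Collection.watch method to show how a stored token would be used.

```python
class OffsetTracker:
    """Hold the most recently stored resume token (hypothetical model)."""

    def __init__(self):
        self.offset = None  # no offset before the first published event

    def record_published(self, event):
        # The _id of a change event document is its resume token.
        self.offset = event["_id"]


def watch_kwargs(stored_offset):
    """Choose change stream options based on the stored offset."""
    if stored_offset is None:
        return {}  # no offset: start a new change stream
    return {"resume_after": stored_offset}  # resume from the stored token


tracker = OffsetTracker()
print(watch_kwargs(tracker.offset))  # first start, no offset yet

# After the first event is published, its resume token becomes the offset.
tracker.record_published({"_id": {"_data": "TOKEN-1"}, "operationType": "insert"})
print(watch_kwargs(tracker.offset))
```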
If the resume token value of your source connector's offset does not correspond to any entry in your MongoDB deployment's oplog, your connector has an invalid resume token. To learn how to recover from an invalid resume token, see the invalid token troubleshooting guide.
To learn more about resume tokens, see the following resources:
Resume a Change Stream in the MongoDB manual
Change Events in the MongoDB manual
To learn more about offsets, see the following resources:
Kafka Connect offset.storage.topic configuration option documentation
Kafka Connect OffsetStorageReader API documentation