Customize a Pipeline to Filter Change Events
This usage example demonstrates how to configure a pipeline to customize the data that your MongoDB Kafka source connector consumes. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data.
MongoDB notifies the connector of data changes that match your aggregation pipeline on a change stream. A change stream is a sequence of events that describe data changes a client made to a MongoDB deployment in real-time. For more information, see the MongoDB server manual entry on Change Streams.
Example
Suppose you are coordinating an event and want to collect names and arrival times of each guest at a specific event. Whenever a guest checks into the event, an application inserts a new document that contains the following details:
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
You can define your connector pipeline
setting to instruct the change
stream to filter the change event information as follows:
Create change events for insert operations and omit events for all other types of operations.
Create change events only for documents that match the
fullDocument.eventId
value "321" and omit all other documents.Omit the
_id
andeventId
fields from thefullDocument
object using a projection.
To apply these transformations, assign the following aggregation pipeline
to your pipeline
setting:
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
Important
Make sure that the results of the pipeline contain the top-level
_id
and ns
fields of the payload
object. MongoDB uses
id
as the value of the resume token, and ns
to generate the
Kafka output topic name.
When the application inserts the sample document, your configured connector publishes the following record to your Kafka topic:
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
For more information on managing change streams with the source connector, see the connector documentation on Change Streams.