
Customize a Pipeline to Filter Change Events

This usage example demonstrates how to configure a pipeline to customize the data that your MongoDB Kafka source connector consumes. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data.

MongoDB notifies the connector of data changes that match your aggregation pipeline on a change stream. A change stream is a sequence of events that describe data changes a client made to a MongoDB deployment in real time. For more information, see the MongoDB server manual entry on Change Streams.

Suppose you are coordinating an event and want to collect the name and arrival time of each guest. Whenever a guest checks in, an application inserts a new document that contains the following details:

{
  "_id": ObjectId(...),
  "eventId": 321,
  "name": "Dorothy Gale",
  "arrivalTime": 2021-10-31T20:30:00.245Z
}

You can define your connector pipeline setting to instruct the change stream to filter the change event information as follows:

  • Create change events for insert operations and omit events for all other types of operations.

  • Create change events only for documents whose fullDocument.eventId value is the number 321 and omit all other documents.

  • Omit the _id and eventId fields from the fullDocument object using a projection.

To apply these transformations, assign the following aggregation pipeline to your pipeline setting:

pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
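Because the pipeline setting is a JSON array written on a single line, it can help to build the value programmatically and check that it parses before pasting it into the connector configuration. The following is a minimal sketch in Python under that assumption; the variable names are illustrative and are not part of the connector.

```python
import json

# The two stages from the example above, written as Python data structures.
pipeline = [
    {"$match": {"$and": [{"operationType": "insert"},
                         {"fullDocument.eventId": 321}]}},
    {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}},
]

# json.dumps emits a single-line JSON array by default, which suits a
# one-line "key=value" entry in a properties file.
pipeline_property = "pipeline=" + json.dumps(pipeline)

# Round-trip check: the value after "=" must parse back to the same stages.
parsed = json.loads(pipeline_property.split("=", 1)[1])
assert parsed == pipeline

print(pipeline_property)
```

Generating the line this way avoids hand-editing mistakes such as unbalanced brackets or a quoted "321" where the number 321 is intended.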

Important

Make sure that the results of the pipeline contain the top-level _id and ns fields of the payload object. MongoDB uses _id as the value of the resume token, and ns to generate the Kafka output topic name.
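One way to catch a projection that drops these fields is to inspect the pipeline definition before deploying it. The sketch below is a hypothetical helper, not part of the connector or of any MongoDB driver. It only examines $project stages, so other stages that can remove fields (for example $unset) are not covered. It assumes standard projection semantics: an exclusion projection keeps every field that is not listed, while an inclusion projection keeps only the listed fields plus _id.

```python
def dropped_required_fields(pipeline, required=("_id", "ns")):
    """Return the required top-level fields that a $project stage would drop."""
    dropped = set()
    for stage in pipeline:
        spec = stage.get("$project")
        if not spec:
            continue
        # Any truthy value other than on _id makes this an inclusion projection.
        is_inclusion = any(value for key, value in spec.items() if key != "_id")
        for field in required:
            explicitly_excluded = field in spec and not spec[field]
            listed = any(key == field or key.startswith(field + ".")
                         for key in spec)
            # _id survives an inclusion projection by default; other fields do not.
            implicitly_excluded = is_inclusion and field != "_id" and not listed
            if explicitly_excluded or implicitly_excluded:
                dropped.add(field)
    return sorted(dropped)


# The pipeline from this example only excludes nested fullDocument fields.
example = [
    {"$match": {"$and": [{"operationType": "insert"},
                         {"fullDocument.eventId": 321}]}},
    {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}},
]
print(dropped_required_fields(example))

# An inclusion projection that forgets ns would be flagged.
risky = [{"$project": {"fullDocument.name": 1}}]
print(dropped_required_fields(risky))
```

Note that fullDocument._id is a different field from the top-level _id, so excluding it does not affect the resume token.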

When the application inserts the sample document, your configured connector publishes the following record to your Kafka topic:

{
  ...
  "payload": {
    "_id": { "_data": ... },
    "operationType": "insert",
    "fullDocument": {
      "name": "Dorothy Gale",
      "arrivalTime": "2021-10-31T20:30:00.245Z"
    },
    "ns": { ... },
    "documentKey": {
      "_id": { "$oid": ... }
    }
  }
}
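To see why the record contains only name and arrivalTime inside fullDocument, the two pipeline stages can be emulated on in-memory change events. The sketch below is a simplified Python simulation and not how MongoDB evaluates aggregation pipelines: it supports only the equality conditions under $and and the exclusion projection used in this example. The namespace, resume token, and document key values are placeholders.

```python
import copy

PIPELINE = [
    {"$match": {"$and": [{"operationType": "insert"},
                         {"fullDocument.eventId": 321}]}},
    {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}},
]


def get_path(doc, dotted):
    """Return the value at a dotted path such as 'fullDocument.eventId', or None."""
    for key in dotted.split("."):
        if not isinstance(doc, dict) or key not in doc:
            return None
        doc = doc[key]
    return doc


def remove_path(doc, dotted):
    """Delete the field at a dotted path if it exists."""
    *parents, leaf = dotted.split(".")
    for key in parents:
        doc = doc.get(key)
        if not isinstance(doc, dict):
            return
    doc.pop(leaf, None)


def apply_pipeline(event):
    """Emulate the equality $match, then the exclusion $project."""
    for condition in PIPELINE[0]["$match"]["$and"]:
        for path, expected in condition.items():
            if get_path(event, path) != expected:
                return None  # the event is filtered out
    result = copy.deepcopy(event)  # leave the input event untouched
    for path in PIPELINE[1]["$project"]:
        remove_path(result, path)
    return result


insert_event = {
    "_id": {"_data": "placeholder-token"},
    "operationType": "insert",
    "ns": {"db": "exampleDb", "coll": "guests"},  # hypothetical namespace
    "documentKey": {"_id": "placeholder-id"},
    "fullDocument": {"_id": "placeholder-id", "eventId": 321,
                     "name": "Dorothy Gale",
                     "arrivalTime": "2021-10-31T20:30:00.245Z"},
}
other_event = copy.deepcopy(insert_event)
other_event["fullDocument"]["eventId"] = 999  # a guest at a different event

print(apply_pipeline(insert_event)["fullDocument"])
# {'name': 'Dorothy Gale', 'arrivalTime': '2021-10-31T20:30:00.245Z'}
print(apply_pipeline(other_event))
# None
```

In this emulation the top-level _id, ns, and documentKey fields pass through unchanged, which matches the shape of the published record.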

For more information on managing change streams with the source connector, see the connector documentation on Change Streams.
