Customize a Pipeline to Filter Change Events

This usage example demonstrates how to configure a pipeline to customize the data that your source connector consumes. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data.

MongoDB notifies the connector of data changes that match your aggregation pipeline on a change stream. A change stream is a sequence of events that describe, in real time, the data changes a client makes to a MongoDB deployment. For more information, see the MongoDB Server manual entry on Change Streams.

Suppose you're an event coordinator who needs to collect names and arrival times of each guest at a specific event. Whenever a guest checks into the event, an application inserts a new document that contains the following details:

{
  "_id": ObjectId(...),
  "eventId": 321,
  "name": "Dorothy Gale",
  "arrivalTime": ISODate("2021-10-31T20:30:00.245Z")
}

You can define your connector pipeline setting to instruct the change stream to filter the change event information as follows:

  • Create change events for insert operations and omit events for all other types of operations.

  • Create change events only for documents whose fullDocument.eventId value is the number 321 and omit all other documents.

  • Omit the _id and eventId fields from the fullDocument object using a projection.

To apply these transformations, assign the following aggregation pipeline to your pipeline setting:

pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
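To make the effect of the two stages concrete, the following Python sketch expresses the same pipeline as data, renders it as a single-line pipeline property, and mimics the filtering on a sample change event. The names pipeline_setting and simulate are hypothetical helpers introduced here for illustration. The simulation is hard-coded for this one pipeline and is not how the server or the connector evaluates aggregation stages.

```python
import json

# The same pipeline as in the setting above, expressed as Python data.
PIPELINE = [
    {"$match": {"$and": [{"operationType": "insert"},
                         {"fullDocument.eventId": 321}]}},
    {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}},
]


def pipeline_setting(pipeline):
    """Render the pipeline as a single-line 'pipeline=...' property (hypothetical helper)."""
    return "pipeline=" + json.dumps(pipeline)


def simulate(event):
    """Local stand-in for this specific pipeline (illustration only).

    Returns None when the $match stage would drop the event; otherwise
    returns a copy with fullDocument._id and fullDocument.eventId removed,
    as the $project stage would.
    """
    full_doc = event.get("fullDocument") or {}
    if event.get("operationType") != "insert" or full_doc.get("eventId") != 321:
        return None
    kept = {k: v for k, v in full_doc.items() if k not in ("_id", "eventId")}
    return {**event, "fullDocument": kept}


if __name__ == "__main__":
    sample = {
        "_id": {"_data": "example-token"},  # placeholder resume token
        "operationType": "insert",
        "fullDocument": {"_id": 1, "eventId": 321, "name": "Dorothy Gale"},
    }
    print(pipeline_setting(PIPELINE))
    print(simulate(sample))
```

Note that the comparison uses the number 321, not the string "321"; a document that stores eventId as a string would not match this pipeline.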

Important

Make sure that the results of the pipeline contain the top-level _id field of the payload object, which MongoDB uses as the value of the resume token.
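As a rough guard against that mistake, a pipeline can be checked before it is deployed. The sketch below is a hypothetical lint, not part of the connector: drops_resume_token is an illustrative name, and the check only looks for $project and $unset stages that exclude the top-level _id field. Other stages can also reshape the event, so treat a False result as "no obvious problem found" rather than a guarantee.

```python
def drops_resume_token(pipeline):
    """Return True if a stage explicitly excludes the top-level _id field.

    Hypothetical helper: inspects only $project and $unset stages.
    """
    for stage in pipeline:
        # {"$project": {"_id": 0}} or {"$project": {"_id": False}}
        project = stage.get("$project", {})
        if "_id" in project and project["_id"] in (0, False):
            return True
        # {"$unset": "_id"} or {"$unset": ["_id", ...]}
        unset = stage.get("$unset", [])
        if isinstance(unset, str):
            unset = [unset]
        if "_id" in unset:
            return True
    return False


if __name__ == "__main__":
    safe = [
        {"$match": {"operationType": "insert"}},
        {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}},
    ]
    unsafe = [{"$project": {"_id": 0, "fullDocument": 1}}]
    print(drops_resume_token(safe))
    print(drops_resume_token(unsafe))
```

The pipeline in this example passes the check because it excludes fullDocument._id, which is the inserted document's identifier, and leaves the top-level _id of the change event untouched.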

When the application inserts the sample document, your configured connector publishes the following record to your Kafka topic:

{
  ...
  "payload": {
    "_id": { "_data": ... },
    "operationType": "insert",
    "fullDocument": {
      "name": "Dorothy Gale",
      "arrivalTime": "2021-10-31T20:30:00.245Z"
    },
    "ns": { ... },
    "documentKey": {
      "_id": { "$oid": ... }
    }
  }
}

For more information on managing change streams with the source connector, see the connector documentation on Change Streams.
