Sink Connector Post Processors

On this page

  • Overview
  • How Post Processors Modify Data
  • How to Specify Post Processors
  • Prebuilt Post Processors
  • Configure the Document Id Adder Post Processor
  • Post Processor Examples
  • Allow List and Block List Examples
  • Allow List Projector Example
  • Block List Projector Example
  • Projection Wildcard Pattern Matching Examples
  • Field Renaming Examples
  • How to Create a Custom Post Processor

Overview

On this page, you can learn how to configure post processors in your Sink Connector. Post processors modify sink records that the connector reads from a Kafka topic before the connector stores them in your MongoDB collection. Examples of the data modifications post processors can make include:

  • Set the document _id field to a custom value

  • Include or exclude message key or value fields

  • Rename fields

You can use the prebuilt post processors included in the MongoDB Kafka Connector or implement your own.

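See the following sections for more information on post processors:

  • How Post Processors Modify Data

  • How to Specify Post Processors

  • Prebuilt Post Processors

  • Configure the Document Id Adder Post Processor

  • Post Processor Examples

  • How to Create a Custom Post Processor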

How Post Processors Modify Data

Post processors modify data read from a Kafka topic. The connector stores the message in an instance of the SinkDocument class, which contains a representation of the Kafka SinkRecord key and value fields. The connector sequentially applies any post processors specified in the configuration and stores the result in a MongoDB collection.

Post processors perform data modification tasks such as generating the document _id field, projecting message key or value fields, and renaming fields. You can use the prebuilt post processors included in the connector, or you can implement your own by extending the PostProcessor class.

Important

Post Processors and Change Data Capture (CDC) Handlers

You cannot apply a post processor to CDC handler event data.

How to Specify Post Processors

You can specify one or more post processors in the post.processor.chain configuration setting as a comma-separated list. If you specify more than one, the connector applies them in the order listed, and each post processor modifies the data output by the prior one.

To ensure the documents the connector writes to MongoDB contain unique _id fields, the connector automatically adds the DocumentIdAdder post processor in the first position of the chain if you do not otherwise include it.

The following example setting specifies that the connector should run the KafkaMetaAdder post processor first and then the AllowListValueProjector post processor on the output.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
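
To control where DocumentIdAdder runs instead of relying on the automatic insertion described above, the chain can name it explicitly. The following sketch places it first and then runs KafkaMetaAdder. It uses only class paths listed in the prebuilt post processor table, and the ordering is an illustrative choice rather than a recommendation:

```properties
# DocumentIdAdder is listed explicitly, so the connector does not need to add it.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
```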

Prebuilt Post Processors

The following table contains a list of all the post processors included in the MongoDB Kafka Sink Connector.

| Post Processor Name | Full Path | Description |
| --- | --- | --- |
| DocumentIdAdder | com.mongodb.kafka.connect.sink.processor.DocumentIdAdder | Inserts an _id field determined by the configured strategy. The default strategy is BsonOidStrategy. For information on strategy options and configuration, see the Configure the Document Id Adder Post Processor section. |
| BlockListKeyProjector | com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector | Removes matching key fields from the sink record. For more information on configuration, see the Allow List and Block List Examples. |
| BlockListValueProjector | com.mongodb.kafka.connect.sink.processor.BlockListValueProjector | Removes matching value fields from the sink record. For more information on configuration, see the Allow List and Block List Examples. |
| AllowListKeyProjector | com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector | Includes only matching key fields from the sink record. For more information on configuration, see the Allow List and Block List Examples. |
| AllowListValueProjector | com.mongodb.kafka.connect.sink.processor.AllowListValueProjector | Includes only matching value fields from the sink record. For more information on configuration, see the Allow List and Block List Examples. |
| KafkaMetaAdder | com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder | Adds a field named "topic-partition-offset" to the document and sets its value to the concatenation of the Kafka topic, partition, and offset. |
| RenameByMapping | com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping | Renames fields that are an exact match to a specified field name in the key or value document. For information on configuration, see the Renaming by Mapping Example. |
| RenameByRegex | com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex | Renames fields that match a regular expression in the key or value document. For information on configuration, see the Renaming by Regular Expression Example. |

Configure the Document Id Adder Post Processor

The DocumentIdAdder post processor uses a strategy to determine how it should format the _id field in the MongoDB document. A strategy defines preset behavior that you can customize for your use case.

You can specify a strategy for this post processor in the document.id.strategy setting as shown in the following example:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

The following table shows a list of the strategies you can use to configure the DocumentIdAdder post processor:

| Strategy Name | Full Path | Description |
| --- | --- | --- |
| BsonOidStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy | Generates a MongoDB BSON ObjectId. Default strategy for the DocumentIdAdder post processor. |
| KafkaMetaDataStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy | Builds a string composed of the concatenation of Kafka topic, partition, and offset. |
| FullKeyStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy | Uses the complete key structure of the sink document to generate the value for the _id field. Defaults to a blank document if no key exists. |
| ProvidedInKeyStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy | Uses the _id field specified in the key structure of the sink document. Throws an exception if the field is missing from the sink document. |
| ProvidedInValueStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy | Uses the _id field specified in the value structure of the sink document. Throws an exception if the field is missing from the sink document. |
| PartialKeyStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy | Uses a block list or allow list projection of the sink document key structure. Defaults to a blank document if no key exists. |
| PartialValueStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy | Uses a block list or allow list projection of the sink document value structure. Defaults to a blank document if no value exists. |
| UuidProvidedInKeyStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInKeyStrategy | Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format. |
| UuidProvidedInValueStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInValueStrategy | Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format. |
| UuidStrategy | com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy | Uses a randomly generated UUID in string format. |
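
As a sketch of how a strategy from the table combines with its own settings, the following configuration builds the _id from an allow list projection of the value document. The two document.id.strategy.partial.value.projection.* property names do not appear on this page; they are assumed from the sink connector's configuration properties and should be verified against your connector version. The field names name and address.city are borrowed from the user profile example later on this page.

```properties
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
# Assumed property names; confirm them in the configuration reference.
document.id.strategy.partial.value.projection.type=AllowList
document.id.strategy.partial.value.projection.list=name,address.city
```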

If the built-in document id adder strategies do not cover your use case, you can define a custom document id strategy by following the steps below:

  1. Create a Java class that implements the interface IdStrategy and contains your custom configuration logic.

  2. Compile the class to a JAR file.

  3. Add the compiled JAR to the class path / plugin path for all your Kafka workers. For more information about plugin paths, see the Confluent documentation.

  4. Update the document.id.strategy setting to the full class name of your custom class in all your Kafka workers.
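
Step 4 might look like the following. The class name com.example.kafka.CustomIdStrategy is hypothetical and stands in for your own IdStrategy implementation:

```properties
# Hypothetical class; replace with the full name of your IdStrategy implementation.
document.id.strategy=com.example.kafka.CustomIdStrategy
```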

Note

Selected strategy may have implications on delivery semantics

BSON ObjectId or UUID strategies can only guarantee at-least-once delivery since the connector generates new ids on retries or when processing records again. Other strategies permit exactly-once delivery if you can guarantee the fields that form the document id are unique.

For example implementations of the IdStrategy interface, see the source code directory that contains id strategy implementations packaged with the connector.

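Post Processor Examples

This section shows examples of configuration and sample output of the following types of post processors:

  • Allow List and Block List Examples

  • Projection Wildcard Pattern Matching Examples

  • Field Renaming Examples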

Allow List and Block List Examples

The allow list and block list projector post processors determine which fields to include and exclude from the output.

When you use the allow list projector, the post processor outputs only data from the fields that you specify.

When you use the block list projector, the post processor omits only data from the fields that you specify.

Note

You can use the "." (dot) notation to reference nested fields in the record. You can also use the notation to reference fields of documents in an array.

When you add a projector to your post processor chain, you must specify the projector type and whether to apply it to the key or value portion of the sink document.

See the following sections for example projector configurations and output.

Allow List Projector Example

Suppose your Kafka record value documents resemble the following user profile data:

{
  "name": "Sally Kimball",
  "age": 10,
  "address": {
    "city": "Idaville",
    "country": "USA"
  },
  "hobbies": [
    "reading",
    "solving crime"
  ]
}

You can configure the AllowList value projector to store select data such as the "name", "address.city", and "hobbies" fields from your value documents using the following settings:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

After the post processor applies the projection, it outputs the following record:

{
  "name": "Sally Kimball",
  "address": {
    "city": "Idaville"
  },
  "hobbies": [
    "reading",
    "solving crime"
  ]
}

Block List Projector Example

Suppose your Kafka record key documents resemble the following user identification data:

{
  "username": "user5983",
  "registration": {
    "date": "2021-09-13",
    "source": "mobile"
  },
  "authToken": {
    "alg": "HS256",
    "type": "JWT",
    "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
  }
}

You can configure the BlockList key projector to omit the "authToken" and "registration.source" fields before storing the data with the following settings:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

After the post processor applies the projection, it outputs the following record:

{
  "username": "user5983",
  "registration": {
    "date": "2021-09-13"
  }
}

Projection Wildcard Pattern Matching Examples

This section shows how you can configure the projector post processors to match field names by using wildcard patterns.

| Pattern | Description |
| --- | --- |
| * | Matches any number of characters in the current level. |
| ** | Matches any characters in the current level and all nested levels. |

For the allow list and block list wildcard pattern matching examples in this section, refer to the following value document that contains weather measurements:

{
  "city": "Springfield",
  "temperature": {
    "high": 28,
    "low": 24,
    "units": "C"
  },
  "wind_speed_10m": {
    "average": 3,
    "units": "km/h"
  },
  "wind_speed_80m": {
    "average": 8,
    "units": "km/h"
  },
  "soil_conditions": {
    "temperature": {
      "high": 22,
      "low": 17,
      "units": "C"
    },
    "moisture": {
      "average": 340,
      "units": "mm"
    }
  }
}

You can use the * wildcard to match multiple field names. The following example configuration matches the following fields:

  • The top-level field named "city"

  • The fields named "average" that are nested in any top-level field whose name starts with "wind_speed"

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

After the post processor applies the allow list projection, it outputs the following record:

{
  "city": "Springfield",
  "wind_speed_10m": {
    "average": 3
  },
  "wind_speed_80m": {
    "average": 8
  }
}

You can use the ** wildcard, which matches objects at any level starting from the one at which you specify the wildcard. The following wildcard matching example projects any subdocument that contains a field named "low".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

After the post processor applies the projection, it outputs the following record:

{
  "temperature": {
    "high": 28,
    "low": 24,
    "units": "C"
  },
  "soil_conditions": {
    "temperature": {
      "high": 22,
      "low": 17,
      "units": "C"
    }
  }
}

You can use the wildcard patterns to match fields at a specific document level as shown in the following block list configuration example:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature

After the post processor applies the block list projection, it outputs the following record:

{
  "city": "Springfield",
  "temperature": {
    "high": 28,
    "low": 24,
    "units": "C"
  },
  "wind_speed_10m": {
    "average": 3,
    "units": "km/h"
  },
  "wind_speed_80m": {
    "average": 8,
    "units": "km/h"
  },
  "soil_conditions": {
    "moisture": {
      "average": 340,
      "units": "mm"
    }
  }
}

Field Renaming Examples

This section shows how you can configure the RenameByMapping and RenameByRegex field renamer post processors to update field names in a sink record. The field renaming settings specify the following:

  • Whether to update the key or value document in the record

  • The field names to update

  • The new field names

You must specify RenameByMapping and RenameByRegex settings in a JSON array. You can specify nested fields by using either dot notation or pattern matching.

The field renamer post processor examples use the following example sink record:

Key Document

{
  "location": "Provence",
  "date_month": "October",
  "date_day": 17
}

Value Document

{
  "flapjacks": {
    "purchased": 598,
    "size": "large"
  }
}

Renaming by Mapping Example

The RenameByMapping post processor setting specifies one or more JSON objects that assign fields matching a string to a new name. Each object contains the text to match in the oldName element and the replacement text in the newName element as described in the table below.

| Key Name | Description |
| --- | --- |
| oldName | Specifies whether to match fields in the key or value document and the field name to replace. The setting uses a "." character to separate the two values. |
| newName | Specifies the replacement field name for all matches of the field. |

The following example property matches the "location" field of a key document and renames it to "country":

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

This setting instructs the RenameByMapping post processor to transform the original key document to the following document:

{
  "country": "Provence",
  "date_month": "October",
  "date_day": 17
}

You can perform a similar field name assignment for value documents by specifying the value document with the appended field name in the oldName field as follows:

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

This setting instructs the RenameByMapping post processor to transform the original value document to the following document:

{
  "crepes": {
    "purchased": 598,
    "size": "large"
  }
}

You can also specify multiple mappings in the field.renamer.mapping property by using a JSON array in string format as shown in the following setting:

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
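
The mapping examples above show only the field.renamer.mapping property. For the setting to take effect, the RenameByMapping post processor presumably also needs to appear in the chain. A minimal combined configuration, using the class path from the prebuilt post processor table and the two single mappings shown earlier, might look like this:

```properties
post.processor.chain=com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
# Renames "location" in the key document and "flapjacks" in the value document.
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}, {"oldName":"value.flapjacks", "newName":"crepes"}]
```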

Renaming by Regular Expression Example

The RenameByRegex post processor setting specifies the field names and text patterns that it should match, and replacement values for the matched text. You can specify one or more renaming expressions in JSON objects containing the fields described in the following table:

| Key Name | Description |
| --- | --- |
| regexp | Contains a regular expression that matches fields to perform the replacement. |
| pattern | Contains a regular expression that matches on the text to replace. |
| replace | Contains the replacement text for all matches of the regular expression you defined in the pattern field. |

The following example setting instructs the post processor to perform the following:

  • Match any field names in the key document that start with "date". In the set of matching fields, replace all text that matches the pattern _ with the - character.

  • Match any field names in the value document that are nested in the crepes subdocument. In the set of matching fields, replace all text that matches the pattern purchased with quantity.

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

When the connector applies the post processor to the example key document and to the example value document in which "flapjacks" has already been renamed to "crepes" (as in the RenameByMapping example), it outputs the following:

Key Document

{
  "location": "Provence",
  "date-month": "October",
  "date-day": 17
}

Value Document

{
  "crepes": {
    "quantity": 598,
    "size": "large"
  }
}
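
The regular expressions in this example match a "crepes" field, while the original example value document contains "flapjacks". One configuration that would plausibly produce the key and value documents shown above from the original sink record chains RenameByMapping before RenameByRegex, since the connector applies post processors in the order listed. This is a sketch assembled from settings shown on this page, not a configuration the page itself provides:

```properties
# RenameByMapping runs first, so RenameByRegex sees "crepes" rather than "flapjacks".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping,com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
```

The backslash escaping is copied verbatim from the example above. Depending on how you submit the configuration (a properties file or a JSON request), the escaping you need may differ.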

Warning

The renamer post processors do not overwrite existing field names

The target field names you set in your renamer post processors may result in duplicate field names in the same document. To avoid this, the post processor skips renaming when it would duplicate an existing field name at the same level of the document.

How to Create a Custom Post Processor

If the built-in post processors do not cover your use case, you can create a custom post processor class using the following steps:

  1. Create a Java class that extends the PostProcessor abstract class.

  2. Override the process() method in your class. You can update the SinkDocument, a BSON representation of the sink record key and value fields, and access the original Kafka SinkRecord in your method.

  3. Compile the class to a JAR file.

  4. Add the compiled JAR to the class path / plugin path for all your Kafka workers. For more information about plugin paths, see the Confluent documentation on Manually Installing Community Connectors.

  5. Add your post processor full class name to the post processor chain configuration.
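
Step 5 might look like the following, where com.example.kafka.CustomPostProcessor is a hypothetical class name standing in for your own implementation, chained after a prebuilt post processor:

```properties
# Hypothetical class; replace with the full name of your PostProcessor subclass.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.example.kafka.CustomPostProcessor
```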

For example post processors, you can browse the source code for the built-in post processor classes.
