Sink Connector Post Processors
On this page
- Overview
- How Post Processors Modify Data
- How to Specify Post Processors
- Prebuilt Post Processors
- Configure the Document Id Adder Post Processor
- Post Processor Examples
- Allow List and Block List Examples
- Allow List Projector Example
- Block List Projector Example
- Projection Wildcard Pattern Matching Examples
- Field Renaming Examples
- How to Create a Custom Post Processor
Overview
On this page, you can learn how to configure post processors in your MongoDB Kafka sink connector. Post processors modify sink records that the connector reads from a Kafka topic before the connector stores it in your MongoDB collection. A few examples of data modifications post processors can make include:
Set the document
_id
field to a custom valueInclude or exclude message key or value fields
Rename fields
You can use the prebuilt post processors included in the connector or implement your own.
See the following sections for more information on post processors:
How Post Processors Modify Data
Post processors modify data read from a Kafka topic. The connector stores
the message in a SinkDocument
class which contains a representation of
the Kafka SinkRecord
key and value fields. The connector sequentially
applies any post processors specified in the configuration and stores the
result in a MongoDB collection.
Post processors perform data modification tasks such as generating
the document _id
field, projecting message key or value fields, and
renaming fields. You can use the prebuilt post processors included in the
connector, or you can implement your own by extending the
PostProcessor
class.
Important
Post Processors and Change Data Capture (CDC) Handlers
You cannot apply a post processor to CDC handler event data. If you specify both, the connector logs a warning.
How to Specify Post Processors
You can specify one or more post processors in the post.processor.chain
configuration setting as a comma-separated list. If you specify more than one,
the connector applies them sequentially in which each post processor modifies
the data output by the prior one.
To ensure the documents the connector writes to MongoDB contain unique _id
fields, it automatically adds the DocumentIdAdder
post processor in the
first position of the chain if you do not otherwise include it.
The following example setting specifies that the connector should run the
KafkaMetaAdder
post processor first and then the
AllowListValueProjector
post processor on the output.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Prebuilt Post Processors
The following table contains a list of all the post processors included in the sink connector.
Post Processor Name | Description | |
---|---|---|
DocumentIdAdder | Full Path:
Inserts an _id field determined by the configured strategy.The default strategy is BsonOidStrategy .For information on strategy options and configuration, see the
Configure the Document Id Adder Post Processor
section. | |
BlockListKeyProjector | Full Path:
Removes matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
BlockListValueProjector | Full Path:
Removes matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListKeyProjector | Full Path:
Includes only matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListValueProjector | Full Path:
Includes only matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
KafkaMetaAdder | Full Path:
Adds a field named "topic-partition-offset" and sets the value
to the concatenation of Kafka topic, partition, and offset to the
document. | |
RenameByMapping | Full Path:
Renames fields that are an exact match to a specified field name in
the key or value document. For information on configuration, see the
Renaming by Mapping Example. | |
RenameByRegex | Full Path:
Renames fields that match a regular expression in the key or
value document. For information on configuration, see the
Renaming by Regular Expression Example. |
Configure the Document Id Adder Post Processor
The DocumentIdAdder
post processor uses a strategy to determine how it
should format the _id
field in the MongoDB document. A strategy defines
preset behavior that you can customize for your use case.
You can specify a strategy for this post processor in the
document.id.strategy
setting as shown in the following example:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
The following table shows a list of the strategies you can use to
configure the DocumentIdAdder
post processor:
Strategy Name | Description | |
---|---|---|
BsonOidStrategy | Full Path:
Generates a MongoDB BSON ObjectId. Default strategy for the DocumentIdAdder post processor. | |
KafkaMetaDataStrategy | Full Path:
Builds a string composed of the concatenation of Kafka topic,
partition, and offset. | |
FullKeyStrategy | Full Path:
Uses the complete key structure of the sink document to generate the
value for the _id field.Defaults to a blank document if no key exists. | |
ProvidedInKeyStrategy | Full Path:
Uses the _id field specified in the key structure of the sink
document.Throws an exception if the field is missing from the sink document. | |
ProvidedInValueStrategy | Full Path:
Uses the _id field specified in the value structure of the
sink document.Throws an exception if the field is missing from the sink document. | |
PartialKeyStrategy | Full Path:
Uses a block list or allow list projection of the sink document key
structure. Defaults to a blank document if no key exists. | |
PartialValueStrategy | Full Path:
Uses a block list or allow list projection of the sink document
value structure. Defaults to a blank document if no value exists. | |
UuidProvidedInKeyStrategy | Full Path:
Converts the _id key field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidProvidedInValueStrategy | Full Path:
Converts the _id value field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidStrategy | Full Path:
Uses a randomly generated UUID in string format. |
Create a Custom Document Id Strategy
If the built-in document id adder strategies do not cover your use case, you can define a custom document id strategy by following the steps below:
Create a Java class that implements the interface IdStrategy and contains your custom configuration logic.
Compile the class to a JAR file.
Add the compiled JAR to the class path / plugin path for all your Kafka workers. For more information about plugin paths, see the Confluent documentation.
Update the
document.id.strategy
setting to the full class name of your custom class in all your Kafka workers.
Note
Selected strategy may have implications on delivery semantics
BSON ObjectId or UUID strategies can only guarantee at-least-once delivery since the connector generates new ids on retries or when processing records again. Other strategies permit exactly-once delivery if you can guarantee the fields that form the document id are unique.
For example implementations of the IdStrategy
interface, see the
source code directory that contains
id strategy implementations
packaged with the connector.
Post Processor Examples
This section shows examples of configuration and sample output of the following types of post processors:
Allow List and Block List Examples
The allow list and block list projector post processors determine which fields to include and exclude from the output.
When you use the allow list projector, the post processor only outputs data from the fields that you specify.
When you use the block list projector, the post process only omits data from the fields that you specify.
Note
You can use the "." (dot) notation to reference nested fields in the record. You can also use the notation to reference fields of documents in an array.
When you add a projector to your post processor chain, you must specify the projector type and whether to apply it to the key or value portion of the sink document.
See the following sections for example projector configurations and output.
Allow List Projector Example
Suppose your Kafka record value documents resembled the following user profile data:
{ "name": "Sally Kimball", "age": 10, "address": { "city": "Idaville", "country": "USA" }, "hobbies": [ "reading", "solving crime" ] }
You can configure the AllowList
value projector to store select data
such as the "name", "address.city", and "hobbies" fields from your value
documents using the following settings:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies
After the post processor applies the projection, it outputs the following record:
{ "name": "Sally Kimball", "address": { "city": "Idaville" }, "hobbies": [ "reading", "solving crime" ] }
Block List Projector Example
Suppose your Kafka record key documents resembled the following user identification data:
{ "username": "user5983", "registration": { "date": "2021-09-13", "source": "mobile" }, "authToken": { "alg": "HS256", "type": "JWT", "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk" } }
You can configure the BlockList
key projector to omit the "authToken"
and "registration.source" fields before storing the data with the
following settings:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source
After the post processor applies the projection, it outputs the following record:
{ "username": "user5983", "registration": { "date": "2021-09-13", } }
Projection Wildcard Pattern Matching Examples
This section shows how you can configure the projector post processors to match wildcard patterns to match field names.
Pattern | Description |
* | Matches any number of characters in the current level. |
** | Matches any characters in the current level and all nested levels. |
For the allow list and block list wildcard pattern matching examples in this section, refer to the following value document that contains weather measurements:
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" }, "moisture": { "average": 340, "units": "mm" } } }
Allow List Wildcard Examples
You can use the *
wildcard to match multiple field names. The following
example configuration matches the following fields:
The top-level field named "city"
The fields named "average" that are subdocuments of any top-level field that starts with the name "wind_speed".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average
After the post processor applies the allow list projection, it outputs the following record:
{ "city": "Springfield", "wind_speed_10m": { "average": 3, }, "wind_speed_80m": { "average": 8, } }
You can use the **
wildcard which matches objects at any level
starting from the one at which you specify the wildcard. The following
wildcard matching example projects any document that contains the field
named "low".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low
The post processor that applies the projection outputs the following record:
{ "temperature": { "high": 28, "low": 24, "units": "C" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" } } }
Block List Wildcard Example
You can use the wildcard patterns to match fields at a specific document level as shown in the following block list configuration example:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "moisture": { "average": 340, "units": "mm" } } }
Field Renaming Examples
This section shows how you can configure the RenameByMapping
and RenameByRegex
field renamer post processors to update field names
in a sink record. The field renaming settings specify the following:
Whether to update the key or value document in the record
The field names to update
The new field names
You must specify RenameByMapping
and RenameByRegex
settings in a
JSON array. You can specify nested fields by using either dot notation or
pattern matching.
The field renamer post processor examples use the following example sink record:
Key Document
{ "location": "Provence", "date_month": "October", "date_day": 17 }
Value Document
{ "flapjacks": { "purchased": 598, "size": "large" } }
Rename by Mapping Example
The RenameByMapping
post processor setting specifies one or more
JSON objects that assign fields matching a string to a new name. Each
object contains the text to match in the oldName
element and the
replacement text in the newName
element as described in the table
below.
Key Name | Description |
---|---|
oldName | Specifies whether to match fields in the key or value document and
the field name to replace. The setting uses a "." character to
separate the two values. |
newName | Specifies the replacement field name for all matches of the field. |
The following example property matches the "location" field of a key document and renames it to "country":
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
This setting instructs the RenameByMapping
post processor to transform
the original key document to the
following document:
{ "country": "Provence", "date_month": "October", "date_day": 17 }
You can perform a similar field name assignment for value documents by
specifying the value document with the appended field name in the oldName
field as follows:
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
This setting instructs the RenameByMapping
post processor to transform
the original value document to the
following document:
{ "crepes": { "purchased": 598, "size": "large" } }
You can also specify one or more mappings in the field.renamer.mapping
property by using a JSON array in string format as shown in the following
setting:
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
Rename by Regular Expression
The RenameByRegex
post processor setting specifies the field names and
text patterns that it should match, and replacement values for the matched
text. You can specify one or more renaming expressions in JSON objects
containing the fields described in the following table:
Key Name | Description |
---|---|
regexp | Contains a regular expression that matches fields to perform the
replacement. |
pattern | Contains a regular expression that matches on the text to replace. |
replace | Contains the replacement text for all matches of the regular expression
you defined in the pattern field. |
The following example setting instructs the post processor to perform the following:
Match any field names in the key document that start with "date". In the set of matching fields, replace all text that matches the pattern
_
with the-
character.Match any field names in the value document that are subdocuments of
crepes
. In the set of matching fields, replace all text that matches the patternpurchased
withquantity
.
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
When the connector applies the post processor to the example key document and the example value document, it outputs the following:
Key Document
{ "location": "Provence", "date-month": "October", "date-day": 17 }
Value Document
{ "crepes": { "quantity": 598, "size": "large" } }
Warning
The renamer post processors do not overwrite existing field names
The target field names you set in your renamer post processors to may result in duplicate field names in the same document. To avoid this, the post processor skips renaming when it would duplicate an existing field name at the same level of the document.
How to Create a Custom Post Processor
If the built-in post processors do not cover your use case, you can create a custom post processor class using the following steps:
Create a Java class that extends the PostProcessor abstract class.
Override the
process()
method in your class. You can update theSinkDocument
, a BSON representation of the sink record key and value fields, and access the original KafkaSinkRecord
in your method.Compile the class to a JAR file.
Add the compiled JAR to the class path / plugin path for all your Kafka workers. For more information about plugin paths, see the Confluent documentation on Manually Installing Community Connectors.
Add your post processor full class name to the post processor chain configuration.
For example post processors, you can browse the source code for the built-in post processor classes.