Write Model Strategies
Overview
This guide shows you how to change the way your MongoDB Kafka sink connector writes data to MongoDB.
You can change how your connector writes data to MongoDB for use cases including the following:
Insert documents instead of upserting them
Replace or update documents that match a filter other than the _id field
Delete documents that match a filter
You can configure how your connector writes data to MongoDB by specifying a write model strategy. A write model strategy is a class that defines how your sink connector should write data using write models. A write model is a MongoDB Java driver interface that defines the structure of a write operation.
To learn how to modify the sink records your connector receives before your connector writes them to MongoDB, read the guide on Sink Connector Post Processors.
To see a write model strategy implementation, see the source code of the InsertOneDefaultStrategy class.
Bulk Write Operations
The sink connector writes data to MongoDB using bulk write operations. Bulk writes group multiple write operations, such as inserts, updates, or deletes, together.
By default, the sink connector performs ordered bulk writes, which guarantee the order of data changes. In an ordered bulk write, if any write operation results in an error, the connector skips the remaining writes in that batch.
If you don't need to guarantee the order of data changes, you can set the bulk.write.ordered setting to false so that the connector performs unordered bulk writes. The sink connector performs unordered bulk writes in parallel, which can improve performance.
In addition, when you enable unordered bulk writes and set the errors.tolerance setting to all, the connector continues to perform the remaining write operations in the batch that do not return errors, even if a write operation in your bulk write fails.
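The difference between the two modes can be illustrated with a small simulation. The following is a minimal sketch, not the connector's or the Java driver's implementation: the class name, the `apply` method, and the sample operations are hypothetical, and the sketch runs sequentially even though the connector performs unordered bulk writes in parallel. The unordered branch models the case where errors.tolerance is set to all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Simplified model of bulk write error handling; not the connector's code. */
class BulkWriteSketch {

    /**
     * Walks through the batch and returns the operations that were applied.
     * ordered = true:  stop at the first failure and skip the rest of the batch.
     * ordered = false: skip only the failed operation and keep going.
     */
    static List<String> apply(List<String> ops, Predicate<String> fails, boolean ordered) {
        List<String> applied = new ArrayList<>();
        for (String op : ops) {
            if (fails.test(op)) {
                if (ordered) {
                    break; // remaining writes in the batch are skipped
                }
                continue; // unordered: later writes are still attempted
            }
            applied.add(op);
        }
        return applied;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("insert A", "insert B (duplicate key)", "insert C");
        Predicate<String> fails = op -> op.contains("duplicate key");

        System.out.println(apply(batch, fails, true));  // prints [insert A]
        System.out.println(apply(batch, fails, false)); // prints [insert A, insert C]
    }
}
```

In the ordered case, "insert C" is never applied even though it would have succeeded on its own, which is the cost of preserving the order of data changes.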
Tip
To learn more about the bulk.write.ordered setting, see the Connector Message Processing Properties.
To learn more about bulk write operations, see the following documentation:
How to Specify Write Model Strategies
To specify a write model strategy, use the following setting:
writemodel.strategy=<write model strategy classname>
For a list of the pre-built write model strategies included in the connector, see the guide on write model strategy configurations.
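If you assemble connector settings in code, the setting above can be sketched as follows. This is an illustrative helper only: the class `WriteModelConfigSketch` and the method `withStrategy` are hypothetical and are not part of the connector. The package name is the one used by the pre-built strategies shown in the examples in this guide.

```java
import java.util.Properties;

/** Hypothetical helper for assembling sink connector settings; not part of the connector. */
class WriteModelConfigSketch {

    // Package of the pre-built strategy classes named in this guide.
    static final String STRATEGY_PACKAGE = "com.mongodb.kafka.connect.sink.writemodel.strategy";

    /** Returns properties that select a pre-built strategy by its fully qualified class name. */
    static Properties withStrategy(String simpleClassName) {
        Properties props = new Properties();
        props.setProperty("writemodel.strategy", STRATEGY_PACKAGE + "." + simpleClassName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = withStrategy("ReplaceOneBusinessKeyStrategy");
        System.out.println(props.getProperty("writemodel.strategy"));
    }
}
```

A custom strategy lives in your own package, so in that case you would set the fully qualified name of your class directly instead of using a prefix like this one.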
Specify a Business Key
A business key is a value composed of one or more fields in your sink record that identifies it as unique. By default, the sink connector uses the _id field of the sink record to retrieve the business key. To specify a different business key, configure the Document Id Adder post processor to use a custom value.
You can configure the Document Id Adder to set the _id field from the sink record key as shown in the following example properties:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList
Alternatively, you can configure it to set the _id field from the sink record value as shown in the following example properties:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList
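To make the effect of an AllowList projection concrete, the following sketch models how a business key could be derived from the fields of a record. It is a simplified illustration under stated assumptions, not the connector's projection code: the class `BusinessKeySketch`, the method `project`, and the sample record are hypothetical, and the sketch handles only top-level fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of an allow-list projection; not the connector's implementation. */
class BusinessKeySketch {

    /** Keeps only the allow-listed top-level fields of the record. */
    static Map<String, Object> project(Map<String, Object> record, String projectionList) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String field : projectionList.split(",")) {
            String name = field.trim();
            if (record.containsKey(name)) {
                key.put(name, record.get(name));
            }
        }
        return key;
    }

    public static void main(String[] args) {
        // Hypothetical sink record value.
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("customer_id", 17);
        value.put("region", "EU");
        value.put("total", 99.5);

        // Corresponds to a projection list of "customer_id,region".
        System.out.println(project(value, "customer_id,region")); // prints {customer_id=17, region=EU}
    }
}
```

Fields that are not in the projection list, such as `total` here, do not contribute to the business key.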
Important
Improve Write Performance
Create a unique index in your target collection that corresponds to the fields of your business key. This improves the performance of write operations from your sink connector. See the guide on unique indexes for more information.
The following write model strategies require a business key:
ReplaceOneBusinessKeyStrategy
DeleteOneBusinessKeyStrategy
UpdateOneBusinessKeyTimestampStrategy
For more information on the Document Id Adder post processor, see Configure the Document Id Adder Post Processor.
Examples
This section shows examples of configuration and output of the following write model strategies:
Update One Timestamps Strategy
You can configure the Update One Timestamps strategy to add and update timestamps when writing documents to MongoDB. This strategy performs the following actions:
When the connector inserts a new MongoDB document, it sets the _insertedTS and _modifiedTS fields to the current time on the connector's server.
When the connector updates an existing MongoDB document, it updates the _modifiedTS field to the current time on the connector's server.
Suppose you want to track the position of a train along a route, and your sink connector receives messages with the following structure:
{
  "_id": "MN-1234",
  "start": "Beacon",
  "destination": "Grand Central",
  "position": [ 40, -73 ]
}
Use the ProvidedInValueStrategy to specify that your connector should use the _id value of the message to assign the _id field in your MongoDB document. Specify your id and write model strategy properties as follows:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
After your sink connector processes the preceding example record, it inserts a document that contains the _insertedTS and _modifiedTS fields as shown in the following document:
{
  "_id": "MN-1234",
  "_insertedTS": ISODate("2021-09-20T15:08:00Z"),
  "_modifiedTS": ISODate("2021-09-20T15:08:00Z"),
  "start": "Beacon",
  "destination": "Grand Central",
  "position": [ 40, -73 ]
}
After one hour, the train reports its new location along its route with a new position as shown in the following record:
{
  "_id": "MN-1234",
  "start": "Beacon",
  "destination": "Grand Central",
  "position": [ 42, -75 ]
}
Once your sink connector processes the preceding record, it updates the existing document so that it contains the following data:
{
  "_id": "MN-1234",
  "_insertedTS": ISODate("2021-09-20T15:08:00Z"),
  "_modifiedTS": ISODate("2021-09-20T16:08:00Z"),
  "start": "Beacon",
  "destination": "Grand Central",
  "position": [ 42, -75 ]
}
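The insert-then-update behavior in the train example can be modeled with an in-memory map. The following is a minimal sketch of the timestamp semantics only, assuming documents are keyed by _id. The class `TimestampUpsertSketch` and its `upsert` method are hypothetical, and the sketch takes the current time as a parameter instead of reading the server clock so that the result is reproducible.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the timestamp behavior; not the connector's implementation. */
class TimestampUpsertSketch {

    // Stand-in for a MongoDB collection, keyed by _id.
    private final Map<Object, Map<String, Object>> collection = new HashMap<>();

    /** Upserts the record by _id and maintains the two timestamp fields. */
    Map<String, Object> upsert(Map<String, Object> record, Instant now) {
        Map<String, Object> doc = collection.computeIfAbsent(record.get("_id"), id -> {
            Map<String, Object> created = new LinkedHashMap<>();
            created.put("_insertedTS", now); // written only when the document is new
            return created;
        });
        doc.put("_modifiedTS", now); // written on every insert and update
        doc.putAll(record);          // overwrite the fields carried by the record
        return doc;
    }

    public static void main(String[] args) {
        TimestampUpsertSketch sketch = new TimestampUpsertSketch();

        Map<String, Object> first = new LinkedHashMap<>();
        first.put("_id", "MN-1234");
        first.put("position", List.of(40, -73));
        System.out.println(sketch.upsert(first, Instant.parse("2021-09-20T15:08:00Z")));

        Map<String, Object> second = new LinkedHashMap<>();
        second.put("_id", "MN-1234");
        second.put("position", List.of(42, -75));
        System.out.println(sketch.upsert(second, Instant.parse("2021-09-20T16:08:00Z")));
    }
}
```

After the second call, `_insertedTS` still holds the first timestamp while `_modifiedTS` and `position` reflect the later record.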
For more information on the ProvidedInValueStrategy, see the section on how to Configure the Document Id Adder Post Processor.
Replace One Business Key Strategy
You can configure the Replace One Business Key strategy to replace documents that match the value of the business key. To define a business key on multiple fields of a record and configure the connector to replace documents that contain matching business keys, perform the following tasks:
Create a unique index in your collection that corresponds to your business key fields.
Specify the PartialValueStrategy id strategy to identify the fields that belong to the business key in the connector configuration.
Specify the ReplaceOneBusinessKeyStrategy write model strategy in the connector configuration.
Suppose you want to track airplane capacity by the flight number and airport location represented by flight_no and airport_code, respectively. An example message contains the following information:
{
  "flight_no": "Z342",
  "airport_code": "LAX",
  "seats": {
    "capacity": 180,
    "occupied": 152
  }
}
To implement the strategy, using flight_no and airport_code as the business key, first create a unique index on these fields in the MongoDB shell:
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })
Next, specify the PartialValueStrategy strategy and the business key fields in a projection list. Specify the id and write model strategy configuration as follows:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
The sample data inserted into the collection contains the following:
{
  "flight_no": "Z342",
  "airport_code": "LAX",
  "seats": {
    "capacity": 180,
    "occupied": 152
  }
}
When the connector processes sink data that matches the business key of the existing document, it replaces the document with the new values without changing the business key fields:
{
  "flight_no": "Z342",
  "airport_code": "LAX",
  "status": "canceled"
}
After the connector processes the sink data, it replaces the original sample document in MongoDB with the preceding one.
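The replacement in the flight example can be modeled over an in-memory list of documents. This is a sketch of the matching and replacement semantics only, not the connector's implementation: the class `ReplaceByBusinessKeySketch` and its methods are hypothetical, and inserting the record when no document matches is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of replacing a document by business key; not the connector's code. */
class ReplaceByBusinessKeySketch {

    /** True when the document holds every business key field with an equal value. */
    static boolean matches(Map<String, Object> doc, Map<String, Object> businessKey) {
        return doc.entrySet().containsAll(businessKey.entrySet());
    }

    /** Replaces the first matching document, or adds the record if none matches (assumed). */
    static void replaceOne(List<Map<String, Object>> collection,
                           Map<String, Object> businessKey,
                           Map<String, Object> replacement) {
        for (int i = 0; i < collection.size(); i++) {
            if (matches(collection.get(i), businessKey)) {
                collection.set(i, replacement); // the whole document is swapped out
                return;
            }
        }
        collection.add(replacement);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> collection = new ArrayList<>();
        Map<String, Object> original = new LinkedHashMap<>();
        original.put("flight_no", "Z342");
        original.put("airport_code", "LAX");
        original.put("seats", Map.of("capacity", 180, "occupied", 152));
        collection.add(original);

        Map<String, Object> businessKey = new LinkedHashMap<>();
        businessKey.put("flight_no", "Z342");
        businessKey.put("airport_code", "LAX");

        Map<String, Object> replacement = new LinkedHashMap<>(businessKey);
        replacement.put("status", "canceled");

        replaceOne(collection, businessKey, replacement);
        System.out.println(collection); // prints [{flight_no=Z342, airport_code=LAX, status=canceled}]
    }
}
```

Because the operation replaces the document instead of updating individual fields, the `seats` field from the original document does not survive.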
Delete One Business Key Strategy
You can configure the connector to remove a document when it receives messages that match a business key using the Delete One Business Key strategy. To set a business key from multiple fields of a record and configure the connector to delete a document that contains a matching business key, perform the following tasks:
Create a unique index in your MongoDB collection that corresponds to your business key fields.
Specify the PartialValueStrategy as the id strategy to identify the fields that belong to the business key in the connector configuration.
Specify the DeleteOneBusinessKeyStrategy write model strategy in the connector configuration.
Suppose you want to delete a calendar event from a specific year from a collection that contains a document that resembles the following:
{
  "year": 2005,
  "month": 3,
  "day": 15,
  "event": "Dentist Appointment"
}
To implement the strategy, using year as the business key, first create a unique index on this field in the MongoDB shell:
db.collection.createIndex({ "year": 1 }, { unique: true })
Next, specify your business key and write model strategy in your configuration as follows:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=year
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy
If your connector processes a sink record that contains the business key year, it deletes the first document with a matching field value returned by MongoDB. Suppose your connector processes a sink record that contains the following value data:
{ "year": 2005, ... }
When the connector processes the preceding record, it deletes the first document from the collection that contains a year field with a value of 2005, such as the original "Dentist Appointment" sample document.
Custom Write Model Strategies
If none of the write model strategies included with the connector fit your use case, you can create your own.
A write model strategy is a Java class that implements the WriteModelStrategy interface and overrides the createWriteModel() method.
See the source code for the WriteModelStrategy interface for the required method signature.
Sample Write Model Strategy
The following custom write model strategy returns a write operation that replaces a MongoDB document that matches the _id field of your sink record with the value of the fullDocument field of your sink record:
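import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;

/**
 * Custom write model strategy
 *
 * This class reads the 'fullDocument' field from a change stream and
 * returns a ReplaceOne operation.
 */
public class CustomWriteModelStrategy implements WriteModelStrategy {

  private static final String ID = "_id";

  @Override
  public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
    // Fail the record if the sink record has no value document.
    BsonDocument changeStreamDocument = document.getValueDoc()
        .orElseThrow(() -> new DataException("Missing value document"));

    // Fall back to an empty document when 'fullDocument' is absent.
    BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
    if (fullDocument.isEmpty()) {
      return null; // Return null to indicate no op.
    }

    // Replace the document whose _id matches the _id of 'fullDocument'.
    return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
  }
}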
For another example of a custom write model strategy, see the UpsertAsPartOfDocumentStrategy example strategy on GitHub.
How to Install Your Strategy
To configure your sink connector to use a custom write strategy, you must complete the following actions:
Compile the custom write strategy class to a JAR file.
Add the compiled JAR to the classpath/plugin path for your Kafka workers. For more information about plugin paths, see the Confluent documentation.
Note
Kafka Connect loads plugins in isolation. When you deploy a custom write strategy, both the connector JAR and the write model strategy JAR must be on the same path. Your paths should resemble the following:
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
<plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar
To learn more about Kafka Connect plugins, see this guide from Confluent.
Specify your custom class in the writemodel.strategy configuration setting.
To learn how to compile a class to a JAR file, see the JAR deployment guide from the Java SE documentation.