Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

Write Model Strategies

On this page

  • Overview
  • Bulk Write Operations
  • How to Specify Write Model Strategies
  • Specify a Business Key
  • Examples
  • Update One Timestamps Strategy
  • Replace One Business Key Strategy
  • Delete One Business Key Strategy
  • Custom Write Model Strategies
  • Sample Write Model Strategy
  • How to Install Your Strategy

This guide shows you how to change the way your MongoDB Kafka sink connector writes data to MongoDB.

You can change how your connector writes data to MongoDB for use cases including the following:

  • Insert documents instead of upserting them

  • Replace or update documents that match a filter other than the _id field

  • Delete documents that match a filter

You can configure how your connector writes data to MongoDB by specifying a write model strategy. A write model strategy is a class that defines how your sink connector should write data using write models. A write model is a MongoDB Java driver interface that defines the structure of a write operation.

To learn how to modify the sink records your connector receives before your connector writes them to MongoDB, read the guide on Sink Connector Post Processors.

To see a write model strategy implementation, see the source code of the InsertOneDefaultStrategy class.

The sink connector writes data to MongoDB using bulk write operations. Bulk writes group multiple write operations, such as inserts, updates, or deletes, together.

By default, the sink connector performs ordered bulk writes, which guarantee the order of data changes. In an ordered bulk write, if any write operation results in an error, the connector skips the remaining writes in that batch.

If you don't need to guarantee the order of data changes, you can set the bulk.write.ordered setting to false so that the connector performs unordered bulk writes. The sink connector performs unordered bulk writes in parallel, which can improve performance.

In addition, when you enable unordered bulk writes and set the errors.tolerance setting to all, even if any write operation in your bulk write fails, the connector continues to perform the remaining write operations in the batch that do not return errors.

Tip

To learn more about the bulk.write.ordered setting, see the Connector Message Processing Properties.

To learn more about bulk write operations, see the following documentation:

To specify a write model strategy, use the following setting:

writemodel.strategy=<write model strategy classname>

For a list of the pre-built write model strategies included in the connector, see the guide on write model strategy configurations.

A business key is a value composed of one or more fields in your sink record that identifies it as unique. By default, the sink connector uses the _id field of the sink record to retrieve the business key. To specify a different business key, configure the Document Id Adder post processor to use a custom value.

You can configure the Document Id Adder to set the _id field from the sink record key as shown in the following example properties:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList

Alternatively, you can configure it to set the _id field from the sink record value as shown in the following example properties:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList

Important

Improve Write Performance

Create a unique index in your target collection that corresponds to the fields of your business key. This improves the performance of write operations from your sink connector. See the guide on unique indexes for more information.

The following write model strategies require a business key:

  • ReplaceOneBusinessKeyStrategy

  • DeleteOneBusinessKeyStrategy

  • UpdateOneBusinessKeyTimestampStrategy

For more information on the Document Id Adder post processor, see Configure the Document Id Adder Post Processor.

This section shows examples of configuration and output of the following write model strategies:

You can configure the Update One Timestamps strategy to add and update timestamps when writing documents to MongoDB. This strategy performs the following actions:

  • When the connector inserts a new MongoDB document, it sets the _insertedTS and _modifiedTS fields to the current time on the connector's server.

  • When the connector updates an existing MongoDB document, it updates the _modifiedTS field to the current time on the connector's server.

Suppose you want to track the position of a train along a route, and your sink connector receives messages with the following structure:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

Use the ProvidedInValueStrategy to specify that your connector should use the _id value of the message to assign the _id field in your MongoDB document. Specify your id and write model strategy properties as follows:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

After your sink connector processes the preceding example record, it inserts a document that contains the _insertedTS and _modifiedTS fields as shown in the following document:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

After one hour, the train reports its new location along its route with a new position as shown in the following record:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Once your sink connector processes the preceding record, it inserts a document that contains the following data:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

For more information on the ProvidedInValueStrategy, see the section on how to Configure the Document Id Adder Post Processor.

You can configure the Replace One Business Key strategy to replace documents that match the value of the business key. To define a business key on multiple fields of a record and configure the connector to replace documents that contain matching business keys, perform the following tasks:

  1. Create a unique index in your collection that corresponds to your business key fields.

  2. Specify the PartialValueStrategy id strategy to identify the fields that belong to the business key in the connector configuration.

  3. Specify the ReplaceOneBusinessKeyStrategy write model strategy in the connector configuration.

Suppose you want to track airplane capacity by the flight number and airport location represented by flight_no and airport_code, respectively. An example message contains the following information:

{
"flight_no": "Z342",
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

To implement the strategy, using flight_no and airport_code as the business key, first create a unique index on these fields in the MongoDB shell:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

Next, specify the PartialValueStrategy strategy and business key fields in the a projection list. Specify the id and write model strategy configuration as follows:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

The sample data inserted into the collection contains the following:

{
"flight_no": "Z342"
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

When the connector processes sink data that matches the business key of the existing document, it replaces the document with the new values without changing the business key fields:

{
"flight_no": "Z342"
"airport_code": "LAX",
"status": "canceled"
}

After the connector processes the sink data, it replaces the original sample document in MongoDB with the preceding one.

You can configure the connector to remove a document when it receives messages that match a business key using the Delete One Business Key strategy. To set a business key from multiple fields of a record and configure the connector to delete a document that contains a matching business key, perform the following tasks:

  1. Create a unique index in your MongoDB collection that corresponds to your business key fields.

  2. Specify the PartialValueStrategy as the id strategy to identify the fields that belong to the business key in the connector configuration.

  3. Specify the DeleteOneBusinessKeyStrategy write model strategy in the connector configuration.

Suppose you want to delete a calendar event from a specific year from a collection that contains a document that resembles the following:

{
"year": 2005,
"month": 3,
"day": 15,
"event": "Dentist Appointment"
}

To implement the strategy, using year as the business key, first create a unique index on these fields in the MongoDB shell:

db.collection.createIndex({ "year": 1 }, { unique: true })

Next, specify your business key and write model strategy in your configuration as follows:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=year
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

If your connector processes a sink record that contains the business key year, it deletes the first document with a matching field value returned by MongoDB. Suppose your connector processes a sink record that contains the following value data:

{
"year": 2005,
...
}

When the connector processes the preceding record, it deletes the first document from the collection that contains a year field with a value of "2005" such as the original "Dentist Appointment" sample document.

If none of the write model strategies included with the connector fit your use case, you can create your own.

A write model strategy is a Java class that implements the WriteModelStrategy interface and must override the createWriteModel() method.

See the source code for the WriteModelStrategy interface for the required method signature.

The following custom write model strategy returns a write operation that replaces a MongoDB document that matches the _id field of your sink record with the value of the fullDocument field of your sink record:

/**
* Custom write model strategy
*
* This class reads the 'fullDocument' field from a change stream and
* returns a ReplaceOne operation.
*/
public class CustomWriteModelStrategy implements WriteModelStrategy {
private static String ID = "_id";
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument changeStreamDocument = document.getValueDoc()
.orElseThrow(() -> new DataException("Missing value document"));
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
if (fullDocument.isEmpty()) {
return null; // Return null to indicate no op.
}
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
}
}

For another example of a custom write model strategy, see the UpsertAsPartOfDocumentStrategy example strategy on GitHub.

To configure your sink connector to use a custom write strategy, you must complete the following actions:

  1. Compile the custom write strategy class to a JAR file.

  2. Add the compiled JAR to the classpath/plugin path for your Kafka workers. For more information about plugin paths, see the Confluent documentation.

    Note

    Kafka Connect loads plugins in isolation. When you deploy a custom write strategy, both the connector JAR and the write model strategy JAR must be on the same path. Your paths should resemble the following:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    To learn more about Kafka Connect plugins, see this guide from Confluent.

  3. Specify your custom class in the writemodel.strategy configuration setting.

To learn how to compile a class to a JAR file, see the JAR deployment guide from the Java SE documentation.

Back

Fundamentals