Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

Converters

On this page

  • Overview
  • Available Converters
  • Converters with Schemas
  • Connector Configuration
  • Avro Converter
  • Protobuf Converter
  • JSON Schema Converter
  • JSON Converter
  • String Converter (Raw JSON)

This guide describes how to use converters with the MongoDB Kafka Connector. Converters are programs that translate between bytes and Kafka Connect's runtime data format.

Converters pass data between Kafka Connect and Apache Kafka. The connector passes data between MongoDB and Kafka Connect. The following diagram shows these relationships:

Diagram illustrating converters' role in Kafka Connect

To learn more about converters, see the following resources:

  • Article from Confluent.

  • Confluent Article on Kafka Connect Concepts

  • Converter Interface API Documentation

As the connector converts your MongoDB data into Kafka Connect's runtime data format, the connector works with all available converters.

Important

Use the Same Converter for your Source and Sink Connectors

You must use the same converter in your MongoDB Kafka source connector and MongoDB Kafka sink connector. For example, if your source connector writes to a topic using Protobuf, your sink connector must use Protobuf to read from the topic.

To learn what converter to use, see this page from Confluent.

If you use a schema-based converter such as the Kafka Connect Avro Converter (Avro Converter), Kafka Connect Protobuf Converter, or Kafka Connect JSON Schema Converter, you should define a schema in your source connector.

To learn how to specify a schema, see the Apply Schemas guide.

This section provides templates for properties files to configure the following converters in a connector pipeline:

Click the following tabs to view properties files that work with the Avro converter:

The following properties file defines a source connector. This connector uses the default schema and an Avro converter to write to an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>

Important

Avro Converter with a MongoDB Data Source

Avro converters are a great fit for data with a static structure but are not a good fit for dynamic or changing data. MongoDB's schemaless document model supports dynamic data, so ensure your MongoDB data source has a static structure before specifying an Avro converter.

The following properties file defines a sink connector. This connector uses an Avro converter to read from an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>

To use the preceding properties file, replace the placeholder text in angle brackets with your information.

Click the following tabs to view properties files that work with the Protobuf converter:

The following properties file defines a source connector. This connector uses the default schema and a Protobuf converter to write to an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>

The following properties file defines a sink connector. This connector uses a Protobuf converter to read from an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>

To use the preceding properties file, replace the placeholder text in angle brackets with your information.

Click the following tabs to view properties files that work with the JSON Schema converter:

The following properties files configure your connector to manage JSON Schemas using Confluent Schema Registry:

The following properties file defines a source connector. This connector uses the default schema and a JSON Schema converter to write to an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>

The following properties file defines a sink connector. This connector uses a JSON Schema converter to read from an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>

The following properties files configure your connector to embed JSON Schemas in messages:

Important

Increased Message Size

Embedding a JSON Schema in your message increases the size of your message. To decrease the size of your messages while using JSON Schema, use Schema Registry.

The following properties file defines a source connector. This connector uses the default schema and a JSON Schema converter to write to an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
output.schema.infer.value=true
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

The following properties file defines a sink connector. This connector uses a JSON Schema converter to read from an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

To use the preceding properties file, replace the placeholder text in angle brackets with your information.

Click the following tabs to view properties files that work with the JSON converter:

The following properties file defines a source connector. This connector uses a JSON converter to write to an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

The following properties file defines a sink connector. This connector uses a JSON converter to read from an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

To use the preceding properties file, replace the placeholder text in angle brackets with your information.

Click the following tabs to view properties files that work with the String converter:

The following properties file defines a source connector. This connector uses a String converter to write to an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

The following properties file defines a sink connector. This connector uses a String converter to read from an Apache Kafka topic:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Important

Received Strings Must be Valid JSON

Your sink connector must receive valid JSON strings from your Apache Kafka topic even when using a String converter.

To use the preceding properties file, replace the placeholder text in angle brackets with your information.

Back

Data Formats