Converters
Overview
This guide describes how to use converters with the MongoDB Kafka Connector. Converters are programs that translate between bytes and Kafka Connect's runtime data format.
Converters pass data between Kafka Connect and Apache Kafka, and the connector passes data between MongoDB and Kafka Connect.
Available Converters
Because the connector converts your MongoDB data into Kafka Connect's runtime data format, it works with all available converters.
Important
Use the Same Converter for your Source and Sink Connectors
You must use the same converter in your MongoDB Kafka source connector and MongoDB Kafka sink connector. For example, if your source connector writes to a topic using Protobuf, your sink connector must use Protobuf to read from the topic.
To learn which converter to use, see this page from Confluent.
Converters with Schemas
If you use a schema-based converter such as the Kafka Connect Avro Converter (Avro Converter), Kafka Connect Protobuf Converter, or Kafka Connect JSON Schema Converter, you should define a schema in your source connector.
To learn how to specify a schema, see the Apply Schemas guide.
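For example, a source connector can attach a custom Avro schema to message values with the output.schema.value setting. The following lines are a minimal sketch; the record and field names are illustrative and depend on your documents:
output.format.value=schema
output.schema.value={"type": "record", "name": "exampleValue", "fields": [{"name": "fullDocument", "type": ["null", "string"], "default": null}]}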
Connector Configuration
This section provides templates for properties files to configure the following converters in a connector pipeline:
Avro Converter
Click the following tabs to view properties files that work with the Avro converter:
The following properties file defines a source connector. This connector uses the default schema and an Avro converter to write to an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>
Important
Avro Converter with a MongoDB Data Source
Avro converters are a great fit for data with a static structure but are not a good fit for dynamic or changing data. MongoDB's schemaless document model supports dynamic data, so ensure your MongoDB data source has a static structure before specifying an Avro converter.
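For example, a collection with a static structure contains documents that all share the same field names and types, such as the following illustrative documents:
{"_id": 1, "name": "Grace", "total": 25}
{"_id": 2, "name": "Ada", "total": 40}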
The following properties file defines a sink connector. This connector uses an Avro converter to read from an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>
To use the preceding properties files, replace the placeholder text in angle brackets with your information.
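For illustration, a filled-in Avro source connector configuration for a local deployment might contain values like the following; the URIs, database, and collection names shown here are hypothetical:
connection.uri=mongodb://localhost:27017
database=sales
collection=orders
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081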
Protobuf Converter
Click the following tabs to view properties files that work with the Protobuf converter:
The following properties file defines a source connector. This connector uses the default schema and a Protobuf converter to write to an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>
The following properties file defines a sink connector. This connector uses a Protobuf converter to read from an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>
To use the preceding properties files, replace the placeholder text in angle brackets with your information.
JSON Schema Converter
Click the following tabs to view properties files that work with the JSON Schema converter:
The following properties files configure your connector to manage JSON Schemas using Confluent Schema Registry:
The following properties file defines a source connector. This connector uses the default schema and a JSON Schema converter to write to an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>
The following properties file defines a sink connector. This connector uses a JSON Schema converter to read from an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>
The following properties files configure your connector to embed JSON Schemas in messages:
Important
Increased Message Size
Embedding a JSON Schema in your message increases the size of your message. To decrease the size of your messages while using JSON Schema, use Schema Registry.
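For illustration, when the schemas.enable properties are set to true, the JSON converter wraps each message in a schema and payload envelope similar to the following sketch; the field shown is hypothetical:
{
  "schema": {"type": "struct", "fields": [{"field": "name", "type": "string", "optional": true}], "optional": false},
  "payload": {"name": "Grace"}
}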
The following properties file defines a source connector. This connector uses the default schema and a JSON Schema converter to write to an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
output.schema.infer.value=true
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
The following properties file defines a sink connector. This connector uses a JSON Schema converter to read from an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
To use the preceding properties files, replace the placeholder text in angle brackets with your information.
JSON Converter
Click the following tabs to view properties files that work with the JSON converter:
The following properties file defines a source connector. This connector uses a JSON converter to write to an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
The following properties file defines a sink connector. This connector uses a JSON converter to read from an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
To use the preceding properties files, replace the placeholder text in angle brackets with your information.
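With the schemas.enable properties set to false, the JSON converter reads and writes plain JSON with no schema envelope. For example, a message value might be a document such as {"name": "Grace", "total": 25}; the fields shown are illustrative.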
String Converter (Raw JSON)
Click the following tabs to view properties files that work with the String converter:
The following properties file defines a source connector. This connector uses a String converter to write to an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
The following properties file defines a sink connector. This connector uses a String converter to read from an Apache Kafka topic:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Important
Received Strings Must be Valid JSON
Your sink connector must receive valid JSON strings from your Apache Kafka topic even when using a String converter.
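For example, a message value such as {"name": "Grace", "total": 25} is a valid JSON string that the sink connector can parse, while a plain string such as name=Grace is not.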
To use the preceding properties files, replace the placeholder text in angle brackets with your information.