Specify a Schema

This usage example demonstrates how you can configure your MongoDB Kafka source connector to apply a custom schema to your data. A schema is a definition that specifies the structure and type information about data in an Apache Kafka topic. Use a schema when you need to ensure the data on the topic populated by your source connector has a consistent structure.

To learn more about using schemas with the MongoDB Kafka Connector, see the Apply Schemas guide.

Suppose your application keeps track of customer data in a MongoDB collection, and you need to publish this data to a Kafka topic. You want the subscribers of the customer data to receive consistently formatted data. You choose to apply a schema to your data.

Your requirements and your solutions are as follows:

| Requirement | Solution |
| Receive customer data from a MongoDB collection | Configure a MongoDB source connector to receive updates to data from a specific database and collection. |
| Provide the customer data schema | Specify a schema that corresponds to the structure and data types of the customer data. |
| Omit Kafka metadata from the customer data | Include only the data from the fullDocument field. |

For the full configuration file that meets the requirements above, see Specify the Configuration.

To configure your source connector to receive data from a MongoDB collection, specify the database and collection name. For this example, you can configure the connector to read from the purchases collection in the customers database as follows:

database=customers
collection=purchases

A sample customer data document from your collection contains the following information:

{
  "name": "Zola",
  "visits": [
    {
      "$date": "2021-07-25T17:30:00.000Z"
    },
    {
      "$date": "2021-10-03T14:06:00.000Z"
    }
  ],
  "goods_purchased": {
    "apples": 1,
    "bananas": 10
  }
}

From the sample document, you decide your schema should present the fields using the following data types:

| Field name | Data type | Description |
| name | string | Name of the customer |
| visits | array of timestamps | Dates the customer visited |
| goods_purchased | map of string (the assumed type) to integer values | Names of goods and quantity of each item the customer purchased |

You can describe your data using the Avro schema format as shown in the example schema below:

{
  "type": "record",
  "name": "Customer",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "visits",
      "type": {
        "type": "array",
        "items": {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      }
    },
    {
      "name": "goods_purchased",
      "type": {
        "type": "map",
        "values": "int"
      }
    }
  ]
}
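
To see how the sample document lines up with this schema, the following Python sketch converts the Extended JSON `$date` values into milliseconds since the Unix epoch, which is how the Avro `timestamp-millis` logical type represents a timestamp, and then checks each field against the type the schema declares. It uses only the standard library. The helper names `to_epoch_millis`, `to_schema_shape`, and `conforms_to_customer_schema` are illustrative and are not part of the connector.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# The sample customer document, in MongoDB Extended JSON form.
sample = {
    "name": "Zola",
    "visits": [
        {"$date": "2021-07-25T17:30:00.000Z"},
        {"$date": "2021-10-03T14:06:00.000Z"},
    ],
    "goods_purchased": {"apples": 1, "bananas": 10},
}


def to_epoch_millis(ext_json_date):
    """Convert {"$date": "<UTC ISO-8601 string>"} to epoch milliseconds."""
    parsed = datetime.strptime(ext_json_date["$date"], "%Y-%m-%dT%H:%M:%S.%fZ")
    return (parsed.replace(tzinfo=timezone.utc) - EPOCH) // timedelta(milliseconds=1)


def to_schema_shape(doc):
    """Reshape the document so every value has the type the schema expects."""
    return {
        "name": doc["name"],
        "visits": [to_epoch_millis(v) for v in doc["visits"]],
        "goods_purchased": dict(doc["goods_purchased"]),
    }


def is_int(value):
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool)


def conforms_to_customer_schema(record):
    """Hand-written check mirroring the three fields of the Customer schema."""
    return (
        isinstance(record["name"], str)                      # "string"
        and all(is_int(v) for v in record["visits"])         # array of "long"
        and all(                                             # map of string -> "int"
            isinstance(k, str) and is_int(v) and -2**31 <= v < 2**31
            for k, v in record["goods_purchased"].items()
        )
    )


record = to_schema_shape(sample)
print(record["visits"])
print(conforms_to_customer_schema(record))
```

This is a hand-rolled check for illustration only; in practice an Avro library would validate records against the schema.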

Important

Converters

If you want to send your data through Apache Kafka with Avro binary encoding, you must use an Avro converter. For more information, see the guide on Converters.

The connector publishes the customer data documents and metadata that describes the document to a Kafka topic. You can set the connector to include only the document data contained in the fullDocument field of the record using the following setting:

publish.full.document.only=true

For more information on the fullDocument field, see the Change Streams guide.
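
The effect of this setting can be pictured with a small Python sketch. It models a change stream insert event as a dictionary and shows which part would be published with and without `publish.full.document.only`. The function `record_value` is a hypothetical stand-in that mimics the outcome, not the connector's actual implementation, and the `_id` values are made up.

```python
# A simplified change stream event for an insert into customers.purchases.
# Field names follow the MongoDB change event format; values are illustrative.
change_event = {
    "_id": {"_data": "<resume token>"},
    "operationType": "insert",
    "ns": {"db": "customers", "coll": "purchases"},
    "documentKey": {"_id": {"$oid": "0123456789abcdef01234567"}},
    "fullDocument": {
        "_id": {"$oid": "0123456789abcdef01234567"},
        "name": "Zola",
        "goods_purchased": {"apples": 1, "bananas": 10},
    },
}


def record_value(event, publish_full_document_only):
    """Return what the record value would contain under each setting."""
    if publish_full_document_only:
        # Only the document itself, without the surrounding event metadata.
        return event["fullDocument"]
    # The whole change event, metadata included.
    return event


print(sorted(record_value(change_event, False)))
print(sorted(record_value(change_event, True)))
```

With the flag enabled, fields such as `operationType` and `ns` no longer appear in the published value, which is what lets a schema like `Customer` describe the record directly.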

Your custom schema connector configuration should resemble the following:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=customers
collection=purchases
publish.full.document.only=true
output.format.value=schema
output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
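
Writing the escaped one-line schema by hand is error-prone. As an alternative sketch, the Python snippet below keeps the schema as a regular dictionary, serializes it with `json.dumps`, and assembles the same settings as a JSON request body of the kind the Kafka Connect REST API accepts at `POST /connectors`. The connector name `mongo-customers-source` is an assumed placeholder, and the snippet only builds the body; it does not send it.

```python
import json

# The Customer schema as a Python dictionary, easier to edit than an escaped string.
customer_schema = {
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "name", "type": "string"},
        {
            "name": "visits",
            "type": {
                "type": "array",
                "items": {"type": "long", "logicalType": "timestamp-millis"},
            },
        },
        {"name": "goods_purchased", "type": {"type": "map", "values": "int"}},
    ],
}

# Same settings as the properties file; every value is a string.
connector_config = {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "<your MongoDB connection URI>",
    "database": "customers",
    "collection": "purchases",
    "publish.full.document.only": "true",
    "output.format.value": "schema",
    "output.schema.value": json.dumps(customer_schema),
    "value.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
}

# "mongo-customers-source" is a hypothetical connector name.
request_body = json.dumps(
    {"name": "mongo-customers-source", "config": connector_config}, indent=2
)
print(request_body)
```

Because `json.dumps` handles the quoting, the embedded schema string stays valid JSON whenever the dictionary changes.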

Note

Embedded Schema

In the preceding configuration, the JSON converter (org.apache.kafka.connect.json.JsonConverter), with value.converter.schemas.enable=true, embeds the custom schema in each message it writes. To learn more about this converter, see the Converters guide.
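
On the consuming side, a message written by `JsonConverter` with schemas enabled is a JSON object with a `schema` part and a `payload` part. The Python sketch below shows one way a subscriber might separate the two. The envelope shown is an assumed, hand-written illustration in the Kafka Connect JSON schema style; the exact schema content your connector emits may differ, and `split_envelope` is a hypothetical helper.

```python
import json

# Hand-written example of a schema-plus-payload envelope (illustrative only).
message_value = json.dumps({
    "schema": {
        "type": "struct",
        "name": "Customer",
        "optional": False,
        "fields": [
            {"field": "name", "type": "string", "optional": False},
            {
                "field": "visits",
                "type": "array",
                "optional": False,
                "items": {
                    "type": "int64",
                    "optional": False,
                    "name": "org.apache.kafka.connect.data.Timestamp",
                    "version": 1,
                },
            },
            {
                "field": "goods_purchased",
                "type": "map",
                "optional": False,
                "keys": {"type": "string", "optional": False},
                "values": {"type": "int32", "optional": False},
            },
        ],
    },
    "payload": {
        "name": "Zola",
        "visits": [1627234200000, 1633269960000],
        "goods_purchased": {"apples": 1, "bananas": 10},
    },
})


def split_envelope(raw_value):
    """Split a schema-enabled JSON message into (schema, payload)."""
    envelope = json.loads(raw_value)
    return envelope["schema"], envelope["payload"]


schema, payload = split_envelope(message_value)
print([f["field"] for f in schema["fields"]])
print(payload["goods_purchased"])
```

Embedding the schema makes each message self-describing, at the cost of repeating the schema in every record.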

For more information on specifying schemas, see the Apply Schemas guide.
