Docs Home → MongoDB Kafka Connector
Specify a Schema
This usage example demonstrates how you can configure your MongoDB Kafka source connector to apply a custom schema to your data. A schema is a definition that specifies the structure and type information about data in an Apache Kafka topic. Use a schema when you need to ensure the data on the topic populated by your source connector has a consistent structure.
To learn more about using schemas with the MongoDB Kafka Connector, see the Apply Schemas guide.
Example
Suppose your application keeps track of customer data in a MongoDB collection, and you need to publish this data to a Kafka topic. You want the subscribers of the customer data to receive consistently formatted data. You choose to apply a schema to your data.
Your requirements and your solutions are as follows:
Requirement | Solution |
---|---|
Receive customer data from a MongoDB collection | Configure a MongoDB source connector to receive updates to data
from a specific database and collection. |
Provide the customer data schema | Specify a schema that corresponds to the structure and data types of
the customer data. |
Omit Kafka metadata from the customer data | Include only the data from the fullDocument field. |
For the full configuration file that meets the requirements above, see Specify the Configuration.
Receive Data from a Collection
To configure your source connector to receive data from a MongoDB collection,
specify the database and collection name. For this example, you can
configure the connector to read from the purchases
collection in the
customers
database as follows:
database=customers collection=purchases
Create a Custom Schema
A sample customer data document from your collection contains the following information:
{ "name": "Zola", "visits": [ { "$date": "2021-07-25T17:30:00.000Z" }, { "$date": "2021-10-03T14:06:00.000Z" } ], "goods_purchased": { "apples": 1, "bananas": 10 } }
From the sample document, you decide your schema should present the fields using the following data types:
Field name | Data types | Description |
---|---|---|
name | Name of the customer | |
visits | array
of timestamps | Dates the customer visited |
goods_purchased | Names of goods and quantity of each item the customer purchased |
You can describe your data using the Avro schema format as shown in the example schema below:
{ "type": "record", "name": "Customer", "fields": [{ "name": "name", "type": "string" },{ "name": "visits", "type": { "type": "array", "items": { "type": "long", "logicalType": "timestamp-millis" } } },{ "name": "goods_purchased", "type": { "type": "map", "values": "int" } } ] }
Important
Converters
If you want to send your data through Apache Kafka with binary encoding, you must use an converter. For more information, see the guide on Converters.
Omit Metadata from Published Records
The connector publishes the customer data documents and metadata
that describes the document to a Kafka topic. You can set the connector to
include only the document data contained in the fullDocument
field of the
record using the following setting:
publish.full.document.only=true
For more information on the fullDocument
field, see the
Change Streams guide.
Specify the Configuration
Your custom schema connector configuration should resemble the following:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your MongoDB connection URI> database=customers collection=purchases publish.full.document.only=true output.format.value=schema output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]} value.converter.schemas.enable=true value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter
Note
Embedded Schema
In the preceding configuration, the JSON Schema converter embeds the custom schema in your messages. To learn more about the JSON Schema converter, see the Converters guide.
For more information on specifying schemas, see the Apply Schemas guide.