Hello All,
we are using docker container for connecting with mongodb sink connector and flowing below step.
Step one (YML file)
version: ‘2’
services:
broker:
image: confluentinc/cp-kafka:7.6.1
hostname: broker
container_name: kafka-0
restart: unless-stopped
ports:
- “9092:9092”
- “9101:9101”
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: ‘CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT’
KAFKA_ADVERTISED_LISTENERS: ‘PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092’
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: 0.0.0.0
KAFKA_PROCESS_ROLES: ‘broker,controller’
KAFKA_CONTROLLER_QUORUM_VOTERS: ‘1@broker:29093’
KAFKA_LISTENERS: ‘PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092’
KAFKA_INTER_BROKER_LISTENER_NAME: ‘PLAINTEXT’
KAFKA_CONTROLLER_LISTENER_NAMES: ‘CONTROLLER’
KAFKA_LOG_DIRS: ‘/u01/kraft-combined-logs’
CLUSTER_ID: ‘MkU3OEVBNTcwNTJENDM2Qk’
volumes:
- ./kraft-combined-logs:/u01/kraft-combined-logs
- kafka-data:/var/lib/kafka/data
user: “1000:1000”
connect:
image: confluentinc/cp-server-connect:7.5.2
container_name: connect
depends_on:
- broker
ports:
- “8083:8083”
environment:
CONNECT_BOOTSTRAP_SERVERS: ‘broker:29092’
CONNECT_GROUP_ID: “debezium-example”
CONNECT_REST_ADVERTISED_HOST_NAME: connect
# topic for storing configurations
CONNECT_CONFIG_STORAGE_TOPIC: connect-demo-configs
# topic for storing offsets
CONNECT_OFFSET_STORAGE_TOPIC: connect-demo-offsets
# topic for storing connector statuses
CONNECT_STATUS_STORAGE_TOPIC: connect-demo-statuses
CONNECT_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
# specify the converter for keys
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
# specify the converter for values
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
# Plugin path
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
# since we are using Avro + Schema Registry, specify the URL for Schema Registry
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# specify the MongoDB connection details
MONGO_URI: "mongodb://abc:XYZ@0.0.0.0:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.1.4"
MONGO_DB: "dev"
volumes:
- database:/opt/docker/db/data
- $PWD/stack-configs:/opt/docker/stack-configs
command:
- bash
- -c
- |
echo "Installing connector plugins"
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.6.1
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run
mongodb:
image: mongo:4.4.6
container_name: mongodb
ports:
- “27017:27017”
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: xxxx
volumes:
- mongodb_data:/data/db
volumes:
kafka-data:
database:
mongodb_data:
Step two :
after installing kafka, kafka connect and mongoDB successfully, and create topic then we have created json config file for connecting with the sink connector.
{
“connector.class”: “com.mongodb.kafka.connect.MongoSinkConnector”,
“tasks.max”: “1”,
“topics”: “kafka-topic”,
“connection.uri”: “mongodb://abc:XYZ@0.0.0.0:27017”,
“database”: “dev”,
“collection”: “kafka”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: “false”,
“mongo.errors.log.enable”: “true”
}
after that we have tested using curl
curl -X PUT -H “Content-Type: application/json” --data @mongodb-sink-connector.json
once run curl command then getting below error in mongodb log
{“t”:{“$date”:“2024-06-19T13:10:01.768+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:51800, “ctx”:“conn48”,“msg”:“client metadata”,“attr”:{“remote”:“192.168.240.1:52934”,“client”:“conn48”,“doc”:{“driver”:{“name”:“mongo-java-driver|sync”,“version”:“4.3.1”},“os”:{“type”:“Linux”,“name”:“Linux”,“architecture”:“amd64”,“version”:“4.18.0-513.18.1.el8_9.x86_64”},“platform”:“Java/Azul Systems, Inc./11.0.21+9-LTS”}}}
{“t”:{“$date”:“2024-06-19T13:10:01.774+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:22943, “ctx”:“listener”,“msg”:“Connection accepted”,“attr”:{“remote”:“192.168.240.1:52946”,“connectionId”:50,“connectionCount”:16}}
{“t”:{“$date”:“2024-06-19T13:10:01.776+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:51800, “ctx”:“conn50”,“msg”:“client metadata”,“attr”:{“remote”:“192.168.240.1:52946”,“client”:“conn50”,“doc”:{“driver”:{“name”:“mongo-java-driver|sync”,“version”:“4.3.1”},“os”:{“type”:“Linux”,“name”:“Linux”,“architecture”:“amd64”,“version”:“4.18.0-513.18.1.el8_9.x86_64”},“platform”:“Java/Azul Systems, Inc./11.0.21+9-LTS”}}}
{“t”:{“$date”:“2024-06-19T13:10:01.856+00:00”},“s”:“I”, “c”:“ACCESS”, “id”:20250, “ctx”:“conn50”,“msg”:“Authentication succeeded”,“attr”:{“mechanism”:“SCRAM-SHA-256”,“speculative”:true,“principalName”:“root”,“authenticationDatabase”:“admin”,“remote”:“192.168.240.1:52946”,“extraInfo”:{}}}
{“t”:{“$date”:“2024-06-19T13:10:01.864+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:22944, “ctx”:“conn50”,“msg”:“Connection ended”,“attr”:{“remote”:“192.168.240.1:52946”,“connectionId”:50,“connectionCount”:15}}
{“t”:{“$date”:“2024-06-19T13:10:01.865+00:00”},“s”:“I”, “c”:“-”, “id”:20883, “ctx”:“conn48”,“msg”:“Interrupted operation as its client disconnected”,“attr”:{“opId”:47123}}
{“t”:{“$date”:“2024-06-19T13:10:01.865+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:22944, “ctx”:“conn49”,“msg”:“Connection ended”,“attr”:{“remote”:“192.168.240.1:52944”,“connectionId”:49,“connectionCount”:14}}
{“t”:{“$date”:“2024-06-19T13:10:01.865+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:22944, “ctx”:“conn48”,“msg”:“Connection ended”,“attr”:{“remote”:“192.168.240.1:52934”,“connectionId”:48,“connectionCount”:13}}
{“t”:{“$date”:“2024-06-19T13:10:18.917+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:22943, “ctx”:“listener”,“msg”:“Connection accepted”,“attr”:{“remote”:“192.168.240.1:60100”,“connectionId”:51,“connectionCount”:14}}
{“t”:{“$date”:“2024-06-19T13:10:18.918+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:22943, “ctx”:“listener”,“msg”:“Connection accepted”,“attr”:{“remote”:“192.168.240.1:60102”,“connectionId”:52,“connectionCount”:15}}
{“t”:{“$date”:“2024-06-19T13:10:18.918+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:51800, “ctx”:“conn52”,“msg”:“client metadata”,“attr”:{“remote”:“192.168.240.1:60102”,“client”:“conn52”,“doc”:{“driver”:{“name”:“mongo-java-driver|sync”,“version”:“4.3.1”},“os”:{“type”:“Linux”,“name”:“Linux”,“architecture”:“amd64”,“version”:“4.18.0-513.18.1.el8_9.x86_64”},“platform”:“Java/Azul Systems, Inc./11.0.21+9-LTS”}}}
{“t”:{“$date”:“2024-06-19T13:10:18.922+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:51800, “ctx”:“conn51”,“msg”:“client metadata”,“attr”:{“remote”:“192.168.240.1:60100”,“client”:“conn51”,“doc”:{“driver”:{“name”:“mongo-java-driver|sync”,“version”:“4.3.1”},“os”:{“type”:“Linux”,“name”:“Linux”,“architecture”:“amd64”,“version”:“4.18.0-513.18.1.el8_9.x86_64”},“platform”:“Java/Azul Systems, Inc./11.0.21+9-LTS”}}}
{“t”:{“$date”:“2024-06-19T13:10:18.929+00:00”},“s”:“I”, “c”:“NETWORK”, “id”:22943, “ctx”:“listener”,“msg”:“Connection accepted”,“attr”:{“remote”:“192.168.240.1:60106”,“connectionId”:53,“connectionCount”:16}}
kafka connect log
[2024-06-19 13:10:19,041] INFO Instantiated connector mongodb-sink-connector with version 1.6.1-dirty of type class com.mongodb.kafka.connect.MongoSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2024-06-19 13:10:19,041] INFO Finished creating connector mongodb-sink-connector (org.apache.kafka.connect.runtime.Worker)
[2024-06-19 13:10:19,051] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = com.mongodb.kafka.connect.MongoSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mongodb-sink-connector
predicates =
tasks.max = 1
topics = [kafka-topic]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2024-06-19 13:10:19,054] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = com.mongodb.kafka.connect.MongoSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mongodb-sink-connector
predicates =
tasks.max = 1
topics = [kafka-topic]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2024-06-19 13:14:27,771] INFO [AdminClient clientId=debezium-example–shared-admin] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
we are not able to add kafka message data in mongodb, can you please look on issue and suggest us.
Thanks,
Pankaj D