Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

MongoDB Kafka Sink Connector 시작하기

이 페이지의 내용

  • MongoDB Kafka Sink Connector 시작하기
  • 요약
  • 자세히 알아보기

이 튜토리얼을 따라 Apache Kafka 주제에서 데이터를 읽고 이를 MongoDB 컬렉션에 쓰도록 MongoDB Kafka sink connector를 구성하는 방법을 알아보세요.

1

Kafka Connector 튜토리얼 설정 의 단계를 완료하여 Confluent Kafka Connect 및 MongoDB 환경을 시작합니다.

2

다음 명령을 사용하여 튜토리얼 Docker Container에서 인터랙티브 셸 세션을 만듭니다.

docker exec -it mongo1 /bin/bash

다음 명령을 사용하여 simplesink.json(이)라는 소스 구성 파일을 만듭니다.

nano simplesink.json

다음 구성 정보를 파일에 붙여넣고 변경 사항을 저장합니다.

{
"name": "mongo-tutorial-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "Tutorial2.pets",
"connection.uri": "mongodb://mongo1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"database": "Tutorial2",
"collection": "pets"
}
}

참고

구성 속성에서 강조 표시된 줄은 커넥터에 Kafka에서 데이터를 변환하는 방법을 알려주는 변환기를 지정합니다.

셸에서 다음 명령을 실행하여 만든 구성 파일을 사용하여 싱크 커넥터를 시작합니다.

cx simplesink.json

참고

cx 명령은 튜토리얼 개발 환경에 포함된 맞춤 스크립트입니다. 이 스크립트는 Kafka Connect REST API에 대해 다음과 같은 동일한 요청을 실행하여 새 커넥터를 생성합니다.

curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"

커넥터의 상태를 확인하려면 셸에서 다음 명령을 실행합니다.

status

싱크 커넥터가 성공적으로 시작되면 다음 출력이 표시됩니다.

Kafka topics:
...
The status of the connectors:
sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
Currently configured connectors
[
"mongo-tutorial-sink"
]
...
3

같은 셸에서 Python 스크립트를 만들어 Kafka 주제에 데이터를 씁니다.

nano kafkawrite.py

다음 코드를 파일에 붙여넣고 변경 내용을 저장합니다.

from kafka import KafkaProducer
import json
from json import dumps
p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8'))
data = {'name': 'roscoe'}
p.send('Tutorial2.pets', value = data)
p.flush()

Python 스크립트를 실행합니다.

python3 kafkawrite.py
4

동일한 셸에서 다음 명령을 실행하여 MongoDB Shell인 mongosh를 사용하여 MongoDB에 연결합니다.

mongosh "mongodb://mongo1"

연결에 성공하면 다음과 같은 MongoDB Shell 프롬프트가 표시됩니다.

rs0 [direct: primary] test>

프롬프트에서 다음 명령을 입력하여 Tutorial2.pets MongoDB 네임스페이스의 모든 문서를 조회합니다.

use Tutorial2
db.pets.find()

다음과 같은 문서가 결과로 반환됩니다.

{ _id: ObjectId("62659..."), name: 'roscoe' }

exit 명령을 입력하여 MongoDB Shell을 종료합니다.

5

이 튜토리얼을 완료한 후 Docker 자산을 중지하거나 제거하여 컴퓨터의 리소스를 확보하세요. Docker 컨테이너와 이미지를 모두 제거하거나 컨테이너만 제거하도록 선택할 수 있습니다. 컨테이너와 이미지를 제거한 경우 이를 다시 다운로드하여 크기가 약 2.4GB인 MongoDB Kafka Connector 개발 환경을 다시 시작해야 합니다. 컨테이너만 제거하면 이미지를 재사용할 수 있고 샘플 데이터 파이프라인에 있는 대부분의 대용량 파일을 다운로드하지 않아도 됩니다.

더 많은 튜토리얼

MongoDB Kafka Connector 튜토리얼을 더 완료하려는 경우 컨테이너만 제거하는 것이 좋습니다. MongoDB Kafka Connector 튜토리얼을 더 이상 완료하지 않으려면 컨테이너 및 이미지를 제거하는 것이 좋습니다.

실행할 제거 작업에 해당하는 탭을 선택합니다.

다음 셸 명령을 실행하여 개발 환경에 대한 Docker 컨테이너와 이미지를 제거합니다.

docker-compose -p mongo-kafka down --rmi all

다음 shell 명령을 실행하여 Docker 컨테이너를 제거하고 개발 환경용 이미지는 유지합니다.

docker-compose -p mongo-kafka down

컨테이너를 다시 시작하려면 튜토리얼 설정에서 컨테이너를 시작하는 데 필요한 것과 동일한 단계를 따르세요.

이 튜토리얼에서는 Kafka 주제의 데이터를 MongoDB cluster의 collection에 저장하도록 싱크 connector를 구성했습니다.

이 튜토리얼에서 언급된 개념에 대해 자세히 알아보려면 다음 리소스를 참조하세요.

  • 싱크 커넥터 구성 속성

  • Kafka Connector 컨버터 소개

  • Kafka Connect REST API

돌아가기

MongoDB Kafka Source Connector 시작하기