MongoDB Kafka Source Connector 시작하기
이 튜토리얼을 따라 변경 스트림에서 데이터를 읽고 이를 Apache Kafka 주제에 게시하도록 MongoDB Kafka Source Connector를 구성하는 방법을 알아보세요.
MongoDB Kafka Source Connector 시작하기
튜토리얼 설정 완료
Kafka Connector 튜토리얼 설정 의 단계를 완료하여 Confluent Kafka Connect 및 MongoDB 환경을 시작합니다.
소스 커넥터 구성하기
다음 명령을 사용하여 튜토리얼 설치용으로 다운로드한 튜토리얼 Docker 컨테이너에서 대화형 셸 세션을 생성합니다.
docker exec -it mongo1 /bin/bash
다음 명령을 사용하여 simplesource.json
(이)라는 소스 구성 파일을 만듭니다.
nano simplesource.json
다음 구성 정보를 파일에 붙여넣고 변경 사항을 저장합니다.
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "Tutorial1", "collection": "orders" } }
셸에서 다음 명령을 실행하여 만든 구성 파일을 사용하여 싱크 커넥터를 시작합니다.
cx simplesource.json
참고
cx
명령은 튜토리얼 개발 환경에 포함된 맞춤 스크립트입니다. 이 스크립트는 Kafka Connect REST API에 대해 다음과 같은 동일한 요청을 실행하여 새 커넥터를 생성합니다.
curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"
커넥터의 상태를 확인하려면 셸에서 다음 명령을 실행합니다.
status
싱크 커넥터가 성공적으로 시작되면 다음 출력이 표시됩니다.
Kafka topics: ... The status of the connectors: source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-simple-source" ] ...
변경 이벤트 만들기
동일한 셸에서 다음 명령을 실행하여 MongoDB Shell인 mongosh
를 사용하여 MongoDB에 연결합니다.
mongosh "mongodb://mongo1"
연결에 성공하면 다음과 같은 MongoDB Shell 프롬프트가 표시됩니다.
rs0 [direct: primary] test>
프롬프트에 다음 명령을 입력하여 새 문서를 삽입합니다:
use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
MongoDB가 삽입 명령을 완료하면 다음 텍스트와 유사한 승인을 받아야 합니다.
{ acknowledged: true, insertedId: ObjectId("627e7e...") }
exit
명령을 입력하여 MongoDB Shell을 종료합니다.
다음 명령을 사용하여 Kafka 환경의 상태를 확인하십시오.
status
이전 명령의 출력에는 변경 이벤트를 수신한 후 소스 커넥터가 생성한 새 항목이 표시되어야 합니다.
... "topic": "Tutorial1.orders", ...
다음 명령을 실행하여 새 카프카 주제에 대한 데이터의 내용을 확인합니다:
kc Tutorial1.orders
참고
kc
명령은 카프카 토픽의 콘텐츠를 출력하는 헬퍼 스크립트입니다.
앞의 명령을 실행하면 "Key" 및 "Value" 섹션으로 구성된 다음 Kafka 주제 데이터가 표시됩니다:
출력의 "Value" 섹션에서 다음 형식의 JSON 문서에 강조 표시된 fullDocument
데이터를 포함하는 payload
부분을 찾을 수 있습니다.
{ "_id": { "_data": "8262655A..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650809557, "i": 2 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "62655a..." }, "order_id": 1, "item": "coffee" }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "62655a..." } } }
변경 스트림 재구성
fullDocument
필드만 반환하도록 구성하여 변경 스트림에 의해 생성된 이벤트에서 메타데이터를 생략할 수 있습니다.
다음 명령을 사용하여 커넥터를 중지합니다.
del mongo-simple-source
참고
del
명령은 커넥터를 중지하기 위해 Kafka Connect REST API를 호출하는 헬퍼 스크립트이며 다음 명령과 동일합니다.
curl -X DELETE connect:8083/connectors/<parameter>
다음 명령을 사용하여 simplesource.json
(이)라는 소스 구성 파일을 만듭니다.
nano simplesource.json
기존 구성을 제거하고 다음 구성을 추가한 후 파일을 저장합니다.
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "publish.full.document.only": true, "database": "Tutorial1", "collection": "orders" } }
셸에서 다음 명령을 실행하여 만든 구성 파일을 사용하여 싱크 커넥터를 시작합니다.
cx simplesource.json
다음 명령을 사용하여 mongosh
을 사용하여 MongoDB에 연결합니다.
mongosh "mongodb://mongo1"
프롬프트에 다음 명령을 입력하여 새 문서를 삽입합니다:
use Tutorial1 db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )
다음 명령을 실행하여 mongosh
을 종료합니다.
exit
다음 명령을 실행하여 새 카프카 주제에 대한 데이터의 내용을 확인합니다:
kc Tutorial1.orders
"Value" 문서의 payload
필드에는 다음 문서 데이터만 포함되어야 합니다.
{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
(선택 사항) Docker 컨테이너 중지
이 튜토리얼을 완료한 후 Docker 자산을 중지하거나 제거하여 컴퓨터의 리소스를 확보하세요. Docker 컨테이너와 이미지를 모두 제거하거나 컨테이너만 제거하도록 선택할 수 있습니다. 컨테이너와 이미지를 제거한 경우 이를 다시 다운로드하여 크기가 약 2.4GB인 MongoDB Kafka Connector 개발 환경을 다시 시작해야 합니다. 컨테이너만 제거하면 이미지를 재사용할 수 있고 샘플 데이터 파이프라인에 있는 대부분의 대용량 파일을 다운로드하지 않아도 됩니다.
팁
더 많은 튜토리얼
MongoDB Kafka Connector 튜토리얼을 더 완료하려는 경우 컨테이너만 제거하는 것이 좋습니다. MongoDB Kafka Connector 튜토리얼을 더 이상 완료하지 않으려면 컨테이너 및 이미지를 제거하는 것이 좋습니다.
실행할 제거 작업에 해당하는 탭을 선택합니다.
다음 셸 명령을 실행하여 개발 환경에 대한 Docker 컨테이너와 이미지를 제거합니다.
docker-compose -p mongo-kafka down --rmi all
다음 shell 명령을 실행하여 Docker 컨테이너를 제거하고 개발 환경용 이미지는 유지합니다.
docker-compose -p mongo-kafka down
컨테이너를 다시 시작하려면 튜토리얼 설정에서 컨테이너를 시작하는 데 필요한 것과 동일한 단계를 따르세요.
요약
이 튜토리얼에서는 다양한 구성을 사용하여 소스 커넥터를 시작하여 Kafka 주제에 게시된 변경 스트림 이벤트 데이터를 변경했습니다.
자세히 알아보기
이 튜토리얼에서 언급된 개념에 대해 자세히 알아보려면 다음 리소스를 참조하세요.