Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

MongoDB Kafka Source Connector 시작하기

이 페이지의 내용

  • MongoDB Kafka Source Connector 시작하기
  • 요약
  • 자세히 알아보기

이 튜토리얼을 따라 변경 스트림에서 데이터를 읽고 이를 Apache Kafka 주제에 게시하도록 MongoDB Kafka Source Connector를 구성하는 방법을 알아보세요.

1

Kafka Connector 튜토리얼 설정 의 단계를 완료하여 Confluent Kafka Connect 및 MongoDB 환경을 시작합니다.

2

다음 명령을 사용하여 튜토리얼 설치용으로 다운로드한 튜토리얼 Docker 컨테이너에서 대화형 셸 세션을 생성합니다.

docker exec -it mongo1 /bin/bash

다음 명령을 사용하여 simplesource.json(이)라는 소스 구성 파일을 만듭니다.

nano simplesource.json

다음 구성 정보를 파일에 붙여넣고 변경 사항을 저장합니다.

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "Tutorial1",
"collection": "orders"
}
}

셸에서 다음 명령을 실행하여 만든 구성 파일을 사용하여 싱크 커넥터를 시작합니다.

cx simplesource.json

참고

cx 명령은 튜토리얼 개발 환경에 포함된 맞춤 스크립트입니다. 이 스크립트는 Kafka Connect REST API에 대해 다음과 같은 동일한 요청을 실행하여 새 커넥터를 생성합니다.

curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"

커넥터의 상태를 확인하려면 셸에서 다음 명령을 실행합니다.

status

싱크 커넥터가 성공적으로 시작되면 다음 출력이 표시됩니다.

Kafka topics:
...
The status of the connectors:
source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-simple-source"
]
...
3

동일한 셸에서 다음 명령을 실행하여 MongoDB Shell인 mongosh를 사용하여 MongoDB에 연결합니다.

mongosh "mongodb://mongo1"

연결에 성공하면 다음과 같은 MongoDB Shell 프롬프트가 표시됩니다.

rs0 [direct: primary] test>

프롬프트에 다음 명령을 입력하여 새 문서를 삽입합니다:

use Tutorial1
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

MongoDB가 삽입 명령을 완료하면 다음 텍스트와 유사한 승인을 받아야 합니다.

{
acknowledged: true,
insertedId: ObjectId("627e7e...")
}

exit 명령을 입력하여 MongoDB Shell을 종료합니다.

다음 명령을 사용하여 Kafka 환경의 상태를 확인하십시오.

status

이전 명령의 출력에는 변경 이벤트를 수신한 후 소스 커넥터가 생성한 새 항목이 표시되어야 합니다.

...
"topic": "Tutorial1.orders",
...

다음 명령을 실행하여 새 카프카 주제에 대한 데이터의 내용을 확인합니다:

kc Tutorial1.orders

참고

kc 명령은 카프카 토픽의 콘텐츠를 출력하는 헬퍼 스크립트입니다.

앞의 명령을 실행하면 "Key" 및 "Value" 섹션으로 구성된 다음 Kafka 주제 데이터가 표시됩니다:

출력의 "Value" 섹션에서 다음 형식의 JSON 문서에 강조 표시된 fullDocument 데이터를 포함하는 payload 부분을 찾을 수 있습니다.

{
"_id": {
"_data": "8262655A..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650809557,
"i": 2
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "62655a..."
},
"order_id": 1,
"item": "coffee"
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "62655a..."
}
}
}
4

fullDocument 필드만 반환하도록 구성하여 변경 스트림에 의해 생성된 이벤트에서 메타데이터를 생략할 수 있습니다.

다음 명령을 사용하여 커넥터를 중지합니다.

del mongo-simple-source

참고

del 명령은 커넥터를 중지하기 위해 Kafka Connect REST API를 호출하는 헬퍼 스크립트이며 다음 명령과 동일합니다.

curl -X DELETE connect:8083/connectors/<parameter>

다음 명령을 사용하여 simplesource.json(이)라는 소스 구성 파일을 만듭니다.

nano simplesource.json

기존 구성을 제거하고 다음 구성을 추가한 후 파일을 저장합니다.

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"publish.full.document.only": true,
"database": "Tutorial1",
"collection": "orders"
}
}

셸에서 다음 명령을 실행하여 만든 구성 파일을 사용하여 싱크 커넥터를 시작합니다.

cx simplesource.json

다음 명령을 사용하여 mongosh 을 사용하여 MongoDB에 연결합니다.

mongosh "mongodb://mongo1"

프롬프트에 다음 명령을 입력하여 새 문서를 삽입합니다:

use Tutorial1
db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )

다음 명령을 실행하여 mongosh 을 종료합니다.

exit

다음 명령을 실행하여 새 카프카 주제에 대한 데이터의 내용을 확인합니다:

kc Tutorial1.orders

"Value" 문서의 payload 필드에는 다음 문서 데이터만 포함되어야 합니다.

{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
5

이 튜토리얼을 완료한 후 Docker 자산을 중지하거나 제거하여 컴퓨터의 리소스를 확보하세요. Docker 컨테이너와 이미지를 모두 제거하거나 컨테이너만 제거하도록 선택할 수 있습니다. 컨테이너와 이미지를 제거한 경우 이를 다시 다운로드하여 크기가 약 2.4GB인 MongoDB Kafka Connector 개발 환경을 다시 시작해야 합니다. 컨테이너만 제거하면 이미지를 재사용할 수 있고 샘플 데이터 파이프라인에 있는 대부분의 대용량 파일을 다운로드하지 않아도 됩니다.

더 많은 튜토리얼

MongoDB Kafka Connector 튜토리얼을 더 완료하려는 경우 컨테이너만 제거하는 것이 좋습니다. MongoDB Kafka Connector 튜토리얼을 더 이상 완료하지 않으려면 컨테이너 및 이미지를 제거하는 것이 좋습니다.

실행할 제거 작업에 해당하는 탭을 선택합니다.

다음 셸 명령을 실행하여 개발 환경에 대한 Docker 컨테이너와 이미지를 제거합니다.

docker-compose -p mongo-kafka down --rmi all

다음 shell 명령을 실행하여 Docker 컨테이너를 제거하고 개발 환경용 이미지는 유지합니다.

docker-compose -p mongo-kafka down

컨테이너를 다시 시작하려면 튜토리얼 설정에서 컨테이너를 시작하는 데 필요한 것과 동일한 단계를 따르세요.

이 튜토리얼에서는 다양한 구성을 사용하여 소스 커넥터를 시작하여 Kafka 주제에 게시된 변경 스트림 이벤트 데이터를 변경했습니다.

이 튜토리얼에서 언급된 개념에 대해 자세히 알아보려면 다음 리소스를 참조하세요.

  • 소스 커넥터 구성 속성

  • Kafka Connect REST API

돌아가기

MongoDB Change Streams 살펴보기