Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

변경 데이터 캡처 핸들러로 데이터 복제하기

이 페이지의 내용

  • 개요
  • CDC 핸들러로 데이터 복제
  • 튜토리얼 설정 완료
  • 대화형 셸 시작
  • 소스 커넥터 구성하기
  • 싱크 커넥터 구성
  • Kafka Topic 모니터링
  • 소스에 데이터 쓰기 및 데이터 흐름 보기
  • (선택 사항) 추가 변경 사항 생성
  • 요약
  • 자세히 알아보기

이 튜토리얼을 따라 변경 데이터 캡처(CDC) 핸들러 를 사용하여 MongoDB Kafka Connector로 데이터를 복제하는 방법을 알아보세요. CDC 핸들러는 CDC 이벤트를 MongoDB 쓰기 작업으로 변환하는 애플리케이션입니다. 한 데이터스토어의 변경 사항을 다른 데이터스토어에 재현해야 하는 경우 CDC 핸들러를 사용합니다.

이 튜토리얼에서는 CDC를 사용하여 두 개의 MongoDB 컬렉션에 동일한 문서가 포함되도록 하기 위해 MongoDB Kafka 소스 및 싱크 커넥터를 구성하고 실행합니다. 소스 커넥터는 원본 컬렉션의 변경 스트림 데이터를 Kafka 주제에 기록하고, 싱크 커넥터는 대상 MongoDB 컬렉션에 Kafka 주제 데이터를 씁니다.

CDC 핸들러의 작동 방식에 학습 보려면 데이터 캡처 핸들러 변경 가이드 를 참조하세요.

1

Confluent Kafka Connect 및 MongoDB 환경을 시작하려면 Kafka Connector 튜토리얼 설정의 단계를 완료하세요.

2

Docker 컨테이너에서 별도의 창에서 두 개의 대화형 셸을 시작합니다. 튜토리얼에서는 셸을 사용하여 다양한 작업을 실행하고 관찰할 수 있습니다.

터미널에서 대화식 셸을 시작하려면 다음 명령을 실행합니다.

docker exec -it mongo1 /bin/bash

이 튜토리얼 전체에서는 이 대화형 셸을 CDCShell1 이라고 합니다.

대화형 셸을 시작하려면 두 번째 터미널에서 다음 명령을 실행합니다.

docker exec -it mongo1 /bin/bash

이 튜토리얼 전체에서는 이 대화형 셸을 CDCShell2 라고 합니다.

화면에 있는 두 개의 창을 정렬하여 두 창이 모두 보이도록 하면 실시간 업데이트를 볼 수 있습니다.

CDCShell1을 사용하여 커넥터를 구성하고 카프카 주제를 모니터링하세요. CDCShell2를 사용하여 MongoDB에서 쓰기 조작을 수행하십시오.

3

CDCShell1에서 CDCTutorial.Source MongoDB 네임스페이스에서 읽고 CDCTutorial.Source Kafka 주제에 쓰도록 소스 커넥터를 구성합니다.

다음 명령을 사용하여 cdc-source.json 이라는 구성 파일을 만듭니다:

nano cdc-source.json

다음 구성 정보를 파일에 붙여넣고 변경 사항을 저장합니다.

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

CDCShell1에서 다음 명령을 실행하여 만든 구성 파일을 사용하여 원본 커넥터를 시작합니다.

cx cdc-source.json

참고

cx 명령은 튜토리얼 개발 환경에 포함된 맞춤 스크립트입니다. 이 스크립트는 Kafka Connect REST API에 대해 다음과 같은 동일한 요청을 실행하여 새 커넥터를 생성합니다.

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

커넥터의 상태를 확인하려면 셸에서 다음 명령을 실행합니다.

status

싱크 커넥터가 성공적으로 시작되면 다음 출력이 표시됩니다.

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

CDCShell1에서 CDCTutorial.Source Kafka 항목의 데이터를 CDCTutorial.Destination MongoDB 네임스페이스로 데이터를 복사합니다.

다음 명령을 사용하여 cdc-sink.json 이라는 구성 파일을 만듭니다:

nano cdc-sink.json

다음 구성 정보를 파일에 붙여넣고 변경 사항을 저장합니다.

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

셸에서 다음 명령을 실행하여 만든 구성 파일을 사용하여 싱크 커넥터를 시작합니다.

cx cdc-sink.json

커넥터의 상태를 확인하려면 셸에서 다음 명령을 실행합니다.

status

싱크 커넥터가 성공적으로 시작되면 다음 출력이 표시됩니다.

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

CDCShell1에서 수신 이벤트에 대한 Kafka 토픽을 모니터합니다. 다음 명령어를 실행하여 주제에 게시된 데이터를 출력하는 kafkacat 애플리케이션을 시작합니다.

kc CDCTutorial.Source

참고

kc 명령은 튜토리얼 개발 환경에 포함된 사용자 지정 스크립트로, kafkacat 애플리케이션을 호출하여 Kafka에 연결하고 지정된 주제의 출력 형식을 지정하는 옵션을 제공합니다.

일단 시작하면 현재 읽을 데이터가 없음을 나타내는 다음 출력이 표시됩니다.

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

CDCShell2에서 다음 명령을 실행하여 mongosh, 즉 MongoDB Shell을 사용하여 MongoDB에 연결합니다:

mongosh "mongodb://mongo1"

연결에 성공하면 다음과 같은 MongoDB Shell 프롬프트가 표시됩니다.

rs0 [direct: primary] test>

프롬프트에서 다음 명령을 입력하여 CDCTutorial.Source MongoDB 네임스페이스에 새 문서를 삽입합니다.

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

MongoDB가 삽입 명령을 완료하면 다음 텍스트와 유사한 승인을 받아야 합니다.

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

소스 커넥터는 변경 사항을 선택하여 Kafka 주제에 게시합니다. CDCShell1 창에 다음과 같은 주제 메시지가 표시되어야 합니다.

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

싱크 커넥터는 카프카 메시지를 수신하고 데이터를 MongoDB로 싱크합니다. CDCShell2에서 시작한 MongoDB Shell에서 다음 명령을 실행하여 MongoDB의 CDCTutorial.Destination 네임스페이스에서 문서를 검색할 수 있습니다:

db.Destination.find()

다음과 같은 문서가 결과로 반환됩니다.

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

MongoDB Shell에서 다음 명령을 실행하여 CDCTutorial.Source 네임스페이스에서 문서를 제거해 보세요.

db.Source.deleteMany({})

CDCShell1 창에 다음과 같은 주제 메시지가 표시되어야 합니다.

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

컬렉션에 있는 현재 문서 수를 검색하려면 다음 명령을 실행합니다.

db.Destination.count()

그러면 컬렉션이 비어 있음을 나타내는 다음 출력이 반환됩니다.

0

다음 명령을 실행하여 MongoDB Shell을 종료합니다:

exit

이 자습서에서는 MongoDB 컬렉션에 대한 변경 사항을 캡처하여 Apache Kafka로 보내도록 소스 커넥터를 설정합니다. 또한 MongoDB CDC 핸들러가 있는 싱크 커넥터를 구성하여 Apache Kafka에서 MongoDB 컬렉션으로 데이터를 이동했습니다.

이 튜토리얼에서 언급된 개념에 대해 자세히 알아보려면 다음 리소스를 참조하세요.

돌아가기

MongoDB Kafka Sink Connector 시작하기