기존 데이터 복사
이 사용 예는 MongoDB Kafka 소스 커넥터를 사용하여 MongoDB 컬렉션에서 Apache Kafka 주제로 데이터를 복사하는 방법을 보여 줍니다.
예시
MongoDB 컬렉션 을 Apache Kafka 에 복사하고 일부 데이터를 필터하다 한다고 가정해 보겠습니다.
요구 사항과 솔루션은 다음과 같습니다.
요구 사항 | 솔루션 |
---|---|
MongoDB deployment에 있는 | See the Copy Data section of this guide. |
| See the Filter Data section of this guide. |
customers
컬렉션에는 다음 문서가 포함되어 있습니다.
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
데이터 복사
소스 커넥터에서 다음 구성 옵션을 지정하여 shopping
데이터베이스의 customers
컬렉션 내용을 복사합니다.
database=shopping collection=customers startup.mode=copy_existing
소스 커넥터는 각 문서를 컬렉션에 삽입하는 것을 설명하는 변경 이벤트 문서를 생성하여 컬렉션을 복사합니다.
참고
데이터 복사로 인해 중복 이벤트가 발생할 수 있음
소스 커넥터가 데이터베이스의 기존 데이터를 변환하는 동안 시스템에서 데이터베이스의 데이터를 변경하는 경우, MongoDB는 최신 변경 사항을 반영하기 위해 중복된 변경 스트림 이벤트를 생성할 수 있습니다. 데이터 복사가 의존하는 변경 스트림 이벤트는 멱등성이 있으므로 복사된 데이터는 결국 일관성을 갖습니다.
변경 이벤트 문서에 대해 자세히 알아보려면 Change Streams 가이드를 참조하세요.
startup.mode
옵션에 대해 자세히 알아보려면 시작 속성을 참조하세요.
데이터 필터링
소스 커넥터 구성의 startup.mode.copy.existing.pipeline
옵션에서 집계 파이프라인을 지정하여 데이터를 필터링할 수 있습니다. 다음 구성은 country
필드의 "멕시코"와 모든 문서를 일치시키는 집계 파이프라인을 지정합니다.
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
startup.mode.copy.existing.pipeline
옵션에 대해 자세히 알아보려면 시작 속성을 참조하세요.
집계 파이프라인에 대해 자세히 알아보려면 다음 리소스를 참조하세요.
MongoDB 매뉴얼의 애그리게이션 .
구성 지정
customers
컬렉션을 복사하기 위한 최종 소스 커넥터 구성은 다음과 같습니다.
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping collection=customers startup.mode=copy_existing startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
커넥터가 데이터를 복사하면 shopping.customers
Apache Kafka 주제에 있는 이전 샘플 컬렉션에 해당하는 다음 변경 이벤트 문서가 표시됩니다.
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }
참고
주제의 데이터를 컬렉션에 쓰기
변경 데이터 캡처 핸들러를 사용하여 Apache Kafka 주제의 변경 이벤트 문서를 MongoDB 쓰기 작업으로 변환합니다. 자세한 내용은 변경 데이터 캡처 핸들러 가이드를 참조하세요.