기존 데이터 복사
이 사용 예는 MongoDB Kafka 소스 커넥터를 사용하여 MongoDB 컬렉션에서 Apache Kafka 주제로 데이터를 복사하는 방법을 보여 줍니다.
예시
MongoDB 컬렉션 을 Apache Kafka 에 복사하고 일부 데이터를 필터하다 한다고 가정해 보겠습니다.
요구 사항과 솔루션은 다음과 같습니다.
요구 사항 | 솔루션 |
---|---|
MongoDB deployment에 있는 shopping 데이터베이스의 customers 컬렉션을 Apache Kafka 주제에 복사합니다. | See the Copy Data section of this guide. |
country 필드에 "멕시코" 값이 있는 문서만 복사합니다. | See the Filter Data section of this guide. |
customers
컬렉션에는 다음 문서가 포함되어 있습니다.
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
데이터 복사
소스 커넥터에서 다음 구성 옵션을 지정하여 shopping
데이터베이스의 customers
컬렉션 내용을 복사합니다.
database=shopping collection=customers startup.mode=copy_existing
소스 커넥터는 각 문서를 컬렉션에 삽입하는 것을 설명하는 변경 이벤트 문서를 생성하여 컬렉션을 복사합니다.
참고
데이터 복사로 인해 중복 이벤트가 발생할 수 있음
소스 커넥터가 데이터베이스의 기존 데이터를 변환하는 동안 시스템에서 데이터베이스의 데이터를 변경하는 경우, MongoDB는 최신 변경 사항을 반영하기 위해 중복된 변경 스트림 이벤트를 생성할 수 있습니다. 데이터 복사가 의존하는 변경 스트림 이벤트는 멱등성이 있으므로 복사된 데이터는 결국 일관성을 갖습니다.
변경 이벤트 문서에 학습 보려면 Change Streams 가이드 를 참조하세요.
startup.mode
옵션에 대해 자세히 알아보려면 시작 속성을 참조하세요.
데이터 필터링
소스 커넥터 구성의 startup.mode.copy.existing.pipeline
옵션에서 집계 파이프라인을 지정하여 데이터를 필터링할 수 있습니다. 다음 구성은 country
필드의 "멕시코"와 모든 문서를 일치시키는 집계 파이프라인을 지정합니다.
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
startup.mode.copy.existing.pipeline
옵션에 대해 자세히 알아보려면 시작 속성을 참조하세요.
집계 파이프라인에 대해 자세히 알아보려면 다음 리소스를 참조하세요.
MongoDB 매뉴얼의 애그리게이션 .
구성 지정
customers
컬렉션을 복사하기 위한 최종 소스 커넥터 구성은 다음과 같습니다.
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping collection=customers startup.mode=copy_existing startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
커넥터가 데이터를 복사하면 shopping.customers
Apache Kafka 주제에 있는 이전 샘플 컬렉션에 해당하는 다음 변경 이벤트 문서가 표시됩니다.
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }
참고
주제의 데이터를 컬렉션에 쓰기
변경 데이터 캡처 핸들러를 사용하여 Apache Kafka 주제의 변경 이벤트 문서를 MongoDB 쓰기 작업으로 변환합니다. 자세한 내용은 변경 데이터 캡처 핸들러 가이드를 참조하세요.