Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

기존 데이터 복사

이 사용 예는 MongoDB Kafka 소스 커넥터를 사용하여 MongoDB 컬렉션에서 Apache Kafka 주제로 데이터를 복사하는 방법을 보여 줍니다.

MongoDB 컬렉션 을 Apache Kafka 에 복사하고 일부 데이터를 필터하다 한다고 가정해 보겠습니다.

요구 사항과 솔루션은 다음과 같습니다.

요구 사항
솔루션

MongoDB deployment에 있는 shopping 데이터베이스의 customers 컬렉션을 Apache Kafka 주제에 복사합니다.

See the Copy Data section of this guide.

country 필드에 "멕시코" 값이 있는 문서만 복사합니다.

See the Filter Data section of this guide.

customers 컬렉션에는 다음 문서가 포함되어 있습니다.

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

소스 커넥터에서 다음 구성 옵션을 지정하여 shopping 데이터베이스의 customers 컬렉션 내용을 복사합니다.

database=shopping
collection=customers
startup.mode=copy_existing

소스 커넥터는 각 문서를 컬렉션에 삽입하는 것을 설명하는 변경 이벤트 문서를 생성하여 컬렉션을 복사합니다.

참고

데이터 복사로 인해 중복 이벤트가 발생할 수 있음

소스 커넥터가 데이터베이스의 기존 데이터를 변환하는 동안 시스템에서 데이터베이스의 데이터를 변경하는 경우, MongoDB는 최신 변경 사항을 반영하기 위해 중복된 변경 스트림 이벤트를 생성할 수 있습니다. 데이터 복사가 의존하는 변경 스트림 이벤트는 멱등성이 있으므로 복사된 데이터는 결국 일관성을 갖습니다.

변경 이벤트 문서에 대해 자세히 알아보려면 Change Streams 가이드를 참조하세요.

startup.mode 옵션에 대해 자세히 알아보려면 시작 속성을 참조하세요.

소스 커넥터 구성의 startup.mode.copy.existing.pipeline 옵션에서 집계 파이프라인을 지정하여 데이터를 필터링할 수 있습니다. 다음 구성은 country 필드의 "멕시코"와 모든 문서를 일치시키는 집계 파이프라인을 지정합니다.

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

startup.mode.copy.existing.pipeline 옵션에 대해 자세히 알아보려면 시작 속성을 참조하세요.

집계 파이프라인에 대해 자세히 알아보려면 다음 리소스를 참조하세요.

  • 변경 이벤트 필터링을 위한 파이프라인 사용자 정의 사용 예시

  • MongoDB 매뉴얼의 애그리게이션 .

customers 컬렉션을 복사하기 위한 최종 소스 커넥터 구성은 다음과 같습니다.

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

커넥터가 데이터를 복사하면 shopping.customers Apache Kafka 주제에 있는 이전 샘플 컬렉션에 해당하는 다음 변경 이벤트 문서가 표시됩니다.

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

참고

주제의 데이터를 컬렉션에 쓰기

변경 데이터 캡처 핸들러를 사용하여 Apache Kafka 주제의 변경 이벤트 문서를 MongoDB 쓰기 작업으로 변환합니다. 자세한 내용은 변경 데이터 캡처 핸들러 가이드를 참조하세요.

돌아가기

주제 이름 지정