변경 이벤트를 필터링하도록 파이프라인 사용자 지정하기
이 사용 예제에서는 MongoDB Kafka source connector가 사용하는 데이터를 사용자 지정하도록 파이프라인 을 구성하는 방법을 보여줍니다. 파이프라인은 데이터를 필터링하거나 변환하라는 데이터베이스에 대한 지침으로 구성된 MongoDB 집계 파이프라인입니다.
MongoDB 는 변경 스트림 의 집계 파이프라인 과 일치하는 데이터 변경 사항을 connector 에 알립니다. 변경 스트림 은 클라이언트 가 MongoDB deployment 에 수행한 데이터 변경 사항을 실시간 설명하는 이벤트 시퀀스입니다. 자세한 내용은 Change Streams에 대한 MongoDB 서버 수동 항목을 참조하세요.
예시
이벤트 를 조정하면서 특정 이벤트 의 각 게스트의 이름과 도착 시간을 수집하려고 한다고 가정해 보겠습니다. 게스트가 이벤트 에 체크인할 때마다 애플리케이션 은 다음 세부 정보가 포함된 새 문서 를 삽입합니다.
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
당신은 connector pipeline
설정을 정의하여 다음과 같이 change stream이 이벤트 정보를 필터링하도록 지시할 수 있습니다.
삽입 작업에 대한 변경 이벤트를 생성하고 다른 모든 유형의 작업에 대한 이벤트를 생략합니다.
fullDocument.eventId
값 '321'과 일치하는 문서에 대해서만 변경 이벤트를 생성하고 다른 모든 문서는 생략합니다.프로젝션을 사용하여
fullDocument
객체에서_id
및eventId
필드를 생략합니다.
이러한 변환을 적용하려면 pipeline
설정에 다음 집계 파이프라인을 할당합니다.
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
중요
파이프라인 의 결과에 payload
객체 의 최상위 _id
및 ns
필드가 포함되어 있는지 확인합니다. MongoDB 는 재개 토큰 의 값으로 id
을 사용하고 ns
Kafka 출력 주제 이름을 생성합니다.
애플리케이션이 샘플 문서를 삽입하면 구성된 connector는 다음 레코드를 Kafka 주제에 게시합니다.
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
소스 connector 를 사용하여 변경 스트림을 관리하는 방법에 대한 자세한 내용은 에 대한 connector 설명서를 Change Streams 참조하세요.