Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

변경 이벤트를 필터링하도록 파이프라인 사용자 지정하기

이 사용 예제에서는 MongoDB Kafka source connector가 사용하는 데이터를 사용자 지정하도록 파이프라인 을 구성하는 방법을 보여줍니다. 파이프라인은 데이터를 필터링하거나 변환하라는 데이터베이스에 대한 지침으로 구성된 MongoDB 집계 파이프라인입니다.

MongoDB 는 변경 스트림 의 집계 파이프라인 과 일치하는 데이터 변경 사항을 connector 에 알립니다. 변경 스트림 은 클라이언트 가 MongoDB deployment 에 수행한 데이터 변경 사항을 실시간 설명하는 이벤트 시퀀스입니다. 자세한 내용은 Change Streams에 대한 MongoDB 서버 수동 항목을 참조하세요.

이벤트 를 조정하면서 특정 이벤트 의 각 게스트의 이름과 도착 시간을 수집하려고 한다고 가정해 보겠습니다. 게스트가 이벤트 에 체크인할 때마다 애플리케이션 은 다음 세부 정보가 포함된 새 문서 를 삽입합니다.

{
"_id": ObjectId(...),
"eventId": 321,
"name": "Dorothy Gale",
"arrivalTime": 2021-10-31T20:30:00.245Z
}

당신은 connector pipeline 설정을 정의하여 다음과 같이 change stream이 이벤트 정보를 필터링하도록 지시할 수 있습니다.

  • 삽입 작업에 대한 변경 이벤트를 생성하고 다른 모든 유형의 작업에 대한 이벤트를 생략합니다.

  • fullDocument.eventId 값 '321'과 일치하는 문서에 대해서만 변경 이벤트를 생성하고 다른 모든 문서는 생략합니다.

  • 프로젝션을 사용하여 fullDocument 객체에서 _ideventId 필드를 생략합니다.

이러한 변환을 적용하려면 pipeline 설정에 다음 집계 파이프라인을 할당합니다.

pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]

중요

파이프라인의 결과에 MongoDB가 재개 토큰의 값으로 사용하는 payload 객체의 최상위 _id 필드가 포함되어 있는지 확인합니다.

애플리케이션이 샘플 문서를 삽입하면 구성된 connector는 다음 레코드를 Kafka 주제에 게시합니다.

{
...
"payload": {
_id: { _data: ... },
"operationType": "insert",
"fullDocument": {
"name": "Dorothy Gale",
"arrivalTime": "2021-10-31T20:30:00.245Z",
},
"ns": { ... },
"documentKey": {
_id: {"$oid": ... }
}
}
}

소스 connector 를 사용하여 변경 스트림을 관리하는 방법에 대한 자세한 내용은 에 대한 connector 설명서를 Change Streams 참조하세요.

돌아가기

사용 예시