모델 전략 작성
이 페이지의 내용
개요
이 가이드에서는 MongoDB Kafka 싱크 커넥터가 MongoDB에 데이터를 쓰는 방식을 변경하는 방법을 보여줍니다.
다음을 포함한 사용 사례에 대하여 커넥터가 MongoDB에 데이터를 쓰는 방식을 변경할 수 있습니다.
문서를 위로 올리는 대신 삽입하기
_id
필드 이외의 필터와 일치하는 문서를 교체 또는 업데이트합니다.필터와 일치하는 문서 삭제하기
모델 쓰기 전략을 지정하여 커넥터가 MongoDB에 대한 데이터 쓰기 방법을 구성할 수 있습니다. 쓰기 모델 전략은 쓰기 모델을 사용하여 싱크 커넥터의 데이터 쓰기 방법을 정의하는 클래스입니다. 쓰기 모델은 쓰기 작업의 구조를 정의하는 MongoDB Java 드라이버 인터페이스입니다.
connector 가 수신한 싱크 기록을 MongoDB 에 쓰기 전에 수정하는 방법을 학습 보려면 싱크 connector Connector 프로세서에 대한 가이드 를 읽어보세요.
쓰기 모델 전략 구현을 보려면 InsertOneDefaultStrategy 클래스의 소스 코드를 참조하세요.
대량 쓰기 작업
싱크 커넥터는 대량 쓰기 작업을 통해 MongoDB에 데이터를 작성합니다. 대량 쓰기는 삽입, 업데이트, 삭제 등 여러 쓰기 작업을 함께 그룹화합니다.
싱크 커넥터는 순서가 기본적으로 지정된 대량 쓰기를 수행하여 데이터 변경 순서를 보장합니다. 순서가 지정된 대량 쓰기에서 쓰기 작업에서 오류가 발생하면 커넥터는 해당 배치의 나머지 쓰기를 건너뛰게 됩니다.
데이터 변경 순서를 보장할 필요가 없는 경우 bulk.write.ordered
설정을 false
로 설정하여 커넥터가 순서가 지정되지 않은 일괄 쓰기를 수행하도록 할 수 있습니다. 싱크 커넥터는 정렬되지 않은 대량 쓰기를 병렬로 수행하므로 성능이 향상될 수 있습니다.
또한 정렬되지 않은 대량 쓰기를 활성화하고 errors.tolerance
설정을 all
로 설정하면 커넥터는 대량 쓰기의 쓰기 작업이 실패하더라도 일괄 처리에서 오류를 반환하지 않는 나머지 쓰기 작업을 계속 수행하게 됩니다.
팁
bulk.write.ordered
설정 관련 세부 사항은 커넥터 메시지 처리 속성을 참조하십시오.
대량 쓰기 작업 관련 세부 사항은 다음 문서를 참조하십시오.
쓰기 모델 전략 지정 방법
쓰기 모델 전략을 지정하기 위해 다음 설정을 사용합니다.
writemodel.strategy=<write model strategy classname>
커넥터에 포함된 사전 구축된 쓰기 모델 전략의 목록은 쓰기 모델 전략 구성가이드를 참조하십시오.
비즈니스 키 지정하기
비즈니스 키는 하나 이상의 고유 식별 싱크 기록으로 구성된 값입니다. 싱크 커넥터는 싱크 기록의 _id
필드를 사용하기본적으로 여 비즈니스 키를 조회합니다. 다른 비즈니스 키를 지정하려면 사용자 정의 값을 사용하도록 Document Id Adder를 구성합니다.
다음 속성 예시에 표시된 것처럼 싱크 기록 키에서 _id
필드를 설정하도록 Document Id Adder를 구성할 수 있습니다.
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy document.id.strategy.partial.key.projection.list=<comma-separated field names> document.id.strategy.partial.key.projection.type=AllowList
또는 다음 속성 예시에 표시된 대로 싱크 기록 값에서 _id
필드를 설정하도록 구성할 수 있습니다.
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=<comma-separated field names> document.id.strategy.partial.value.projection.type=AllowList
중요
쓰기 성능 향상
비즈니스 키의 필드에 해당하는 대상 컬렉션에 고유 인덱스를 생성합니다. 이렇게 하면 싱크 connector 에서 쓰기 작업의 성능이 향상됩니다. 자세한 내용은 고유 인덱스 가이드를 참조하세요.
다음 쓰기 모델 전략에는 비즈니스 키가 필요합니다.
ReplaceOneBusinessKeyStrategy
DeleteOneBusinessKeyStrategy
UpdateOneBusinessKeyTimestampStrategy
문서 ID 추가 포스트 프로세서 관련 세부 사항은 Document Id Adder Post Processor 구성하기를 참조하십시오.
예시
본 절에서는 다음 쓰기 모델 전략의 구성 및 출력 예시를 다룹니다.
하나의 타임스탬프 업데이트 전략
MongoDB에 문서를 작성할 때 타임스탬프를 추가하고 업데이트하도록 하나의 타임스탬프 업데이트 전략을 구성할 수 있습니다. 이 전략은 다음 조치를 수행합니다.
새 MongoDB 문서를 삽입할 때는 커넥터가
_insertedTS
,_modifiedTS
필드를 커넥터 서버의 현재 시간으로 설정합니다.커넥터가 기존 MongoDB 문서를 업데이트하면
_modifiedTS
필드는 커넥터 서버의 현재 시간으로 업데이트됩니다.
경로를 따라 열차의 위치를 추적하려고 하는 상황에서 싱크 커넥터가 다음과 같은 구조의 메시지를 수신한다고 가정해 보겠습니다.
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
커넥터가 메시지의 _id
값을 사용하여 MongoDB 문서의 _id
필드를 할당하도록 지정하려면 ProvidedInValueStrategy
를 사용하십시오. 아이디를 지정하고 다음과 같이 모델 전략 속성을 작성합니다.
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
싱크 커넥터는 위의 기록 예시를 처리한 후 다음 문서와 같이 _insertedTS
및 _modifiedTS
필드가 포함된 문서를 삽입합니다.
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T15:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
한 시간 후에 열차는 다음 기록에 표시된 대로 경로를 따라 새 위치를 새로운 위치로 보고합니다.
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
Sink Connector가 이전 기록을 처리하고 나면 다음 데이터가 포함된 문서를 삽입합니다.
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T16:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
ProvidedInValueStrategy
관련 정보는 Document Id Adder Post Processor 구성하기를 참조하십시오.
하나의 비즈니스 키 교체 전략
비즈니스 키 값과 일치하는 문서를 교체하도록 하나의 비즈니스 키 교체 전략을 구성할 수 있습니다. 기록의 여러 필드에 비즈니스 키를 정의하고 매칭 비즈니스 키가 포함된 문서를 교체하도록 커넥터를 구성하기 위해 다음 작업을 수행합니다.
컬렉션에서 비즈니스 키 필드에 해당하는 고유 인덱스를 생성합니다.
커넥터 구성에서 비즈니스 키에 속하는 필드를 식별하기 위해
PartialValueStrategy
ID 전략을 지정합니다.커넥터 구성에서
ReplaceOneBusinessKeyStrategy
쓰기 모델 전략을 지정합니다.
flight_no
와 airport_code
로 각각 표시된 항공편 번호와 공항 위치를 기준으로 항공기 용량을 추적한다고 가정해 봅시다. 메시지 예시는 다음 정보를 포함합니다.
{ "flight_no": "Z342", "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
flight_no
와 airport_code
를 비즈니스 키로 사용하여 전략을 구현하기 위해서는 우선 MongoDB Shell에서 해당 필드에 고유 인덱스를 생성합니다.
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })
그 다음 프로젝션 목록에서 PartialValueStrategy
전략 및 비즈니스 키 필드를 지정합니다. 다음과 같이 ID를 지정하고 모델 전략 구성을 작성합니다.
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=flight_no,airport_code document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
컬렉션에 삽입된 샘플 데이터 포함 사항:
{ "flight_no": "Z342" "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
커넥터가 기존 문서의 비즈니스 키와 일치하는 싱크 데이터를 처리할 때는 비즈니스 키 필드를 변경하지 않고 문서를 새 값으로 교체합니다.
{ "flight_no": "Z342" "airport_code": "LAX", "status": "canceled" }
싱크 데이터를 처리한 후에는 커넥터가 MongoDB의 원본 샘플 문서를 이전 샘플 문서로 교체합니다.
하나의 비즈니스 키 삭제 전략
하나의 비즈니스 키 삭제 전략을 사용하여 비즈니스 키와 일치하는 메시지를 수신할 때 문서를 제거하도록 커넥터를 구성할 수 있습니다. 기록의 여러 필드에서 비즈니스 키를 설정하고 일치하는 비즈니스 키가 포함된 문서를 삭제하도록 커넥터를 구성하려면 다음 작업을 수행합니다.
MongoDB 컬렉션에 대하여 비즈니스 키 필드에 해당하는 고유 인덱스를 생성합니다.
커넥터 구성에서 비즈니스 키에 속하는 필드를 식별하기 위한 식별 전략으로
PartialValueStrategy
를 지정합니다.커넥터 구성에서
DeleteOneBusinessKeyStrategy
쓰기 모델 전략을 지정합니다.
다음과 유사한 문서가 포함된 컬렉션에서 특정 연도의 캘린더 이벤트를 삭제한다고 가정해 보겠습니다.
{ "year": 2005, "month": 3, "day": 15, "event": "Dentist Appointment" }
year
를 비즈니스 키로 사용하여 전략을 구현하기 위해서는 먼저 MongoDB Shell에서 다음 필드에 고유 인덱스를 생성합니다.
db.collection.createIndex({ "year": 1 }, { unique: true })
다음으로 비즈니스 키를 지정하고 다음과 같이 구성에 모델 전략을 작성합니다.
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=year document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy
커넥터가 비즈니스 키 year
가 포함된 싱크 기록을 처리하는 경우, MongoDB가 반환한 매칭 필드 값을 가진 첫 번째 문서를 삭제합니다. 커넥터가 다음 값 데이터가 포함된 싱크 기록을 처리한다고 가정해 봅시다.
{ "year": 2005, ... }
커넥터가 이전 기록을 처리할 때 원본 '치과 예약' 샘플 문서처럼 '2005'라는 값을 가진 year
필드를 포함하는 첫 번째 문서를 컬렉션에서 삭제합니다.
Custom Write Model 전략
커넥터에 포함된 쓰기 모델 전략 중 어떤 것도 사용 사례에 맞지 않는 경우 직접 생성할 수 있습니다.
쓰기 모델 전략은 WriteModelStrategy
인터페이스를 구현하고 createWriteModel()
메서드를 재정의해야 하는 Java 클래스입니다.
WriteModelStrategy 인터페이스의 소스 코드 를 참조하세요. 필요한 메서드 서명에 해당합니다.
Sample Write Model 전략
다음 사용자 지정 쓰기 모델 전략은 싱크 기록의 _id
필드와 일치하는 MongoDB 문서를 싱크 기록의 fullDocument
필드 값으로 교체하는 쓰기 작업을 반환합니다.
/** * Custom write model strategy * * This class reads the 'fullDocument' field from a change stream and * returns a ReplaceOne operation. */ public class CustomWriteModelStrategy implements WriteModelStrategy { private static String ID = "_id"; public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) { BsonDocument changeStreamDocument = document.getValueDoc() .orElseThrow(() -> new DataException("Missing value document")); BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument()); if (fullDocument.isEmpty()) { return null; // Return null to indicate no op. } return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument); } }
사용자 지정 쓰기 모델 전략의 또 다른 예는 GitHub의 UpsertAsPartOfDocumentStrategy 예시 전략을 참조하십시오.
전략 설치 방법
사용자 지정 쓰기 전략을 사용하도록 싱크 커넥터를 구성하려면 다음 조치를 완료해야 합니다.
사용자 지정 쓰기 전략 클래스를 JAR 파일로 컴파일합니다.
컴파일된 JAR을 Kafka 작업자의 클래스패스/플러그인 패스에 추가하세요. 플러그인 패스에 대한 자세한 내용은 Confluent 설명서를 참조하세요.
참고
Kafka Connect는 플러그인을 개별적으로 로드합니다. 사용자 지정 쓰기 전략을 배포하는 경우 커넥터 JAR과 쓰기 모델 전략 JAR이 모두 동일한 경로에 있어야 합니다. 다음 경로와 유사한 경로를 사용하십시오.
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
<plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar
Kafka Connect 플러그인에 대해 자세히 알아보려면 Confluent에서 제공하는 이 가이드를 참조하세요.
writemodel.strategy 구성 설정에서 사용자 지정 클래스를 지정합니다.
클래스를 JAR 파일로 컴파일하는 방법은 Java SE 문서의 JAR 배포 가이드를 참조하십시오.