Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

Sink Connector Post Processors

이 페이지의 내용

  • 개요
  • 포스트 프로세서가 데이터를 수정하는 방법
  • 포스트 프로세서 지정 방법
  • 사전 빌드된 포스트 프로세서
  • 문서 ID 추가 포스트 프로세서 구성
  • 포스트 프로세서 예시
  • 허용 목록 및 차단 목록 예시
  • 허용 목록 프로젝터 예시
  • 차단 목록 프로젝터 예시
  • 프로젝션 와일드카드 패턴 일치 예시
  • 필드 이름 바꾸기 예시
  • 사용자 지정 포스트 프로세서를 만드는 방법

이 페이지에서는 MongoDB Kafka 싱크 커넥터에서 포스트 프로세서를 구성하는 방법에 대해 알아볼 수 있습니다. 포스트 프로세서는 커넥터가 Kafka 주제에서 읽는 싱크 레코드를 MongoDB 컬렉션에 저장하기 전에 수정합니다. 포스트 프로세서가 수행할 수 있는 데이터 수정의 몇 가지 예는 다음과 같습니다.

  • 문서 _id 필드를 사용자 지정 값으로 설정합니다.

  • 메시지 키 또는 값 필드 포함 또는 제외

  • 필드 이름 바꾸기

커넥터에 포함된 사전 빌드된 포스트 프로세서를 사용하거나 자체 포스트 프로세서를 구현할 수 있습니다.

포스트 프로세서에 대한 자세한 내용은 다음 섹션을 참조하세요.

  • 포스트 프로세서가 데이터를 수정하는 방법

  • 포스트 프로세서 지정 방법

  • 사전 빌드된 포스트 프로세서

  • 문서 ID 추가 포스트 프로세서 구성

  • 포스트 프로세서 구성 예시

  • 사용자 지정 포스트 프로세서 만들기

포스트 프로세서는 Kafka 주제에서 읽은 데이터를 수정합니다. 커넥터는 Kafka SinkRecord 키 및 값 필드의 표현을 포함하는 SinkDocument 클래스에 메시지를 저장합니다. 커넥터는 구성에 지정된 포스트 프로세서를 순차적으로 적용하고 결과를 MongoDB 컬렉션에 저장합니다.

포스트 프로세서는 문서 _id 필드 생성, 메시지 키 또는 값 필드 프로젝션, 필드 이름 변경과 같은 데이터 수정 작업을 수행합니다. 에 포함된 사전 빌드된 포스트 프로세서를 사용하거나 connector 포스트 프로세서 { 4} 를 확장하여 직접 구현할 수 있습니다. 클래스.

중요

포스트 프로세서 및 변경 데이터 캡처(CDC) 핸들러

CDC 핸들러 이벤트 데이터에는 게시 프로세서를 적용 할 수 없습니다. 둘 다 지정하면 connector 가 경고를 기록합니다.

post.processor.chain 구성 설정에서 하나 이상의 포스트 프로세서를 쉼표로 구분된 목록으로 지정할 수 있습니다. 둘 이상을 지정하면 커넥터가 이를 순차적으로 적용하여 각 포스트 프로세서가 이전 포스트 프로세서의 데이터 출력을 수정합니다.

커넥터가 MongoDB에 쓰는 문서에 고유한 _id 필드가 포함되도록 하기 위해 별도로 포함하지 않을 경우 DocumentIdAdder 포스트 프로세서를 체인의 첫 번째 위치에 자동으로 추가합니다.

다음 예시 설정은 커넥터가 출력에서 KafkaMetaAdder 포스트 프로세서를 먼저 실행한 다음 AllowListValueProjector 포스트 프로세서를 실행하도록 지정합니다.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

다음 표에는 싱크 커넥터에 포함된 모든 포스트 프로세서 목록이 포함되어 있습니다.

포스트 프로세서 이름
설명
DocumentIdAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Inserts an _id field determined by the configured strategy.
The default strategy is BsonOidStrategy.
For information on strategy options and configuration, see the Configure the Document Id Adder Post Processor section.
BlockListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Removes matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
BlockListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Removes matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Includes only matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``
Includes only matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
KafkaMetaAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field named "topic-partition-offset" and sets the value to the concatenation of Kafka topic, partition, and offset to the document.
RenameByMapping
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified field name in the key or value document.
For information on configuration, see the Renaming by Mapping Example.
RenameByRegex
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression in the key or value document.
For information on configuration, see the Renaming by Regular Expression Example.

DocumentIdAdder 포스트 프로세서는 전략을 사용하여 MongoDB 문서에서 _id 필드의 서식을 지정하는 방법을 결정합니다. 전략은 사용 사례에 맞게 사용자 지정할 수 있는 사전 설정 동작을 정의합니다.

다음 예와 같이 document.id.strategy 설정에서 이 포스트 프로세서에 대한 전략을 지정할 수 있습니다.

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

다음 표는 DocumentIdAdder 포스트 프로세서를 구성하는 데 사용할 수 있는 전략 목록을 보여줍니다.

전략 이름
설명
BsonOidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Generates a MongoDB BSON ObjectId.
Default strategy for the DocumentIdAdder post processor.
KafkaMetaDataStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.
FullKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the sink document to generate the value for the _id field.
Defaults to a blank document if no key exists.
ProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the sink document.
Throws an exception if the field is missing from the sink document.
ProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the sink document.
Throws an exception if the field is missing from the sink document.
PartialKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a block list or allow list projection of the sink document key structure.
Defaults to a blank document if no key exists.
PartialValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a block list or allow list projection of the sink document value structure.
Defaults to a blank document if no value exists.
UuidProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy
Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy
Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``
Uses a randomly generated UUID in string format.

기본 제공 문서 ID 추가 전략이 사용 사례에 맞지 않는 경우, 아래 단계에 따라 사용자 지정 문서 ID 전략을 정의할 수 있습니다.

  1. 인터페이스 Id 전략을 구현하고 사용자 지정 구성 논리를 포함하는 Java 클래스를 만듭니다.

  2. 클래스를 JAR 파일로 컴파일합니다.

  3. 컴파일된 JAR을 모든 Kafka 작업자의 클래스 경로/플러그인 경로에 추가합니다. 플러그인 경로에 대한 자세한 내용은 Confluent 문서를 참조하세요.

  4. document.id.strategy 설정을 모든 Kafka 워커에서 사용자 지정 클래스의 전체 클래스 이름으로 업데이트합니다.

참고

선택한 전략이 전달 시맨틱에 영향을 미칠 수 있습니다.

커넥터는 재시도 시 또는 레코드를 다시 처리할 때 새 ID를 생성하기 때문에 BSON ObjectId 또는 UUID 전략은 최소 한 번만 전송을 보장할 수 있습니다. 다른 전략은 문서 ID를 구성하는 필드가 고유하다는 것을 보장할 수 있는 경우 정확히 한 번만 전송하는 것을 허용합니다.

IdStrategy 인터페이스의 구현 예를 보려면 커넥터와 함께 패키징된 id 전략 구현이 포함된 소스 코드 디렉터리를 참조하세요.

이 섹션에서는 다음 유형의 포스트 프로세서에 대한 구성 및 샘플 출력의 예를 보여줍니다.

허용 목록차단 목록 프로젝터 포스트 프로세서는 출력에 포함할 필드와 제외할 필드를 결정합니다.

허용 목록 프로젝터를 사용하는 경우 포스트 프로세서는 사용자가 지정한 필드의 데이터만 출력합니다.

차단 목록 프로젝터를 사용하는 경우 포스트 프로세서는 지정한 필드의 데이터만 생략합니다.

참고

"." (점) 표기법을 사용하여 레코드에서 중첩된 필드를 참조할 수 있습니다. 이 표기법을 사용하여 배열에 있는 문서의 필드를 참조할 수도 있습니다.

포스트 프로세서 체인에 프로젝터를 추가할 때는 프로젝터 유형과 이를 싱크 문서의 키 또는 값 부분에 적용할지 여부를 지정해야 합니다.

프로젝터 구성 및 출력의 예는 다음 섹션을 참조하세요.

Kafka 레코드 값 문서가 다음과 같은 사용자 프로필 데이터와 유사하다고 가정해 보겠습니다.

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

다음 설정을 사용하여 값 문서에서 "name", "address.city", "hobbies" 필드와 같은 일부 데이터를 저장하도록 AllowList 값 프로젝터를 구성할 수 있습니다.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

포스트 프로세서가 프로젝션을 적용한 후 다음 레코드를 출력합니다.

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

Kafka 레코드 키 문서가 다음 사용자 식별 데이터와 유사하다고 가정합니다.

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

다음 설정으로 데이터를 저장하기 전에 "authToken" 및 "registration.source" 필드를 생략하도록 BlockList 키 프로젝터를 구성할 수 있습니다.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

포스트 프로세서가 프로젝션을 적용한 후 다음 레코드를 출력합니다.

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

이 섹션에서는 와일드카드 패턴을 일치시켜 필드 이름과 일치하도록 프로젝터 포스트 프로세서를 구성하는 방법을 보여줍니다.

패턴
설명
*
현재 레벨에 있는 문자를 원하는 수만큼 일치시킵니다.
**
현재 레벨과 중첩된 모든 레벨에 있는 모든 문자를 일치시킵니다.

이 섹션의 허용 목록 및 차단 목록 와일드카드 패턴 일치 예는 날씨 측정이 포함된 다음 값 문서를 참조하세요.

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

* 와일드카드를 사용하여 여러 필드 이름과 일치시킬 수 있습니다. 다음 예시 구성은 다음 필드와 일치합니다.

  • 이름이 "city"인 최상위 필드

  • 이름이 "average"인 필드는 "wind_speed"라는 이름으로 시작하는 최상위 필드의 하위 문서입니다.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

포스트 프로세서가 허용 목록 프로젝션을 적용한 후 다음 레코드를 출력합니다.

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

와일드카드를 지정한 수준부터 시작하여 모든 수준의 객체와 일치하는 ** 와일드카드를 사용할 수 있습니다. 다음 와일드카드 매칭 예시에서는 이름이 "low"인 필드를 포함하는 모든 문서를 프로젝션합니다.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

프로젝션을 적용하는 포스트 프로세서는 다음 레코드를 출력합니다.

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

와일드카드 패턴을 사용하여 다음 차단 목록 구성 예와 같이 특정 문서 수준의 필드를 일치시킬 수 있습니다.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

이 섹션에서는 싱크 레코드에서 필드 이름을 업데이트하도록 RenameByMappingRenameByRegex 필드 이름 변경 포스트 프로세서를 구성하는 방법을 보여 줍니다. 필드 이름 변경 설정은 다음을 지정합니다.

  • 레코드의 키 또는 값 문서 업데이트 여부

  • 업데이트할 필드 이름

  • 새 필드 이름

JSON 배열에 RenameByMappingRenameByRegex 설정을 지정해야 합니다. 점 표기법 또는 패턴 일치를 사용하여 중첩된 필드를 지정할 수 있습니다.

필드 이름 변경 포스트 프로세서 예시에서는 다음 예시 싱크 레코드를 사용합니다.

키 문서

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

값 문서

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

RenameByMapping 포스트 프로세서 설정은 문자열과 일치하는 필드를 새 이름에 할당하는 하나 이상의 JSON 객체를 지정합니다. 각 객체에는 아래 표에 설명된 대로 oldName 요소에서 일치시킬 텍스트와 newName 요소의 대체 텍스트가 들어 있습니다.

키 이름
설명
oldName
키 또는 값 문서의 필드와 바꿀 필드 이름을 일치시킬지 여부를 지정합니다. 이 설정은 "." 문자를 사용하여 두 값을 구분합니다.
newName
필드와 일치하는 모든 항목에 대한 대체 필드 이름을 지정합니다.

다음 예시 속성은 키 문서의 "location" 필드와 일치하고 이름을 "country"로 바꿉니다.

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

이 설정은 RenameByMapping 포스트 프로세서가 원본 키 문서를 다음 문서로 변환하도록 지시합니다.

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

다음과 같이 oldName 필드에 필드 이름이 추가된 값 문서를 지정하여 값 문서에 대해 유사한 필드 이름 할당을 수행할 수 있습니다.

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

이 설정은 RenameByMapping 포스트 프로세서에 원본 값 문서를 다음 문서로 변환하도록 지시합니다.

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

다음 설정과 같이 문자열 형식의 JSON 배열을 사용하여 field.renamer.mapping 속성에 매핑을 하나 이상 지정할 수도 있습니다.

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

RenameByRegex 포스트 프로세서 설정은 일치해야 하는 필드 이름과 텍스트 패턴, 일치하는 텍스트의 대체 값을 지정합니다. 다음 표에 설명된 필드를 포함하는 JSON 객체에서 이름 변경 표현식을 하나 이상 지정할 수 있습니다.

키 이름
설명
regexp
대체를 수행할 필드와 일치하는 정규 표현식을 포함합니다.
패턴
바꿀 텍스트와 일치하는 정규 표현식을 포함합니다.
갈다
pattern 필드에 정의한 정규 표현식과 일치하는 모든 항목에 대한 대체 텍스트를 포함합니다.

다음 예시 설정은 포스트 프로세서에 다음을 수행하도록 지시합니다.

  • "date"로 시작하는 키 문서의 필드 이름을 일치시킵니다. 일치하는 필드 세트에서 _ 패턴과 일치하는 모든 텍스트를 - 문자로 바꿉니다.

  • crepes의 하위 문서인 값 문서의 모든 필드 이름을 매칭합니다. 매칭 필드 세트에서 purchased 패턴과 일치하는 모든 텍스트를 quantity로 교체합니다.

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

커넥터가 키 예시 문서값 예시 문서에 포스트 프로세서를 적용하면 다음과 같이 출력됩니다.

키 문서

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

값 문서

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

경고

Renamer 포스트 프로세서는 기존 필드 이름을 덮어쓰지 않습니다.

Renamer 포스트 프로세서에서 대상 필드 이름을 설정한 경우 동일한 문서에서 필드 이름이 중복될 수 있습니다. 이를 방지하기 위해 포스트 프로세서는 문서의 동일한 수준에 있는 기존 필드 이름을 복제할 때 이름 변경을 건너뜁니다.

내장 포스트 프로세서가 사용 사례를 다루지 않는 경우 다음 단계를 사용하여 사용자 지정 포스트 프로세서 클래스를 생성할 수 있습니다.

  1. 추상적 PostProcessor 클래스를 확장한 Java 클래스를 생성합니다.

  2. 클래스에서 process() 메서드를 재정의합니다. 싱크 기록 키 및 값 필드의 BSON 표현인 SinkDocument를 업데이트하여 메서드에서 Kafka SinkRecord 원본 파일에 액세스할 수 있습니다.

  3. 클래스를 JAR 파일로 컴파일합니다.

  4. 컴파일된 JAR을 모든 Kafka 작업자의 클래스 경로/플러그인 경로에 추가합니다. 플러그인 경로에 대한 자세한 내용은 커뮤니티 커넥터 수동 설치에 대한 Confluent 문서를 참조하세요.

  5. 포스트 프로세서 체인 구성에 포스트 프로세서 전체 클래스 이름을 추가합니다.

예를 들어 포스트 프로세서 의 경우 내장 포스트 프로세서 클래스의 소스 코드를 검색할 수 있습니다.

돌아가기

모델 전략 작성