Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

오류 처리 및 중단 후 재개 속성

이 페이지의 내용

  • 개요
  • 설정
  • 단일 메시지 변환으로 하트비트

다음 구성 설정을 사용하여 MongoDB Kafka 소스 커넥터에서 오류가 발생할 때 작동하는 방식을 지정하고 중단된 읽기 재개와 관련된 설정을 지정할 수 있습니다.

이름
설명
mongo.errors.tolerance
Type: string

Description:
Whether to continue processing messages when the connector encounters an error.

Set this to "none" if you want the connector to stop processing messages and report the issue if it encounters an error.

Set this to "all" if you want the connector to continue processing messages and ignore any errors it encounters.

IMPORTANT: This property overrides the errors.tolerance Connect Framework property.

Default: "none"
Accepted Values: "none" or "all"
mongo.errors.log.enable
Type: boolean

Description:
Whether the connector should report errors in the log file.

Set this to true to log all errors the connector encounters.

Set this to false to log errors that are not tolerated by the connector. You can specify which errors the connector should tolerate using the errors.tolerance or mongo.errors.tolerance setting.

IMPORTANT: This property overrides the errors.log.enable Connect Framework property.

Default: false
Accepted Values: true or false
mongo.errors.deadletterqueue.topic.name
Type: string

Description:
The name of topic to use as the dead letter queue.

If you specify a value, the connector writes invalid messages to the dead letter queue topic as extended JSON strings.

If you leave this setting blank, the connector does not write invalid messages to any topic.
IMPORTANT: You must set errors.tolerance or mongo.errors.tolerance setting to "all" to enable this property.

Default: ""
Accepted Values: A valid Kafka topic name
offset.partition.name
Type: string

Description:
The custom offset partition name to use. You can use this option to instruct the connector to start a new change stream when an existing offset contains an invalid resume token.

If you leave this setting blank, the connector uses the default partition name based on the connection details.

To view a strategy for naming offset partitions, see Reset Stored Offsets.

Default: ""
Accepted Values: A string. To learn more about naming a partition, see SourceRecord in the Apache Kafka API documentation.
heartbeat.interval.ms
Type: long

Description:
The number of milliseconds the connector waits between sending heartbeat messages. The connector sends heartbeat messages when source records are not published in the specified interval. This mechanism improves resumability of the connector for low volume namespaces.

Heartbeat messages contain a postBatchResumeToken data field. The value of this field contains the MongoDB server oplog entry that the connector last read from the change stream.

Set this to 0 to disable heartbeat messages.

To learn more, see Prevention in the Invalid Resume Token page.

Default: 0
Accepted Values: An integer
heartbeat.topic.name
Type: string

Description:
The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the heartbeat.interval.ms setting to enable this feature.

Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name

Kafka Connect 배포에서 하트비트를 활성화하고 SMT(단일 메시지 변환) 를 지정하는 경우 SMT에서 하트비트 메시지를 제외해야 합니다. SMT는 Atlas Stream Processing 애플리케이션을 배포할 필요 없이 소스 connector를 통과하는 메시지에 대한 변환을 지정할 수 있는 Kafka의 기능입니다.

SMT에서 하트비트 메시지를 제외하려면 조건 자를 생성하여 SMT에 적용해야 합니다. 조건자는 변환을 적용하기 전에 메시지가 조건문과 일치하는지 확인할 수 있는 SMT의 기능입니다.

다음 구성은 기본 하트비트 주제로 전송된 하트비트 메시지와 일치하는 IsHeartbeat 조건자를 정의합니다.

predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats

다음 구성에서는 앞의 조건자를 사용하여 ExtractField 변환에서 하트비트 메시지를 제외합니다.

transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
# apply the default key schema as the extract transformation requires a struct object
output.format.key=schema

이전 변환에서 하트비트 메시지를 제외하지 않으면 connector가 하트비트 메시지를 처리한 후 다음 오류가 발생합니다.

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
...
Only Struct objects supported for [field extraction], found: java.lang.String

SMT에 대해 자세히 알아보려면 Kafka Connect에서 단일 메시지 변환을 사용하는 방법을 참조하세요. Confluent에서.

조건자에 대해 자세히 알아보려면 필터(Apache Kafka) 를 참조하세요. Confluent에서.

변환에 학습 보려면 ExtractField ExtractField 를 참조하세요. Confluent에서.

기본 키 스키마에 대해 자세히 알아보려면 기본 스키마 페이지를 참조하세요.

돌아가기

시작