오류 처리 및 중단 후 재개 속성
이 페이지의 내용
개요
다음 구성 설정을 사용하여 MongoDB Kafka 소스 커넥터에서 오류가 발생할 때 작동하는 방식을 지정하고 중단된 읽기 재개와 관련된 설정을 지정할 수 있습니다.
설정
이름 | 설명 |
---|---|
mongo.errors.tolerance | Type: string Description: Whether to continue processing messages when the connector encounters
an error. Set this to "none" if you want the connector to stop
processing messages and report the issue if it encounters an
error.Set this to "all" if you want the connector to continue
processing messages and ignore any errors it encounters.IMPORTANT: This property overrides the
errors.tolerance
Connect Framework property. Default: "none" Accepted Values: "none" or "all" |
mongo.errors.log.enable | Type: boolean Description: Whether the connector should report errors in the log file. Set this to true to log all errors the connector encounters.Set this to false to log errors that are not tolerated by the
connector. You can specify which errors the connector should
tolerate using the errors.tolerance or mongo.errors.tolerance
setting.IMPORTANT: This property overrides the
errors.log.enable
Connect Framework property. Default: false Accepted Values: true or false |
mongo.errors.deadletterqueue.topic.name | Type: string Description: The name of topic to use as the dead letter queue. If you specify a value, the connector writes invalid messages to the
dead letter queue topic as extended JSON strings. If you leave this setting blank, the connector does not write
invalid messages to any topic. IMPORTANT: You must set errors.tolerance or mongo.errors.tolerance
setting to "all" to enable this property.Default: "" Accepted Values: A valid Kafka topic name |
offset.partition.name | Type: string Description: The custom offset partition name to use. You can use this option
to instruct the connector to start a new change stream when an
existing offset contains an invalid resume token. If you leave this setting blank, the connector uses the default partition name
based on the connection details. To view a strategy for naming
offset partitions, see Reset Stored Offsets. Default: "" Accepted Values: A string. To learn more about naming a partition,
see
SourceRecord
in the Apache Kafka API documentation. |
heartbeat.interval.ms | Type: long Description: The number of milliseconds the connector waits between sending
heartbeat messages. The connector sends heartbeat messages when
source records are not published in the specified interval. This mechanism improves
resumability of the connector for low volume namespaces. Heartbeat messages contain a postBatchResumeToken data field.
The value of this field contains the MongoDB server oplog entry that
the connector last read from the change stream.Set this to 0 to disable heartbeat messages.To learn more, see Prevention
in the Invalid Resume Token
page. Default: 0 Accepted Values: An integer |
heartbeat.topic.name | Type: string Description: The name of the topic on which the connector should publish
heartbeat messages. You must provide a positive value in the
heartbeat.interval.ms setting to enable this feature.Default: __mongodb_heartbeats Accepted Values: A valid Kafka topic name |
단일 메시지 변환으로 하트비트
Kafka Connect 배포에서 하트비트를 활성화하고 SMT(단일 메시지 변환) 를 지정하는 경우 SMT에서 하트비트 메시지를 제외해야 합니다. SMT는 Atlas Stream Processing 애플리케이션을 배포할 필요 없이 소스 connector를 통과하는 메시지에 대한 변환을 지정할 수 있는 Kafka의 기능입니다.
SMT에서 하트비트 메시지를 제외하려면 조건 자를 생성하여 SMT에 적용해야 합니다. 조건자는 변환을 적용하기 전에 메시지가 조건문과 일치하는지 확인할 수 있는 SMT의 기능입니다.
다음 구성은 기본 하트비트 주제로 전송된 하트비트 메시지와 일치하는 IsHeartbeat
조건자를 정의합니다.
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats
다음 구성에서는 앞의 조건자를 사용하여 ExtractField
변환에서 하트비트 메시지를 제외합니다.
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema
이전 변환에서 하트비트 메시지를 제외하지 않으면 connector가 하트비트 메시지를 처리한 후 다음 오류가 발생합니다.
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String
SMT에 대해 자세히 알아보려면 Kafka Connect에서 단일 메시지 변환을 사용하는 방법을 참조하세요. Confluent에서.
조건자에 대해 자세히 알아보려면 필터(Apache Kafka) 를 참조하세요. Confluent에서.
변환에 학습 보려면 ExtractField
ExtractField 를 참조하세요. Confluent에서.
기본 키 스키마에 대해 자세히 알아보려면 기본 스키마 페이지를 참조하세요.