
Error Handling and Resuming from Interruption Properties

On this page

  • Overview
  • Settings
  • Heartbeats with Single Message Transforms

Use the following configuration settings to specify how the MongoDB Kafka source connector behaves when it encounters errors and to specify settings related to resuming an interrupted read.

mongo.errors.tolerance
Type: string

Description:
Whether to continue processing messages when the connector encounters an error.

Set this to "none" if you want the connector to stop processing messages and report the issue if it encounters an error.

Set this to "all" if you want the connector to continue processing messages and ignore any errors it encounters.

IMPORTANT: This property overrides the errors.tolerance Connect Framework property.

Default: "none"
Accepted Values: "none" or "all"
mongo.errors.log.enable
Type: boolean

Description:
Whether the connector should report errors in the log file.

Set this to true to log all errors the connector encounters.

Set this to false to log errors that are not tolerated by the connector. You can specify which errors the connector should tolerate using the errors.tolerance or mongo.errors.tolerance setting.

IMPORTANT: This property overrides the errors.log.enable Connect Framework property.

Default: false
Accepted Values: true or false

mongo.errors.deadletterqueue.topic.name
Type: string

Description:
The name of the topic to use as the dead letter queue.

If you specify a value, the connector writes invalid messages to the dead letter queue topic as extended JSON strings.

If you leave this setting blank, the connector does not write invalid messages to any topic.

IMPORTANT: You must set the errors.tolerance or mongo.errors.tolerance property to "all" to enable this property.

Default: ""
Accepted Values: A valid Kafka topic name

offset.partition.name
Type: string

Description:
The custom offset partition name to use. You can use this option to instruct the connector to start a new change stream when an existing offset contains an invalid resume token.

If you leave this setting blank, the connector uses the default partition name based on the connection details.

To view a strategy for naming offset partitions, see Reset Stored Offsets.

Default: ""
Accepted Values: A string. To learn more about naming a partition, see SourceRecord in the Apache Kafka API documentation.

heartbeat.interval.ms
Type: long

Description:
The number of milliseconds the connector waits between sending heartbeat messages. The connector sends heartbeat messages when source records are not published in the specified interval. This mechanism improves resumability of the connector for low volume namespaces.

Heartbeat messages contain a postBatchResumeToken data field. The value of this field contains the MongoDB server oplog entry that the connector last read from the change stream.

Set this to 0 to disable heartbeat messages.

To learn more, see Prevention in the Invalid Resume Token page.

Default: 0
Accepted Values: An integer

heartbeat.topic.name
Type: string

Description:
The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the heartbeat.interval.ms setting to enable this feature.

Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name
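
To illustrate how these properties fit together, the following is a minimal sketch of the error handling and resumability portion of a source connector properties file. It is an example only; the dead letter queue topic name, the heartbeat interval, and the offset partition name are placeholder values that you should adjust for your deployment.

# Tolerate errors so the connector keeps processing messages instead of stopping
mongo.errors.tolerance=all

# Log every error the connector encounters
mongo.errors.log.enable=true

# Write invalid messages to a dead letter queue topic (example name)
mongo.errors.deadletterqueue.topic.name=example.deadletterqueue

# Send a heartbeat message if no source records are published for 10 seconds
heartbeat.interval.ms=10000
heartbeat.topic.name=__mongodb_heartbeats

# Example custom offset partition name; changing it instructs the connector
# to start a new change stream if a stored offset has an invalid resume token
offset.partition.name=mongo-source-v2

Because the dead letter queue is only used when errors are tolerated, this example sets mongo.errors.tolerance to all, as required by the IMPORTANT note above.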

If you enable heartbeats and specify Single Message Transforms (SMTs) in your Kafka Connect deployment, you must exclude your heartbeat messages from your SMTs. An SMT is a feature of Kafka Connect that enables you to specify transformations on the messages that pass through your source connector without deploying a stream processing application.

To exclude heartbeat messages from your SMTs, you must create a predicate and apply it to your SMTs. A predicate is a feature of SMTs that enables you to check whether a message matches a conditional statement before applying a transformation.

The following configuration defines an IsHeartbeat predicate that matches heartbeat messages sent to the default heartbeat topic:

predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats

The following configuration uses the preceding predicate to exclude heartbeat messages from an ExtractField transformation:

transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
# apply the default key schema as the extract transformation requires a struct object
output.format.key=schema

If you do not exclude heartbeat messages from the preceding transformation, the connector raises the following error once it processes a heartbeat message:

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
...
Only Struct objects supported for [field extraction], found: java.lang.String

To learn more about SMTs, see How to Use Single Message Transforms in Kafka Connect from Confluent.

To learn more about predicates, see Filter (Apache Kafka) from Confluent.

To learn more about the ExtractField transformation, see ExtractField from Confluent.

To learn more about the default key schema, see the Default Schema page.
