Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector
/ /

Tratamento de erros e retomada das propriedades de interrupção

Nesta página

  • Visão geral
  • Configurações
  • Heartbeats com transformações de mensagem única

Use as seguintes definições de configuração para especificar como o conector de origem do MongoDB Kafka se comporta quando encontra erros e para especificar as configurações relacionadas à retomada de leituras interrompidas.

Nome
Descrição
mongo.errors.tolerance
Type: string

Description:
Whether to continue processing messages when the connector encounters an error.

Set this to "none" if you want the connector to stop processing messages and report the issue if it encounters an error.

Set this to "all" if you want the connector to continue processing messages and ignore any errors it encounters.

IMPORTANT: This property overrides the errors.tolerance Connect Framework property.

Default: "none"
Accepted Values: "none" or "all"
mongo.errors.log.enable
Type: boolean

Description:
Whether the connector should report errors in the log file.

Set this to true to log all errors the connector encounters.

Set this to false to log errors that are not tolerated by the connector. You can specify which errors the connector should tolerate using the errors.tolerance or mongo.errors.tolerance setting.

IMPORTANT: This property overrides the errors.log.enable Connect Framework property.

Default: false
Accepted Values: true or false
mongo.errors.deadletterqueue.topic.name
Type: string

Description:
The name of topic to use as the dead letter queue.

If you specify a value, the connector writes invalid messages to the dead letter queue topic as extended JSON strings.

If you leave this setting blank, the connector does not write invalid messages to any topic.
IMPORTANT: You must set errors.tolerance or mongo.errors.tolerance setting to "all" to enable this property.

Default: ""
Accepted Values: A valid Kafka topic name
offset.partition.name
Type: string

Description:
The custom offset partition name to use. You can use this option to instruct the connector to start a new change stream when an existing offset contains an invalid resume token.

If you leave this setting blank, the connector uses the default partition name based on the connection details.

To view a strategy for naming offset partitions, see Reset Stored Offsets.

Default: ""
Accepted Values: A string. To learn more about naming a partition, see SourceRecord in the Apache Kafka API documentation.
heartbeat.interval.ms
Type: long

Description:
The number of milliseconds the connector waits between sending heartbeat messages. The connector sends heartbeat messages when source records are not published in the specified interval. This mechanism improves resumability of the connector for low volume namespaces.

Heartbeat messages contain a postBatchResumeToken data field. The value of this field contains the MongoDB server oplog entry that the connector last read from the change stream.

Set this to 0 to disable heartbeat messages.

To learn more, see Prevention in the Invalid Resume Token page.

Default: 0
Accepted Values: An integer
heartbeat.topic.name
Type: string

Description:
The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the heartbeat.interval.ms setting to enable this feature.

Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name

Se você habilitar pulsações e especificar Transformações de Mensagem Única (SMTs) em sua implantação do Kafka Connect, deverá excluir suas mensagens de pulsação dos SMTs. Os SMTs são uma funcionalidade do Kafka Connect que permite a você especificar transformações nas mensagens que passam pelo connector de origem sem ter que implantar uma aplicação de Atlas Stream Processing.

Para excluir mensagens de pulsação de seus SMTs, você deve criar e aplicar um predicado aos seus SMTs. Os predicados são um recurso dos SMTs que permite verificar se uma mensagem corresponde a uma declaração condicional antes de aplicar uma transformação.

A seguinte configuração define o predicado IsHeartbeat que corresponde a mensagens de pulsação enviadas para o tópico de pulsação padrão:

predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats

A seguinte configuração utiliza o predicado anterior para excluir mensagens de pulsação de uma transformação ExtractField :

transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
# apply the default key schema as the extract transformation requires a struct object
output.format.key=schema

Se você não excluir suas mensagens de pulsação da transformação anterior, o connector gerará o seguinte erro após processar uma mensagem de pulsação:

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
...
Only Struct objects supported for [field extraction], found: java.lang.String

Para saber mais sobre SMTs, consulte Como usar transformações de mensagem única no Kafka Connect da Confluent.

Para saber mais sobre predicados, consulte Filtro (Apache Kafka) da Confluent.

Para saber mais sobre a ExtractField transformação do , consulte ExtrairField da Confluent.

Para saber mais sobre o esquema de chave padrão, consulte a página Esquemas Padrão .

Voltar

Inicialização