Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector
/ /

Pós-processadores do conector de coletor

Nesta página

  • Visão geral
  • Como os pós-processadores modificam dados
  • Como especificar pós-processadores
  • Pós-processadores pré-construídos
  • Configurar o pós-processador do adicionador de ID de documento
  • Exemplos de pós-processador
  • Exemplos de lista de permissões e lista de bloqueios
  • Exemplo de projetor de lista de permissões
  • Exemplo de projetor de lista de bloqueio
  • Exemplos de correspondência de padrões curinga de projeção
  • Exemplos de renomeação de campo
  • Como criar um pós-processador personalizado

Nesta página, você aprenderá a configurar os pós-processadores em seu conector de sink do MongoDB Kafka. Os pós-processadores modificam os registros do coletor que o conector lê de um tópico do Kafka antes que o conector os armazene na sua coleção do MongoDB. Alguns exemplos de modificações de dados que os pós-processadores podem fazer incluem:

  • Definir o campo _id do documento como um valor personalizado

  • Incluir ou excluir campos de valor ou chave de mensagem

  • Renomear campos

Você pode usar os pós-processadores pré-construídos incluídos no conector ou implementar o seu.

Consulte as seções a seguir para obter mais informações sobre pós-processadores:

  • Como os pós-processadores modificam dados

  • Como especificar pós-processadores

  • Pós-processadores pré-construídos

  • Configurar o pós-processador do adicionador de ID de documento

  • Exemplos de configuração de pós-processador

  • Crie um pós-processador personalizado

Os pós-processadores modificam a leitura de dados de um tópico do Kafka. O conector armazena a mensagem em uma classe SinkDocument que contém uma representação dos campos de valor e chave Kafka SinkRecord. O conector aplica sequencialmente quaisquer pós-processadores especificados na configuração e armazena o resultado em uma coleção MongoDB.

Os pós-processadores executam tarefas de modificação de dados, como gerar o campo _id do documento, projetar campos de chave ou valor de mensagem e renomear campos. Você pode usar os pós-processadores pré-definidos incluídos no connector ou implementar os seus próprios ao estender o PostProcessor aula.

Importante

Pós-processadores e manipuladores de Change Data Capture (CDC)

Você não pode aplicar um pós-processador aos dados de evento do manipulador CDC . Se você especificar ambos, o conector registrará um aviso.

Você pode especificar um ou mais pós-processadores na configuração do post.processor.chain como uma lista separada por vírgula. Se você especificar mais de um, o conector os aplicará sequencialmente no qual cada pós-processador modifica a saída de dados do anterior.

Para garantir que os documentos que o conector grava no MongoDB contenham campos _id exclusivos, ele adiciona automaticamente o pós-processador DocumentIdAdder na primeira posição da cadeia, caso você não o inclua de outra forma.

A seguinte configuração de exemplo especifica que o conector deve executar o KafkaMetaAdder pós-processador primeiro e, em seguida, o AllowListValueProjector pós-processador na saída.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

A tabela a seguir contém uma lista de todos os pós-processadores incluídos no conector de pia.

Nome do pós-processador
Descrição
DocumentIdAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Inserts an _id field determined by the configured strategy.
The default strategy is BsonOidStrategy.
For information on strategy options and configuration, see the Configure the Document Id Adder Post Processor section.
BlockListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Removes matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
BlockListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Removes matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Includes only matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``
Includes only matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
KafkaMetaAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field named "topic-partition-offset" and sets the value to the concatenation of Kafka topic, partition, and offset to the document.
RenameByMapping
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified field name in the key or value document.
For information on configuration, see the Renaming by Mapping Example.
RenameByRegex
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression in the key or value document.
For information on configuration, see the Renaming by Regular Expression Example.

O DocumentIdAdder pós-processador utiliza uma estratégia para determinar como deve formatar o campo _id no documento MongoDB. Uma estratégia define o comportamento predefinido que você pode personalizar para seu caso de uso.

Você pode especificar uma estratégia para este pós-processador na configuração document.id.strategy como mostrado no seguinte exemplo:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

A tabela seguinte mostra uma lista das estratégias que você pode utilizar para configurar o DocumentIdAdder pós-processador:

Nome da estratégia
Descrição
BsonOidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Generates a MongoDB BSON ObjectId.
Default strategy for the DocumentIdAdder post processor.
KafkaMetaDataStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.
FullKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the sink document to generate the value for the _id field.
Defaults to a blank document if no key exists.
ProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the sink document.
Throws an exception if the field is missing from the sink document.
ProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the sink document.
Throws an exception if the field is missing from the sink document.
PartialKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a block list or allow list projection of the sink document key structure.
Defaults to a blank document if no key exists.
PartialValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a block list or allow list projection of the sink document value structure.
Defaults to a blank document if no value exists.
UuidProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy
Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy
Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``
Uses a randomly generated UUID in string format.

Se as estratégias internas de adição de ID de documento não cobrirem seu caso de uso, você poderá definir uma estratégia de ID de documento personalizada seguindo as etapas abaixo:

  1. Crie uma classe Java que implemente a interface IdStrategy e contenha sua lógica de configuração personalizada.

  2. Compile a classe em um arquivo JAR.

  3. Adicione o JAR compilado ao caminho de classe/plugin para todos os seus trabalhadores do Kafka. Para obter mais informações sobre caminhos de plug-in, consulte a documentação do Confluent.

  4. Atualize a configuração document.id.strategy para o nome completo da classe da sua classe personalizada em todos os seus trabalhadores do Kafka.

Observação

A estratégia selecionada pode ter implicações na semântica de entrega

As estratégias BSON ObjectId ou UUID só podem garantir a entrega pelo menos uma vez, pois o conector gera novos ids em novas tentativas ou em processamento de registros novamente. Outras estratégias permitem a entrega exatamente uma vez, se você puder garantir que os campos que formam a identificação do documento sejam exclusivos.

Para obter por exemplo, implementações da IdStrategy interface , consulte o diretório do código-fonte que contém ID implementações de estratégia de empacotado com o connector.

Esta seção mostra exemplos de configuração e saída de amostra dos seguintes tipos de pós-processadores:

Os pós-processadores do projetor da lista de permissões e da lista de bloqueios determinam quais campos devem ser incluídos e excluídos da saída.

Quando você usa o projetor de lista de permissões, o pós-processador só gera dados dos campos especificados.

Quando você usa o projetor de lista de bloqueios, o pós-processo omite somente os dados dos campos que você especifica.

Observação

Você pode usar o "." notação (de ponto) para fazer referência a campos aninhados no registro. Você também pode usar a notação para fazer referência a campos de documentos em uma array.

Ao adicionar um projetor à cadeia do pós-processador, você deve especificar o tipo de projetor e se ele deve ser aplicado à parte da chave ou do valor do documento coletor.

Consulte as seções a seguir, por exemplo, configurações e saída do projetor.

Suponha que seus documentos de valor de registro do Kafka se assemelhassem aos seguintes dados de perfil do usuário:

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

Você pode configurar o projetor de valor AllowList para armazenar dados selecionados como os campos "nome", "address.city" e "hobbies" de seus documentos de valor usando as seguintes configurações:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

Depois que o pós-processador aplica a projeção, ele gera o seguinte registro:

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

Suponha que seus documentos chave de registro do Kafka se assemelhem aos seguintes dados de identificação do usuário:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

Você pode configurar o projetor de chave BlockList para omitir o "authToken" e "registration.source" campos antes de armazenar os dados com as seguintes configurações:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

Depois que o pós-processador aplica a projeção, ele gera o seguinte registro:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

Esta seção mostra como você pode configurar os pós-processadores do projetor para que correspondam aos padrões curinga e correspondam aos nomes dos campos.

Padrão
Descrição
*
Corresponde a qualquer número de caracteres no nível atual.
**
Corresponde a quaisquer caracteres no nível atual e a todos os níveis aninhados.

Para obter os exemplos de correspondência de padrão curinga da lista de permissões e da lista de bloqueios nesta seção, consulte o seguinte documento de valores que contém medições meteorológicas:

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

Você pode usar o curinga * para corresponder a vários nomes de campos. O exemplo de configuração a seguir corresponde aos seguintes campos:

  • O campo de nível superior chamado "cidade"

  • Os campos chamados "média" que são subdocumentos de qualquer campo de nível superior que comece com o nome "wind_speed".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

Após o pós-processador aplicar a projeção da lista de permissões, ele produz o seguinte registro:

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

Você pode usar o curinga ** que corresponde a objetos em qualquer nível a partir daquele em que você especificar o curinga. O exemplo de correspondência curinga a seguir projeta qualquer documento que contenha o campo chamado "low".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

O pós-processador que aplica os resultados da projeção ao seguinte registro:

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

Você pode usar os padrões curinga para corresponder campos em um nível de documento específico, conforme mostrado no exemplo de configuração de lista de bloqueios a seguir:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

Esta seção mostra como configurar os pós-processadores de renomeação de campo RenameByMapping e RenameByRegex para atualizar os nomes de campo em um registro de sink. As configurações de renomeação de campo especificam o seguinte:

  • Se deve atualizar o documento chave ou valor no registro

  • Os nomes dos campos a serem atualizados

  • Os novos nomes de campo

Você deve especificar as configurações RenameByMapping e RenameByRegex em uma array JSON. Você pode especificar campos aninhados usando notação de ponto ou correspondência de padrão.

Os exemplos de pós-processador de renomeação de campo usam o seguinte exemplo de registro de coletor:

Documento chave

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

Documento de valor

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

A configuração RenameByMapping do pós-processador especifica um ou mais objetos JSON que atribuem campos correspondentes a uma string a um novo nome. Cada objeto contém o texto a ser combinado no elemento oldName e o texto de substituição no elemento newName, conforme descrito na tabela abaixo.

Nome da chave
Descrição
oldName
Especifica se os campos no documento de chave ou valor devem ser combinados com o nome do campo a ser substituído. A configuração usa um "." caractere para separar os dois valores.
newName
Especifica o nome do campo de substituição para todas as correspondências do campo.

A propriedade de exemplo a seguir corresponde ao campo "local" de um documento de chave e o renomeia para "país":

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

Essa configuração instrui o pós-processador RenameByMapping a transformar o documento de chave original no seguinte documento:

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

Você pode executar uma atribuição de nome de campo semelhante para documentos de valor especificando o documento de valor com o nome de campo anexado no campo oldName como segue:

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

Esta configuração instrui o RenameByMapping pós-processador a transformar o documento de valor original para o seguinte documento:

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

Você também pode especificar um ou mais mapeamentos na propriedade field.renamer.mapping utilizando uma array JSON no formato de string como mostrado na seguinte configuração:

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

A configuração RenameByRegex pós-processador especifica os nomes de campo e os padrões de texto que devem ser correspondidos, e os valores de substituição para o texto correspondido. Você pode especificar uma ou mais expressões de renomeação em objetos JSON contendo os campos descritos na seguinte tabela:

Nome da chave
Descrição
regexp
Contém uma expressão regular que corresponde aos campos para executar a substituição.
padrão
Contém uma expressão regular que corresponde ao texto a ser substituído.
replace
Contém o texto de substituição para todas as correspondências da expressão regular definida no campo pattern.

A configuração de exemplo a seguir instrui o pós-processador a executar o seguinte:

  • Combine quaisquer nomes de campo no documento-chave que comecem com "data". No conjunto de campos correspondentes, substitua todo o texto que corresponda ao padrão _ pelo caractere -.

  • Corresponde a qualquer nome de campo no documento de valor que seja um subdocumento de crepes. No conjunto de campos correspondentes, substitua todo o texto que corresponda ao padrão purchased por quantity.

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

Quando o conector aplica o pós-processador ao documento de chave de exemplo e ao documento de valor de exemplo, ele produz o seguinte:

Documento chave

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

Documento de valor

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

Aviso

Os pós-processadores renomeadores não substituem nomes de campos existentes

Os nomes de campo de destino que você definiu no seu renomeado pós-processadores podem resultar em nomes de campo duplicados no mesmo documento. Para evitar isso, o pós-processador ignora renomear quando duplicaria um nome de campo existente no mesmo nível do documento.

Se os pós-processadores integrados não cobrirem seu caso de uso, você poderá criar uma classe de pós-processador personalizada usando as seguintes etapas:

  1. Crie uma classe Java que estenda a classe abstrata PostProcessor.

  2. Substitua o método process() na sua turma. Você pode atualizar o SinkDocument, uma representação BSON dos campos de chave e valor do registro do coletor e acessar o SinkRecord Kafka original em seu método.

  3. Compile a classe em um arquivo JAR.

  4. Adicione o JAR compilado ao caminho de classe/plugin para todos os seus trabalhadores do Kafka. Para obter mais informações sobre caminhos de plugin, consulte a documentação do Confluent sobre Instalando manualmente os Community Connectors.

  5. Adicione o nome completo da classe do pós-processador à configuração da cadeia do pós-processador.

Por exemplo, pós-processadores, você pode procurar o código-fonte para as classes internas de pós-processador.

Voltar

Escrever estratégias de modelo