Pós-processadores do conector de coletor
Nesta página
- Visão geral
- Como os pós-processadores modificam dados
- Como especificar pós-processadores
- Pós-processadores pré-construídos
- Configurar o pós-processador do adicionador de ID de documento
- Exemplos de pós-processador
- Exemplos de lista de permissões e lista de bloqueios
- Exemplo de projetor de lista de permissões
- Exemplo de projetor de lista de bloqueio
- Exemplos de correspondência de padrões curinga de projeção
- Exemplos de renomeação de campo
- Como criar um pós-processador personalizado
Visão geral
Nesta página, você aprenderá a configurar os pós-processadores em seu conector de sink do MongoDB Kafka. Os pós-processadores modificam os registros do coletor que o conector lê de um tópico do Kafka antes que o conector os armazene na sua coleção do MongoDB. Alguns exemplos de modificações de dados que os pós-processadores podem fazer incluem:
Definir o campo
_id
do documento como um valor personalizadoIncluir ou excluir campos de valor ou chave de mensagem
Renomear campos
Você pode usar os pós-processadores pré-construídos incluídos no conector ou implementar o seu.
Consulte as seções a seguir para obter mais informações sobre pós-processadores:
Como os pós-processadores modificam dados
Os pós-processadores modificam a leitura de dados de um tópico do Kafka. O conector armazena a mensagem em uma classe SinkDocument
que contém uma representação dos campos de valor e chave Kafka SinkRecord
. O conector aplica sequencialmente quaisquer pós-processadores especificados na configuração e armazena o resultado em uma coleção MongoDB.
Os pós-processadores executam tarefas de modificação de dados, como gerar o campo _id
do documento, projetar campos de chave ou valor de mensagem e renomear campos. Você pode usar os pós-processadores pré-definidos incluídos no connector ou implementar os seus próprios ao estender o PostProcessor aula.
Importante
Pós-processadores e manipuladores de Change Data Capture (CDC)
Você não pode aplicar um pós-processador aos dados de evento do manipulador CDC . Se você especificar ambos, o conector registrará um aviso.
Como especificar pós-processadores
Você pode especificar um ou mais pós-processadores na configuração do post.processor.chain
como uma lista separada por vírgula. Se você especificar mais de um, o conector os aplicará sequencialmente no qual cada pós-processador modifica a saída de dados do anterior.
Para garantir que os documentos que o conector grava no MongoDB contenham campos _id
exclusivos, ele adiciona automaticamente o pós-processador DocumentIdAdder
na primeira posição da cadeia, caso você não o inclua de outra forma.
A seguinte configuração de exemplo especifica que o conector deve executar o KafkaMetaAdder
pós-processador primeiro e, em seguida, o AllowListValueProjector
pós-processador na saída.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Pós-processadores pré-construídos
A tabela a seguir contém uma lista de todos os pós-processadores incluídos no conector de pia.
Nome do pós-processador | Descrição | |
---|---|---|
DocumentIdAdder | Full Path:
Inserts an _id field determined by the configured strategy.The default strategy is BsonOidStrategy .For information on strategy options and configuration, see the
Configure the Document Id Adder Post Processor
section. | |
BlockListKeyProjector | Full Path:
Removes matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
BlockListValueProjector | Full Path:
Removes matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListKeyProjector | Full Path:
Includes only matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListValueProjector | Full Path:
Includes only matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
KafkaMetaAdder | Full Path:
Adds a field named "topic-partition-offset" and sets the value
to the concatenation of Kafka topic, partition, and offset to the
document. | |
RenameByMapping | Full Path:
Renames fields that are an exact match to a specified field name in
the key or value document. For information on configuration, see the
Renaming by Mapping Example. | |
RenameByRegex | Full Path:
Renames fields that match a regular expression in the key or
value document. For information on configuration, see the
Renaming by Regular Expression Example. |
Configurar o pós-processador do adicionador de ID de documento
O DocumentIdAdder
pós-processador utiliza uma estratégia para determinar como deve formatar o campo _id
no documento MongoDB. Uma estratégia define o comportamento predefinido que você pode personalizar para seu caso de uso.
Você pode especificar uma estratégia para este pós-processador na configuração document.id.strategy
como mostrado no seguinte exemplo:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
A tabela seguinte mostra uma lista das estratégias que você pode utilizar para configurar o DocumentIdAdder
pós-processador:
Nome da estratégia | Descrição | |
---|---|---|
BsonOidStrategy | Full Path:
Generates a MongoDB BSON ObjectId. Default strategy for the DocumentIdAdder post processor. | |
KafkaMetaDataStrategy | Full Path:
Builds a string composed of the concatenation of Kafka topic,
partition, and offset. | |
FullKeyStrategy | Full Path:
Uses the complete key structure of the sink document to generate the
value for the _id field.Defaults to a blank document if no key exists. | |
ProvidedInKeyStrategy | Full Path:
Uses the _id field specified in the key structure of the sink
document.Throws an exception if the field is missing from the sink document. | |
ProvidedInValueStrategy | Full Path:
Uses the _id field specified in the value structure of the
sink document.Throws an exception if the field is missing from the sink document. | |
PartialKeyStrategy | Full Path:
Uses a block list or allow list projection of the sink document key
structure. Defaults to a blank document if no key exists. | |
PartialValueStrategy | Full Path:
Uses a block list or allow list projection of the sink document
value structure. Defaults to a blank document if no value exists. | |
UuidProvidedInKeyStrategy | Full Path:
Converts the _id key field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidProvidedInValueStrategy | Full Path:
Converts the _id value field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidStrategy | Full Path:
Uses a randomly generated UUID in string format. |
Crie uma estratégia de identificação de documento personalizada
Se as estratégias internas de adição de ID de documento não cobrirem seu caso de uso, você poderá definir uma estratégia de ID de documento personalizada seguindo as etapas abaixo:
Crie uma classe Java que implemente a interface IdStrategy e contenha sua lógica de configuração personalizada.
Compile a classe em um arquivo JAR.
Adicione o JAR compilado ao caminho de classe/plugin para todos os seus trabalhadores do Kafka. Para obter mais informações sobre caminhos de plug-in, consulte a documentação do Confluent.
Atualize a configuração
document.id.strategy
para o nome completo da classe da sua classe personalizada em todos os seus trabalhadores do Kafka.
Observação
A estratégia selecionada pode ter implicações na semântica de entrega
As estratégias BSON ObjectId ou UUID só podem garantir a entrega pelo menos uma vez, pois o conector gera novos ids em novas tentativas ou em processamento de registros novamente. Outras estratégias permitem a entrega exatamente uma vez, se você puder garantir que os campos que formam a identificação do documento sejam exclusivos.
Para obter por exemplo, implementações da IdStrategy
interface , consulte o diretório do código-fonte que contém ID implementações de estratégia de empacotado com o connector.
Exemplos de pós-processador
Esta seção mostra exemplos de configuração e saída de amostra dos seguintes tipos de pós-processadores:
Exemplos de lista de permissões e lista de bloqueios
Os pós-processadores do projetor da lista de permissões e da lista de bloqueios determinam quais campos devem ser incluídos e excluídos da saída.
Quando você usa o projetor de lista de permissões, o pós-processador só gera dados dos campos especificados.
Quando você usa o projetor de lista de bloqueios, o pós-processo omite somente os dados dos campos que você especifica.
Observação
Você pode usar o "." notação (de ponto) para fazer referência a campos aninhados no registro. Você também pode usar a notação para fazer referência a campos de documentos em uma array.
Ao adicionar um projetor à cadeia do pós-processador, você deve especificar o tipo de projetor e se ele deve ser aplicado à parte da chave ou do valor do documento coletor.
Consulte as seções a seguir, por exemplo, configurações e saída do projetor.
Exemplo de projetor de lista de permissões
Suponha que seus documentos de valor de registro do Kafka se assemelhassem aos seguintes dados de perfil do usuário:
{ "name": "Sally Kimball", "age": 10, "address": { "city": "Idaville", "country": "USA" }, "hobbies": [ "reading", "solving crime" ] }
Você pode configurar o projetor de valor AllowList
para armazenar dados selecionados como os campos "nome", "address.city" e "hobbies" de seus documentos de valor usando as seguintes configurações:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies
Depois que o pós-processador aplica a projeção, ele gera o seguinte registro:
{ "name": "Sally Kimball", "address": { "city": "Idaville" }, "hobbies": [ "reading", "solving crime" ] }
Exemplo de projetor de lista de bloqueio
Suponha que seus documentos chave de registro do Kafka se assemelhem aos seguintes dados de identificação do usuário:
{ "username": "user5983", "registration": { "date": "2021-09-13", "source": "mobile" }, "authToken": { "alg": "HS256", "type": "JWT", "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk" } }
Você pode configurar o projetor de chave BlockList
para omitir o "authToken" e "registration.source" campos antes de armazenar os dados com as seguintes configurações:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source
Depois que o pós-processador aplica a projeção, ele gera o seguinte registro:
{ "username": "user5983", "registration": { "date": "2021-09-13", } }
Exemplos de correspondência de padrões curinga de projeção
Esta seção mostra como você pode configurar os pós-processadores do projetor para que correspondam aos padrões curinga e correspondam aos nomes dos campos.
Padrão | Descrição |
* | Corresponde a qualquer número de caracteres no nível atual. |
** | Corresponde a quaisquer caracteres no nível atual e a todos os níveis aninhados. |
Para obter os exemplos de correspondência de padrão curinga da lista de permissões e da lista de bloqueios nesta seção, consulte o seguinte documento de valores que contém medições meteorológicas:
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" }, "moisture": { "average": 340, "units": "mm" } } }
Exemplos de curingas da lista de permissões
Você pode usar o curinga *
para corresponder a vários nomes de campos. O exemplo de configuração a seguir corresponde aos seguintes campos:
O campo de nível superior chamado "cidade"
Os campos chamados "média" que são subdocumentos de qualquer campo de nível superior que comece com o nome "wind_speed".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average
Após o pós-processador aplicar a projeção da lista de permissões, ele produz o seguinte registro:
{ "city": "Springfield", "wind_speed_10m": { "average": 3, }, "wind_speed_80m": { "average": 8, } }
Você pode usar o curinga **
que corresponde a objetos em qualquer nível a partir daquele em que você especificar o curinga. O exemplo de correspondência curinga a seguir projeta qualquer documento que contenha o campo chamado "low".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low
O pós-processador que aplica os resultados da projeção ao seguinte registro:
{ "temperature": { "high": 28, "low": 24, "units": "C" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" } } }
Exemplo de curinga de lista de bloqueio
Você pode usar os padrões curinga para corresponder campos em um nível de documento específico, conforme mostrado no exemplo de configuração de lista de bloqueios a seguir:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "moisture": { "average": 340, "units": "mm" } } }
Exemplos de renomeação de campo
Esta seção mostra como configurar os pós-processadores de renomeação de campo RenameByMapping
e RenameByRegex
para atualizar os nomes de campo em um registro de sink. As configurações de renomeação de campo especificam o seguinte:
Se deve atualizar o documento chave ou valor no registro
Os nomes dos campos a serem atualizados
Os novos nomes de campo
Você deve especificar as configurações RenameByMapping
e RenameByRegex
em uma array JSON. Você pode especificar campos aninhados usando notação de ponto ou correspondência de padrão.
Os exemplos de pós-processador de renomeação de campo usam o seguinte exemplo de registro de coletor:
Documento chave
{ "location": "Provence", "date_month": "October", "date_day": 17 }
Documento de valor
{ "flapjacks": { "purchased": 598, "size": "large" } }
Renomear por exemplo de mapeamento
A configuração RenameByMapping
do pós-processador especifica um ou mais objetos JSON que atribuem campos correspondentes a uma string a um novo nome. Cada objeto contém o texto a ser combinado no elemento oldName
e o texto de substituição no elemento newName
, conforme descrito na tabela abaixo.
Nome da chave | Descrição |
---|---|
oldName | Especifica se os campos no documento de chave ou valor devem ser combinados com o nome do campo a ser substituído. A configuração usa um "." caractere para separar os dois valores. |
newName | Especifica o nome do campo de substituição para todas as correspondências do campo. |
A propriedade de exemplo a seguir corresponde ao campo "local" de um documento de chave e o renomeia para "país":
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
Essa configuração instrui o pós-processador RenameByMapping
a transformar o documento de chave original no seguinte documento:
{ "country": "Provence", "date_month": "October", "date_day": 17 }
Você pode executar uma atribuição de nome de campo semelhante para documentos de valor especificando o documento de valor com o nome de campo anexado no campo oldName
como segue:
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
Esta configuração instrui o RenameByMapping
pós-processador a transformar o documento de valor original para o seguinte documento:
{ "crepes": { "purchased": 598, "size": "large" } }
Você também pode especificar um ou mais mapeamentos na propriedade field.renamer.mapping
utilizando uma array JSON no formato de string como mostrado na seguinte configuração:
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
Renomear por Expressão Regular
A configuração RenameByRegex
pós-processador especifica os nomes de campo e os padrões de texto que devem ser correspondidos, e os valores de substituição para o texto correspondido. Você pode especificar uma ou mais expressões de renomeação em objetos JSON contendo os campos descritos na seguinte tabela:
Nome da chave | Descrição |
---|---|
regexp | Contém uma expressão regular que corresponde aos campos para executar a substituição. |
padrão | Contém uma expressão regular que corresponde ao texto a ser substituído. |
replace | Contém o texto de substituição para todas as correspondências da expressão regular definida no campo pattern . |
A configuração de exemplo a seguir instrui o pós-processador a executar o seguinte:
Combine quaisquer nomes de campo no documento-chave que comecem com "data". No conjunto de campos correspondentes, substitua todo o texto que corresponda ao padrão
_
pelo caractere-
.Corresponde a qualquer nome de campo no documento de valor que seja um subdocumento de
crepes
. No conjunto de campos correspondentes, substitua todo o texto que corresponda ao padrãopurchased
porquantity
.
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
Quando o conector aplica o pós-processador ao documento de chave de exemplo e ao documento de valor de exemplo, ele produz o seguinte:
Documento chave
{ "location": "Provence", "date-month": "October", "date-day": 17 }
Documento de valor
{ "crepes": { "quantity": 598, "size": "large" } }
Aviso
Os pós-processadores renomeadores não substituem nomes de campos existentes
Os nomes de campo de destino que você definiu no seu renomeado pós-processadores podem resultar em nomes de campo duplicados no mesmo documento. Para evitar isso, o pós-processador ignora renomear quando duplicaria um nome de campo existente no mesmo nível do documento.
Como criar um pós-processador personalizado
Se os pós-processadores integrados não cobrirem seu caso de uso, você poderá criar uma classe de pós-processador personalizada usando as seguintes etapas:
Crie uma classe Java que estenda a classe abstrata PostProcessor.
Substitua o método
process()
na sua turma. Você pode atualizar oSinkDocument
, uma representação BSON dos campos de chave e valor do registro do coletor e acessar oSinkRecord
Kafka original em seu método.Compile a classe em um arquivo JAR.
Adicione o JAR compilado ao caminho de classe/plugin para todos os seus trabalhadores do Kafka. Para obter mais informações sobre caminhos de plugin, consulte a documentação do Confluent sobre Instalando manualmente os Community Connectors.
Adicione o nome completo da classe do pós-processador à configuração da cadeia do pós-processador.
Por exemplo, pós-processadores, você pode procurar o código-fonte para as classes internas de pós-processador.