Escrever estratégias de modelo
Nesta página
- Visão geral
- Operações de gravação em massa
- Como especificar estratégias de modelo de escrita
- Especificar uma chave de negócios
- Exemplos
- Atualizar estratégia de carimbos de data/hora
- Substituir uma estratégia-chave de negócio
- Excluir uma estratégia de chave de negócios
- Estratégias de modelos de gravação personalizados
- Exemplo de estratégia de modelo de gravação
- Como instalar sua estratégia
Visão geral
Este guia mostra como alterar a maneira como seu conector de pia MongoDB Kafka grava dados no MongoDB.
Você pode alterar como seu conector grava dados no MongoDB para casos de uso, incluindo o seguinte:
Insira documentos em vez de atualizá-los
Substituir documentos que correspondam a um filtro diferente do campo
_id
Excluir documentos que correspondam a um filtro
Você pode configurar como seu conector grava dados no MongoDB especificando uma estratégia de modelo de gravação. Uma estratégia de modelo de gravação é uma classe que define como o conector do coletor deve gravar dados usando modelos de gravação. Um modelo de escrita é uma interface de driver MongoDB Java que define a estrutura de uma operação de escrita.
Para saber como modificar os registros de sink que seu conector recebe antes de gravá-los no MongoDB, leia o guia sobre Sink Connector Post Processors.
Para ver uma implementação de estratégia de modelo de gravação, consulte o código fonte da classe InsertOneDefaultStrategy.
Operações de gravação em massa
O conector de pia grava dados no MongoDB usando operações de gravação em massa. As gravações em massa agrupam várias operações de gravação, como inserções, atualizações ou exclusões.
Por padrão, o conector de pia executa gravações em massa ordenadas, o que garante a ordem das alterações de dados. Em uma gravação em massa ordenada, se alguma operação de gravação resultar em um erro, o conector pulará as gravações restantes nesse lote.
Se não precisar garantir a ordem das alterações de dados, você poderá definir a configuração bulk.write.ordered
como false
para que o conector execute gravações em massa não ordenadas. O conector coletor executa gravações em massa não ordenadas em paralelo, o que pode melhorar o desempenho.
Além disso, quando você habilita gravações em massa não ordenadas e define a configuração de errors.tolerance
como all
, mesmo que qualquer operação de gravação em sua gravação em massa falhe, o conector continua a executar as operações de gravação restantes no lote que não retornam erros.
Dica
Para saber mais sobre a configuração bulk.write.ordered
, consulte as Propriedades de processamento de mensagens do conector.
Para saber mais sobre operações de escrita em massa, consulte a seguinte documentação:
Como especificar estratégias de modelo de escrita
Para especificar uma estratégia de modelo de gravação, use a seguinte configuração:
writemodel.strategy=<write model strategy classname>
Para obter uma lista das estratégias de modelo de gravação pré-criadas incluídas no conector, consulte o guia sobre configurações de estratégia de modelo de gravação.
Especificar uma chave de negócios
Uma chave de negócios é um valor composto por um ou mais campos em seu registro coletor que o identifica como exclusivo. Por padrão, o conector do coletor usa o campo _id
do registro do coletor para recuperar a chave de negócios. Para especificar uma chave de negócios diferente, configure o pós-processador do adicionador de ID de Documentos para usar um valor personalizado.
É possível configurar o Document Id Adder para definir o campo _id
da chave de registro do coletor, conforme mostrado no exemplo de propriedades a seguir:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy document.id.strategy.partial.key.projection.list=<comma-separated field names> document.id.strategy.partial.key.projection.type=AllowList
Como alternativa, é possível configurá-lo para definir o campo _id
a partir do valor do registro do coletor, conforme mostrado no exemplo de propriedades a seguir:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=<comma-separated field names> document.id.strategy.partial.value.projection.type=AllowList
Importante
Melhore o desempenho de gravação
Crie um índice exclusivo em sua coleção de destino que corresponda aos campos de sua chave comercial. Isso melhora o desempenho das operações de gravação do conector do coletor. Consulte o guia sobre índices exclusivos para obter mais informações.
As seguintes estratégias de modelo de gravação exigem uma chave de negócios:
ReplaceOneBusinessKeyStrategy
DeleteOneBusinessKeyStrategy
UpdateOneBusinessKeyTimestampStrategy
Para obter mais informações sobre o pós-processador Document Id Adder, consulte Configurar o pós-processador Document Id Adder.
Exemplos
Esta seção mostra exemplos de configuração e saída das seguintes estratégias de modelo de gravação:
Atualizar estratégia de carimbos de data/hora
Você pode configurar a estratégia Update One Timestamps para adicionar e atualizar carimbos de data/hora ao gravar documentos no MongoDB. Esta estratégia executa as seguintes ações:
Quando o conector insere um novo documento do MongoDB, ele define os campos
_insertedTS
e_modifiedTS
como a hora atual no servidor do conector.Quando o conector atualiza um documento MongoDB existente, ele atualiza o campo
_modifiedTS
para a hora atual no servidor do conector.
Suponha que você queira rastrear a posição de um trem ao longo de uma rota e seu conector de coletor receba mensagens com a seguinte estrutura:
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
Use o ProvidedInValueStrategy
para especificar que seu conector deve usar o valor _id
da mensagem para atribuir o campo _id
em seu documento MongoDB. Especifique seu ID e escreva as propriedades da estratégia do modelo da seguinte forma:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
Depois que o conector do coletor processa o registro de exemplo anterior, ele insere um documento que contém os campos _insertedTS
e _modifiedTS
, conforme mostrado no documento a seguir:
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T15:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
Depois de uma hora, o trem informa sua nova localização ao longo de sua rota com uma nova posição, conforme mostrado no registro a seguir:
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
Depois que o conector de pia processar o registro anterior, ele insere um documento que contém os seguintes dados:
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T16:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
Para obter mais informações sobre ProvidedInValueStrategy
, consulte a seção sobre como configurar o pós-processador do Document Id Adder.
Substituir uma estratégia-chave de negócio
Você pode configurar a estratégia Substituir uma chave de negócio para substituir documentos que correspondam ao valor da chave de negócios. Para definir uma chave comercial em vários campos de um registro e configurar o conector para substituir documentos que contenham chaves comerciais correspondentes, execute as seguintes tarefas:
Crie um índice exclusivo em sua coleção que corresponda aos campos de chave de negócios.
Especifique a estratégia de ID do
PartialValueStrategy
para identificar os campos que pertencem à chave de negócios na configuração do conector.Especifique a estratégia do modelo de gravação
ReplaceOneBusinessKeyStrategy
na configuração do conector.
Suponha que você queira rastrear a capacidade do avião pelo número do voo e pela localização do aeroporto, representados por flight_no
e airport_code
, respectivamente. Uma mensagem de exemplo contém as seguintes informações:
{ "flight_no": "Z342", "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
Para implementar a estratégia, usando flight_no
e airport_code
como chave de negócios, primeiro crie um índice exclusivo nesses campos no shell do MongoDB:
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })
Em seguida, especifique os campos de estratégia e chave de negócios do PartialValueStrategy
na lista de projeção. Especifique a id e escreva a configuração da estratégia do modelo da seguinte maneira:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=flight_no,airport_code document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
Os dados de amostra inseridos na collection contêm o seguinte:
{ "flight_no": "Z342" "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
Quando o conector processa dados de coletor que correspondem à chave comercial do documento existente, ele substitui o documento pelos novos valores sem alterar os campos de chave comercial:
{ "flight_no": "Z342" "airport_code": "LAX", "status": "canceled" }
Após o conector processar os dados da pia, ele substitui o documento de amostra original no MongoDB pelo anterior.
Excluir uma estratégia de chave de negócios
Você pode configurar o conector para remover um documento quando ele receber mensagens que correspondam a uma chave de negócio usando a estratégia Excluir uma chave de negócio. Para definir uma chave de negócios a partir de vários campos de um registro e configurar o conector para excluir um documento que contenha uma chave de negócios correspondente, execute as seguintes tarefas:
Crie um índice exclusivo em sua coleção do MongoDB que corresponda aos seus campos de chave de negócios.
Especifique o
PartialValueStrategy
como a estratégia de id para identificar os campos que pertencem à chave de negócios na configuração do conector.Especifique a estratégia do modelo de gravação
DeleteOneBusinessKeyStrategy
na configuração do conector.
Suponha que você precise excluir um evento de calendário de um ano específico de uma coleção que contenha um documento semelhante ao seguinte:
{ "year": 2005, "month": 3, "day": 15, "event": "Dentist Appointment" }
Para implementar a estratégia, usando year
como chave de negócios, primeiro crie um índice exclusivo nesses campos no shell do MongoDB:
db.collection.createIndex({ "year": 1 }, { unique: true })
Em seguida, especifique sua chave de negócios e escreva a estratégia de modelo em sua configuração da seguinte maneira:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=year document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy
Se o seu conector processar um registro de sink que contenha a chave de negócios year
, ele excluirá o primeiro documento com um valor de campo correspondente retornado pelo MongoDB. Suponha que o conector processe um registro de afundamento que contenha os seguintes dados de valor:
{ "year": 2005, ... }
Quando o conector processa o registro anterior, ele exclui o primeiro documento da coleção que contém um campo de year
com um valor de "2005", como o documento de amostra "Dentist Appointment".original.
Estratégias de modelos de gravação personalizados
Se nenhuma das estratégias de modelo de gravação incluídas com o conector se ajustar ao seu caso de uso, você poderá criar suas próprias.
Uma estratégia de modelo de escrita é uma classe Java que implementa a interface WriteModelStrategy
e deve substituir o método createWriteModel()
.
Consulte o código-fonte para a interface WriteModelStrategy para a assinatura de método exigida.
Exemplo de estratégia de modelo de gravação
A estratégia de modelo de gravação personalizada a seguir retorna uma operação de gravação que substitui um documento do MongoDB que corresponde ao campo _id
do registro de coletor pelo valor do campo de fullDocument
do registro de coletor:
/** * Custom write model strategy * * This class reads the 'fullDocument' field from a change stream and * returns a ReplaceOne operation. */ public class CustomWriteModelStrategy implements WriteModelStrategy { private static String ID = "_id"; public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) { BsonDocument changeStreamDocument = document.getValueDoc() .orElseThrow(() -> new DataException("Missing value document")); BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument()); if (fullDocument.isEmpty()) { return null; // Return null to indicate no op. } return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument); } }
Para obter outro exemplo de estratégia de modelo de gravação personalizada, consulte a estratégia de exemplo UpsertAsPartOfDocumentStrategy no GitHub.
Como instalar sua estratégia
Para configurar seu conector de pia para usar uma estratégia de escrita personalizada, você deve concluir as seguintes ações:
Compile a classe de estratégia de gravação personalizada a um arquivo JAR.
Adicione o JAR compilado ao caminho de classe/plugin para seus trabalhadores do Kafka. Para obter mais informações sobre caminhos de plugin, consulte a documentação confluente.
Observação
O Kafka Connect carrega plugins isoladamente. Quando você implanta uma estratégia de gravação personalizada, tanto o JAR do conector quanto o JAR da estratégia do modelo de gravação devem estar no mesmo caminho. Seus caminhos devem se assemelhar ao seguinte:
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
<plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar
Para saber mais sobre plug-ins do Kafka Connect, consulte este guia do Confluent.
Especifique sua classe personalizada na configuração writemodel.strategy.
Para saber como compilar uma classe para um arquivo JAR, consulte o Guia de implantação JAR a partir da documentação Java SE.