Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector
/ /

Escrever estratégias de modelo

Nesta página

  • Visão geral
  • Operações de gravação em massa
  • Como especificar estratégias de modelo de escrita
  • Especificar uma chave de negócios
  • Exemplos
  • Atualizar estratégia de carimbos de data/hora
  • Substituir uma estratégia-chave de negócio
  • Excluir uma estratégia de chave de negócios
  • Estratégias de modelos de gravação personalizados
  • Exemplo de estratégia de modelo de gravação
  • Como instalar sua estratégia

Este guia mostra como alterar a maneira como seu conector de pia MongoDB Kafka grava dados no MongoDB.

Você pode alterar como seu conector grava dados no MongoDB para casos de uso, incluindo o seguinte:

  • Insira documentos em vez de atualizá-los

  • Substituir ou atualizar documentos que correspondam a um filtro diferente do campo _id

  • Excluir documentos que correspondam a um filtro

Você pode configurar como seu conector grava dados no MongoDB especificando uma estratégia de modelo de gravação. Uma estratégia de modelo de gravação é uma classe que define como o conector do coletor deve gravar dados usando modelos de gravação. Um modelo de escrita é uma interface de driver MongoDB Java que define a estrutura de uma operação de escrita.

Para saber como modificar os registros de sink que seu conector recebe antes de gravá-los no MongoDB, leia o guia sobre Sink Connector Post Processors.

Para ver uma implementação de estratégia de modelo de gravação, consulte o código fonte da classe InsertOneDefaultStrategy.

O conector de pia grava dados no MongoDB usando operações de gravação em massa. As gravações em massa agrupam várias operações de gravação, como inserções, atualizações ou exclusões.

Por padrão, o conector de pia executa gravações em massa ordenadas, o que garante a ordem das alterações de dados. Em uma gravação em massa ordenada, se alguma operação de gravação resultar em um erro, o conector pulará as gravações restantes nesse lote.

Se não precisar garantir a ordem das alterações de dados, você poderá definir a configuração bulk.write.ordered como false para que o conector execute gravações em massa não ordenadas. O conector coletor executa gravações em massa não ordenadas em paralelo, o que pode melhorar o desempenho.

Além disso, quando você habilita gravações em massa não ordenadas e define a configuração de errors.tolerance como all, mesmo que qualquer operação de gravação em sua gravação em massa falhe, o conector continua a executar as operações de gravação restantes no lote que não retornam erros.

Dica

Para saber mais sobre a configuração bulk.write.ordered, consulte as Propriedades de processamento de mensagens do conector.

Para saber mais sobre operações de escrita em massa, consulte a seguinte documentação:

Para especificar uma estratégia de modelo de gravação, use a seguinte configuração:

writemodel.strategy=<write model strategy classname>

Para obter uma lista das estratégias de modelo de gravação pré-criadas incluídas no conector, consulte o guia sobre configurações de estratégia de modelo de gravação.

Uma chave de negócios é um valor composto por um ou mais campos em seu registro coletor que o identifica como exclusivo. Por padrão, o conector do coletor usa o campo _id do registro do coletor para recuperar a chave de negócios. Para especificar uma chave de negócios diferente, configure o pós-processador do adicionador de ID de Documentos para usar um valor personalizado.

É possível configurar o Document Id Adder para definir o campo _id da chave de registro do coletor, conforme mostrado no exemplo de propriedades a seguir:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList

Como alternativa, é possível configurá-lo para definir o campo _id a partir do valor do registro do coletor, conforme mostrado no exemplo de propriedades a seguir:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList

Importante

Melhore o desempenho de gravação

Crie um índice exclusivo em sua coleção de destino que corresponda aos campos de sua chave comercial. Isso melhora o desempenho das operações de gravação do conector do coletor. Consulte o guia sobre índices exclusivos para obter mais informações.

As seguintes estratégias de modelo de gravação exigem uma chave de negócios:

  • ReplaceOneBusinessKeyStrategy

  • DeleteOneBusinessKeyStrategy

  • UpdateOneBusinessKeyTimestampStrategy

Para obter mais informações sobre o pós-processador Document Id Adder, consulte Configurar o pós-processador Document Id Adder.

Esta seção mostra exemplos de configuração e saída das seguintes estratégias de modelo de gravação:

Você pode configurar a estratégia Update One Timestamps para adicionar e atualizar carimbos de data/hora ao gravar documentos no MongoDB. Esta estratégia executa as seguintes ações:

  • Quando o conector insere um novo documento do MongoDB, ele define os campos _insertedTS e _modifiedTS como a hora atual no servidor do conector.

  • Quando o conector atualiza um documento MongoDB existente, ele atualiza o campo _modifiedTS para a hora atual no servidor do conector.

Suponha que você queira rastrear a posição de um trem ao longo de uma rota e seu conector de coletor receba mensagens com a seguinte estrutura:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

Use o ProvidedInValueStrategy para especificar que seu conector deve usar o valor _id da mensagem para atribuir o campo _id em seu documento MongoDB. Especifique seu ID e escreva as propriedades da estratégia do modelo da seguinte forma:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

Depois que o conector do coletor processa o registro de exemplo anterior, ele insere um documento que contém os campos _insertedTS e _modifiedTS , conforme mostrado no documento a seguir:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

Depois de uma hora, o trem informa sua nova localização ao longo de sua rota com uma nova posição, conforme mostrado no registro a seguir:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Depois que o conector de pia processar o registro anterior, ele insere um documento que contém os seguintes dados:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Para obter mais informações sobre ProvidedInValueStrategy, consulte a seção sobre como configurar o pós-processador do Document Id Adder.

Você pode configurar a estratégia Substituir uma chave de negócio para substituir documentos que correspondam ao valor da chave de negócios. Para definir uma chave comercial em vários campos de um registro e configurar o conector para substituir documentos que contenham chaves comerciais correspondentes, execute as seguintes tarefas:

  1. Crie um índice exclusivo em sua coleção que corresponda aos campos de chave de negócios.

  2. Especifique a estratégia de ID do PartialValueStrategy para identificar os campos que pertencem à chave de negócios na configuração do conector.

  3. Especifique a estratégia do modelo de gravação ReplaceOneBusinessKeyStrategy na configuração do conector.

Suponha que você queira rastrear a capacidade do avião pelo número do voo e pela localização do aeroporto, representados por flight_no e airport_code, respectivamente. Uma mensagem de exemplo contém as seguintes informações:

{
"flight_no": "Z342",
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

Para implementar a estratégia, usando flight_no e airport_code como chave de negócios, primeiro crie um índice exclusivo nesses campos no shell do MongoDB:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

Em seguida, especifique os campos de estratégia e chave de negócios do PartialValueStrategy na lista de projeção. Especifique a id e escreva a configuração da estratégia do modelo da seguinte maneira:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

Os dados de amostra inseridos na collection contêm o seguinte:

{
"flight_no": "Z342"
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

Quando o conector processa dados de coletor que correspondem à chave comercial do documento existente, ele substitui o documento pelos novos valores sem alterar os campos de chave comercial:

{
"flight_no": "Z342"
"airport_code": "LAX",
"status": "canceled"
}

Após o conector processar os dados da pia, ele substitui o documento de amostra original no MongoDB pelo anterior.

Você pode configurar o conector para remover um documento quando ele receber mensagens que correspondam a uma chave de negócio usando a estratégia Excluir uma chave de negócio. Para definir uma chave de negócios a partir de vários campos de um registro e configurar o conector para excluir um documento que contenha uma chave de negócios correspondente, execute as seguintes tarefas:

  1. Crie um índice exclusivo em sua coleção do MongoDB que corresponda aos seus campos de chave de negócios.

  2. Especifique o PartialValueStrategy como a estratégia de id para identificar os campos que pertencem à chave de negócios na configuração do conector.

  3. Especifique a estratégia do modelo de gravação DeleteOneBusinessKeyStrategy na configuração do conector.

Suponha que você queira excluir um evento de calendário de um ano específico de uma coleção que contenha um documento semelhante ao seguinte:

{
"year": 2005,
"month": 3,
"day": 15,
"event": "Dentist Appointment"
}

Para implementar a estratégia, usando year como chave de negócios, primeiro crie um índice exclusivo nesses campos no shell do MongoDB:

db.collection.createIndex({ "year": 1 }, { unique: true })

Em seguida, especifique sua chave de negócios e escreva a estratégia de modelo em sua configuração da seguinte maneira:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=year
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

Se o seu conector processar um registro de sink que contenha a chave de negócios year, ele excluirá o primeiro documento com um valor de campo correspondente retornado pelo MongoDB. Suponha que o conector processe um registro de afundamento que contenha os seguintes dados de valor:

{
"year": 2005,
...
}

Quando o conector processa o registro anterior, ele exclui o primeiro documento da coleção que contém um campo de year com um valor de "2005", como o documento de amostra "Dentist Appointment".original.

Se nenhuma das estratégias de modelo de gravação incluídas com o conector se ajustar ao seu caso de uso, você poderá criar suas próprias.

Uma estratégia de modelo de escrita é uma classe Java que implementa a interface WriteModelStrategy e deve substituir o método createWriteModel().

Consulte o código-fonte para a interface WriteModelStrategy para a assinatura de método exigida.

A estratégia de modelo de gravação personalizada a seguir retorna uma operação de gravação que substitui um documento do MongoDB que corresponde ao campo _id do registro de coletor pelo valor do campo de fullDocument do registro de coletor:

/**
* Custom write model strategy
*
* This class reads the 'fullDocument' field from a change stream and
* returns a ReplaceOne operation.
*/
public class CustomWriteModelStrategy implements WriteModelStrategy {
private static String ID = "_id";
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument changeStreamDocument = document.getValueDoc()
.orElseThrow(() -> new DataException("Missing value document"));
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
if (fullDocument.isEmpty()) {
return null; // Return null to indicate no op.
}
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
}
}

Para obter outro exemplo de estratégia de modelo de gravação personalizada, consulte a estratégia de exemplo UpsertAsPartOfDocumentStrategy no GitHub.

Para configurar seu conector de pia para usar uma estratégia de escrita personalizada, você deve concluir as seguintes ações:

  1. Compile a classe de estratégia de gravação personalizada a um arquivo JAR.

  2. Adicione o JAR compilado ao caminho de classe/plugin para seus trabalhadores do Kafka. Para obter mais informações sobre caminhos de plugin, consulte a documentação confluente.

    Observação

    O Kafka Connect carrega plugins isoladamente. Quando você implanta uma estratégia de gravação personalizada, tanto o JAR do conector quanto o JAR da estratégia do modelo de gravação devem estar no mesmo caminho. Seus caminhos devem se assemelhar ao seguinte:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    Para saber mais sobre plug-ins do Kafka Connect, consulte este guia do Confluent.

  3. Especifique sua classe personalizada na configuração writemodel.strategy.

Para saber como compilar uma classe para um arquivo JAR, consulte o Guia de implantação JAR a partir da documentação Java SE.

Voltar

Fundamentals