Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector
/ /

Personalize um pipeline para filtrar eventos de mudança

Este exemplo de uso demonstra como configurar um pipeline para personalizar os dados que seu conector de origem MongoDB Kafka consome. Um pipeline é uma aggregation pipeline do MongoDB composta de instruções para o banco de dados para filtrar ou transformar dados.

O MongoDB notifica o conector de alterações de dados que correspondem ao seu pipeline de agregação em um fluxo de alterações. Um change stream é uma sequência de eventos que descreve as alterações de dados que um cliente fez em um sistema do MongoDB em tempo real. Para obter mais informações, consulte a entrada manual do servidor MongoDB em Change Streams.

Suponha que você esteja coordenando um evento e queira coletar os nomes e horários de chegada de cada convidado em um evento específico . Sempre que um convidado faz check-in do evento, um aplicação insere um novo documento que contém os seguintes detalhes:

{
"_id": ObjectId(...),
"eventId": 321,
"name": "Dorothy Gale",
"arrivalTime": 2021-10-31T20:30:00.245Z
}

Você pode definir a configuração do connector pipeline para instruir o change stream a filtrar as informações do evento de alteração da seguinte forma:

  • Crie eventos de alteração para operações de inserção e omita eventos para todos os outros tipos de operações.

  • Crie eventos de alteração somente para documentos que correspondam ao valor fullDocument.eventId "321" e omita todos os outros documentos.

  • Omita os campos _id e eventId do objeto fullDocument utilizando uma projeção.

Para aplicar essas transformações, atribua o seguinte pipeline de agregação à sua configuração pipeline :

pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]

Importante

Certifique-se de que os resultados do pipeline contenham os campos de nível superior _id e ns do objeto payload . O MongoDB usa id como o valor do token de retomada e ns para gerar o nome do tópico de saída Kafka.

Quando a aplicação insere o documento de amostra, o connector configurado publica o seguinte registro no tópico do Kafka:

{
...
"payload": {
_id: { _data: ... },
"operationType": "insert",
"fullDocument": {
"name": "Dorothy Gale",
"arrivalTime": "2021-10-31T20:30:00.245Z",
},
"ns": { ... },
"documentKey": {
_id: {"$oid": ... }
}
}
}

Para obter mais informações sobre como gerenciar fluxos de alterações com o conector de origem, consulte a documentação do conector em Change Streams.

Voltar

Exemplos de uso