Personalize um pipeline para filtrar eventos de mudança
Este exemplo de uso demonstra como configurar um pipeline para personalizar os dados que seu conector de origem MongoDB Kafka consome. Um pipeline é uma aggregation pipeline do MongoDB composta de instruções para o banco de dados para filtrar ou transformar dados.
O MongoDB notifica o conector de alterações de dados que correspondem ao seu pipeline de agregação em um fluxo de alterações. Um change stream é uma sequência de eventos que descreve as alterações de dados que um cliente fez em um sistema do MongoDB em tempo real. Para obter mais informações, consulte a entrada manual do servidor MongoDB em Change Streams.
Exemplo
Suponha que você esteja coordenando um evento e queira coletar os nomes e horários de chegada de cada convidado em um evento específico . Sempre que um convidado faz check-in do evento, um aplicação insere um novo documento que contém os seguintes detalhes:
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
Você pode definir a configuração do connector pipeline
para instruir o change stream a filtrar as informações do evento de alteração da seguinte forma:
Crie eventos de alteração para operações de inserção e omita eventos para todos os outros tipos de operações.
Crie eventos de alteração somente para documentos que correspondam ao valor
fullDocument.eventId
"321" e omita todos os outros documentos.Omita os campos
_id
eeventId
do objetofullDocument
utilizando uma projeção.
Para aplicar essas transformações, atribua o seguinte pipeline de agregação à sua configuração pipeline
:
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
Importante
Certifique-se de que os resultados do pipeline contenham os campos de nível superior _id
e ns
do objeto payload
. O MongoDB usa id
como o valor do token de retomada e ns
para gerar o nome do tópico de saída Kafka.
Quando a aplicação insere o documento de amostra, o connector configurado publica o seguinte registro no tópico do Kafka:
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
Para obter mais informações sobre como gerenciar fluxos de alterações com o conector de origem, consulte a documentação do conector em Change Streams.