Fique atento às mudanças
Abrir um fluxo de alterações
Você pode observar alterações no MongoDB usando o método watch()
nos seguintes objetos:
Para cada objeto, o método watch()
abre um change stream para emitir documentos de evento de mudança quando eles ocorrem.
O método watch()
recebe opcionalmente uma aggregation pipeline que consiste em uma array de estágios de aggregation como o primeiro parâmetro. Os estágios de aggregation filtram e transformam os eventos de mudança.
No trecho a seguir, o estágio $match
corresponde a todos os documentos do evento de alteração com um valor runtime
menor que 15, filtrando todos os outros.
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ]; const changeStream = myColl.watch(pipeline);
O método watch()
aceita um objeto options
como segundo parâmetro. Consulte os links no final desta seção para obter mais informações sobre as configurações que você pode configurar com este objeto.
O método watch()
retorna uma instância de um ChangeStream. Você pode ler eventos de fluxos de mudança iterando sobre eles ou ouvindo eventos.
Selecione a aba que corresponde à maneira como você deseja ler eventos a partir do change stream:
A partir da versão 4.12, os objetos ChangeStream
são iteráveis assíncronos. Com essa mudança, você pode usar loops for-await
para recuperar eventos de um fluxo de mudança aberto:
for await (const change of changeStream) { console.log("Received change: ", change); }
Você pode chamar métodos no objeto ChangeStream
, como:
hasNext()
para verificar se há documentos restantes no fluxonext()
para solicitar o próximo documento no fluxoclose()
para fechar o ChangeStream
Você pode anexar funções de ouvinte ao objeto ChangeStream
chamando o método on()
. Este método é herdado da classe Javascript EventEmitter
. Transmita a string "change"
como o primeiro parâmetro e sua função de ouvinte como o segundo parâmetro, conforme mostrado abaixo:
changeStream.on("change", (changeEvent) => { /* your listener function */ });
A função de ouvinte aciona quando um evento change
é emitido. Você pode especificar a lógica no ouvinte para processar o documento do evento de alteração quando ele for recebido.
Você pode controlar o fluxo de alterações chamando pause()
para parar de emitir eventos ou resume()
para continuar emitindo eventos.
Para parar de processar eventos de mudança, chame o método close() na ChangeStream
instância . Isso fecha o fluxo de alterações e libera recursos.
changeStream.close();
Aviso
Utilizar um ChangeStream
no modo EventEmitter
e Iterator
simultaneamente não é suportado pelo driver e causa um erro. Isso é para evitar comportamentos indefinidos, onde o driver não pode garantir qual consumidor recebe os documentos primeiro.
Exemplos
Iteração
O exemplo seguinte abre um fluxo de alteração na collection haikus
no banco de dados do insertDB
e imprime eventos de alteração conforme ocorrem:
Observação
Você pode utilizar este exemplo para se conectar a uma instância do MongoDB e interagir com um banco de dados que contém dados de amostra. Para saber mais sobre como se conectar à sua instância do MongoDB e carregar um conjunto de dados de amostra, consulte o guia Exemplos de uso.
1 import { MongoClient } from "mongodb"; 2 3 // Replace the uri string with your MongoDB deployment's connection string. 4 const uri = "<connection string uri>"; 5 6 const client = new MongoClient(uri); 7 8 let changeStream; 9 async function run() { 10 try { 11 const database = client.db("insertDB"); 12 const haikus = database.collection("haikus"); 13 14 // Open a Change Stream on the "haikus" collection 15 changeStream = haikus.watch(); 16 17 // Print change events 18 for await (const change of changeStream) { 19 console.log("Received change:\n", change); 20 } 21 22 await changeStream.close(); 23 24 } finally { 25 await client.close(); 26 } 27 } 28 run().catch(console.dir);
1 import { MongoClient } from "mongodb"; 2 3 // Replace the uri string with your MongoDB deployment's connection string. 4 const uri = "<connection string uri>"; 5 6 const client = new MongoClient(uri); 7 8 let changeStream; 9 async function run() { 10 try { 11 const database = client.db("insertDB"); 12 const haikus = database.collection("haikus"); 13 14 // Open a Change Stream on the "haikus" collection 15 changeStream = haikus.watch(); 16 17 // Print change events 18 for await (const change of changeStream) { 19 console.log("Received change:\n", change); 20 } 21 22 await changeStream.close(); 23 24 } finally { 25 await client.close(); 26 } 27 } 28 run().catch(console.dir);
Observação
Trechos de código idênticos
Os trechos de código JavaScript e TypeScript acima são idênticos. Não existem características específicas do TypeScript do condutor relevantes para este caso de utilização.
Quando você executa este código e, em seguida, faça uma alteração na coleção haikus
, como executar uma operação de inserção ou exclusão, você pode fazer query no documento do evento de alteração impresso no seu terminal.
Por exemplo, se você inserir um documento na coleção, o código imprime a seguinte saída:
Received change: { _id: { _data: '...' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1675800603, i: 31 }), fullDocument: { _id: new ObjectId("..."), ... }, ns: { db: 'insertDB', coll: 'haikus' }, documentKey: { _id: new ObjectId("...") } }
Observação
Receber documentos completos de atualizações
Os eventos de alteração que contêm informações sobre operações de atualização retornam apenas os campos
modificados por padrão, em vez do documento atualizado completo. Você pode configurar seu fluxo de alterações para também retornar a versão mais atual do documento definindo o campo fullDocument
do objeto de opções para "updateLookup"
como segue:
const options = { fullDocument: "updateLookup" }; // This could be any pipeline. const pipeline = []; const changeStream = myColl.watch(pipeline, options);
Função de ouvinte
O exemplo a seguir abre um fluxo de alteração na coleção haikus
no banco de dados insertDB
. Vamos criar uma função de ouvinte para receber e imprimir eventos de mudança que ocorram na coleção.
Primeiro, abra o change stream na collection e, em seguida, defina um ouvinte no change stream usando o método on()
. Depois de definir o ouvinte, gere um evento de alteração realizando uma alteração na collection.
Para gerar o evento de alteração na coleção, vamos usar o método insertOne()
para adicionar um novo documento. Uma vez que insertOne()
pode ser executado antes da função de ouvinte pode registrar, usamos um temporizador, definido como simulateAsyncPause
para aguardar 1 segundo antes de executar a inserção.
Também usamos simulateAsyncPause
após a inserção do documento. Isto fornece tempo suficiente para a função de ouvinte receber o evento de alteração e para o ouvinte completar sua execução antes fechar a instância ChangeStream
utilizando o método close()
.
Observação
Razão para incluir temporizadores
Os cronômetros usados neste exemplo são apenas para fins de demonstração. Eles garantem que haja tempo suficiente para se registrar o ouvinte e faça com que o ouvinte processe o evento de mudança antes de finalizar.
1 import { MongoClient } from "mongodb"; 2 3 // Replace the uri string with your MongoDB deployment's connection string. 4 const uri = "<connection string uri>"; 5 6 const client = new MongoClient(uri); 7 8 const simulateAsyncPause = () => 9 new Promise(resolve => { 10 setTimeout(() => resolve(), 1000); 11 }); 12 13 let changeStream; 14 async function run() { 15 try { 16 const database = client.db("insertDB"); 17 const haikus = database.collection("haikus"); 18 19 // open a Change Stream on the "haikus" collection 20 changeStream = haikus.watch(); 21 22 // set up a listener when change events are emitted 23 changeStream.on("change", next => { 24 // process any change event 25 console.log("received a change to the collection: \t", next); 26 }); 27 28 await simulateAsyncPause(); 29 30 await myColl.insertOne({ 31 title: "Record of a Shriveled Datum", 32 content: "No bytes, no problem. Just insert a document, in MongoDB", 33 }); 34 35 await simulateAsyncPause(); 36 37 await changeStream.close(); 38 39 console.log("closed the change stream"); 40 } finally { 41 await client.close(); 42 } 43 } 44 run().catch(console.dir);
1 import { MongoClient } from "mongodb"; 2 3 // Replace the uri string with your MongoDB deployment's connection string. 4 const uri = "<connection string uri>"; 5 6 const client = new MongoClient(uri); 7 8 const simulateAsyncPause = () => 9 new Promise(resolve => { 10 setTimeout(() => resolve(), 1000); 11 }); 12 13 let changeStream; 14 async function run() { 15 try { 16 const database = client.db("insertDB"); 17 const haikus = database.collection("haikus"); 18 19 // open a Change Stream on the "haikus" collection 20 changeStream = haikus.watch(); 21 22 // set up a listener when change events are emitted 23 changeStream.on("change", next => { 24 // process any change event 25 console.log("received a change to the collection: \t", next); 26 }); 27 28 await simulateAsyncPause(); 29 30 await myColl.insertOne({ 31 title: "Record of a Shriveled Datum", 32 content: "No bytes, no problem. Just insert a document, in MongoDB", 33 }); 34 35 await simulateAsyncPause(); 36 37 await changeStream.close(); 38 39 console.log("closed the change stream"); 40 } finally { 41 await client.close(); 42 } 43 } 44 run().catch(console.dir);
Observação
Trechos de código idênticos
Os trechos de código JavaScript e TypeScript acima são idênticos. Não existem características específicas do TypeScript do condutor relevantes para este caso de utilização.
Visite os seguintes recursos para obter material adicional sobre as classes e métodos mencionados nesta página: