Fique atento às mudanças
Abrir um fluxo de alterações
Você pode observar alterações no MongoDB usando o método watch()
nos seguintes objetos:
Para cada objeto, o método watch()
abre um change stream para emitir documentos de evento de mudança quando eles ocorrem.
O método watch()
recebe opcionalmente uma aggregation pipeline que consiste em uma array de estágios de aggregation como o primeiro parâmetro. Os estágios de aggregation filtram e transformam os eventos de mudança.
No trecho a seguir, o estágio $match
corresponde a todos os documentos do evento de alteração com um valor runtime
menor que 15, filtrando todos os outros.
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ]; const changeStream = myColl.watch(pipeline);
O método watch()
aceita um objeto options
como segundo parâmetro. Consulte os links no final desta seção para obter mais informações sobre as configurações que você pode configurar com este objeto.
O método watch()
retorna uma instância de um ChangeStream. Você pode ler eventos de fluxos de mudança iterando sobre eles ou ouvindo eventos.
Selecione a aba que corresponde à maneira como você deseja ler eventos a partir do change stream:
A partir da versão 4.12, os objetos ChangeStream
são iteráveis assíncronos. Com essa mudança, você pode usar loops for-await
para recuperar eventos de um fluxo de mudança aberto:
for await (const change of changeStream) { console.log("Received change: ", change); }
Você pode chamar métodos no objeto ChangeStream
, como:
hasNext()
para verificar se há documentos restantes no fluxonext()
para solicitar o próximo documento no fluxoclose()
para fechar o ChangeStream
Você pode anexar funções de ouvinte ao objeto ChangeStream
chamando o método on()
. Este método é herdado da classe Javascript EventEmitter
. Transmita a string "change"
como o primeiro parâmetro e sua função de ouvinte como o segundo parâmetro, conforme mostrado abaixo:
changeStream.on("change", (changeEvent) => { /* your listener function */ });
A função de ouvinte aciona quando um evento change
é emitido. Você pode especificar a lógica no ouvinte para processar o documento do evento de alteração quando ele for recebido.
Você pode controlar o fluxo de alterações chamando pause()
para parar de emitir eventos ou resume()
para continuar emitindo eventos.
Para parar de processar eventos de mudança, chame o método close() na ChangeStream
instância . Isso fecha o fluxo de alterações e libera recursos.
changeStream.close();
Aviso
Utilizar um ChangeStream
no modo EventEmitter
e Iterator
simultaneamente não é suportado pelo driver e causa um erro. Isso é para evitar comportamentos indefinidos, onde o driver não pode garantir qual consumidor recebe os documentos primeiro.
Exemplos
Iteração
O exemplo seguinte abre um fluxo de alteração na collection haikus
no banco de dados do insertDB
e imprime eventos de alteração conforme ocorrem:
Observação
Você pode utilizar este exemplo para se conectar a uma instância do MongoDB e interagir com um banco de dados que contém dados de amostra. Para saber mais sobre como se conectar à sua instância do MongoDB e carregar um conjunto de dados de amostra, consulte o guia Exemplos de uso.
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
Observação
Trechos de código idênticos
Os trechos de código JavaScript e TypeScript acima são idênticos. Não existem características específicas do TypeScript do condutor relevantes para este caso de utilização.
Quando você executa este código e, em seguida, faça uma alteração na coleção haikus
, como executar uma operação de inserção ou exclusão, você pode fazer query no documento do evento de alteração impresso no seu terminal.
Por exemplo, se você inserir um documento na coleção, o código imprime a seguinte saída:
Received change: { _id: { _data: '...' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1675800603, i: 31 }), fullDocument: { _id: new ObjectId("..."), ... }, ns: { db: 'insertDB', coll: 'haikus' }, documentKey: { _id: new ObjectId("...") } }
Observação
Receber documentos completos de atualizações
Os eventos de alteração que contêm informações sobre operações de atualização retornam apenas os campos
modificados por padrão, em vez do documento atualizado completo. Você pode configurar seu fluxo de alterações para também retornar a versão mais atual do documento definindo o campo fullDocument
do objeto de opções para "updateLookup"
como segue:
const options = { fullDocument: "updateLookup" }; // This could be any pipeline. const pipeline = []; const changeStream = myColl.watch(pipeline, options);
Função de ouvinte
O exemplo a seguir abre um fluxo de alteração na coleção haikus
no banco de dados insertDB
. Vamos criar uma função de ouvinte para receber e imprimir eventos de mudança que ocorram na coleção.
Primeiro, abra o change stream na collection e, em seguida, defina um ouvinte no change stream usando o método on()
. Depois de definir o ouvinte, gere um evento de alteração realizando uma alteração na collection.
Para gerar o evento de alteração na coleção, vamos usar o método insertOne()
para adicionar um novo documento. Uma vez que insertOne()
pode ser executado antes da função de ouvinte pode registrar, usamos um temporizador, definido como simulateAsyncPause
para aguardar 1 segundo antes de executar a inserção.
Também usamos simulateAsyncPause
após a inserção do documento. Isto fornece tempo suficiente para a função de ouvinte receber o evento de alteração e para o ouvinte completar sua execução antes fechar a instância ChangeStream
utilizando o método close()
.
Observação
Razão para incluir temporizadores
Os cronômetros usados neste exemplo são apenas para fins de demonstração. Eles garantem que haja tempo suficiente para se registrar o ouvinte e faça com que o ouvinte processe o evento de mudança antes de finalizar.
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
Observação
Trechos de código idênticos
Os trechos de código JavaScript e TypeScript acima são idênticos. Não existem características específicas do TypeScript do condutor relevantes para este caso de utilização.
Visite os seguintes recursos para obter mais material sobre as classes e métodos mencionados nesta página: