Menu Docs
Página inicial do Docs
/ / /
Controlador Node.js
/

Fique atento às mudanças

Você pode observar alterações no MongoDB usando o método watch() nos seguintes objetos:

  • collection

  • Database

  • Cliente Mongo

Para cada objeto, o método watch() abre um change stream para emitir documentos de evento de mudança quando eles ocorrem.

O método watch() recebe opcionalmente uma aggregation pipeline que consiste em uma array de estágios de aggregation como o primeiro parâmetro. Os estágios de aggregation filtram e transformam os eventos de mudança.

No trecho a seguir, o estágio $match corresponde a todos os documentos do evento de alteração com um valor runtime menor que 15, filtrando todos os outros.

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

O método watch() aceita um objeto options como segundo parâmetro. Consulte os links no final desta seção para obter mais informações sobre as configurações que você pode configurar com este objeto.

O método watch() retorna uma instância de um ChangeStream. Você pode ler eventos de fluxos de mudança iterando sobre eles ou ouvindo eventos.

Selecione a aba que corresponde à maneira como você deseja ler eventos a partir do change stream:

A partir da versão 4.12, os objetos ChangeStream são iteráveis assíncronos. Com essa mudança, você pode usar loops for-await para recuperar eventos de um fluxo de mudança aberto:

for await (const change of changeStream) {
console.log("Received change: ", change);
}

Você pode chamar métodos no objeto ChangeStream , como:

  • hasNext() para verificar se há documentos restantes no fluxo

  • next() para solicitar o próximo documento no fluxo

  • close() para fechar o ChangeStream

Você pode anexar funções de ouvinte ao objeto ChangeStream chamando o método on(). Este método é herdado da classe Javascript EventEmitter. Transmita a string "change" como o primeiro parâmetro e sua função de ouvinte como o segundo parâmetro, conforme mostrado abaixo:

changeStream.on("change", (changeEvent) => { /* your listener function */ });

A função de ouvinte aciona quando um evento change é emitido. Você pode especificar a lógica no ouvinte para processar o documento do evento de alteração quando ele for recebido.

Você pode controlar o fluxo de alterações chamando pause() para parar de emitir eventos ou resume() para continuar emitindo eventos.

Para parar de processar eventos de mudança, chame o método close() na ChangeStream instância . Isso fecha o fluxo de alterações e libera recursos.

changeStream.close();

Aviso

Utilizar um ChangeStream no modo EventEmitter e Iterator simultaneamente não é suportado pelo driver e causa um erro. Isso é para evitar comportamentos indefinidos, onde o driver não pode garantir qual consumidor recebe os documentos primeiro.

O exemplo seguinte abre um fluxo de alteração na collection haikus no banco de dados do insertDB e imprime eventos de alteração conforme ocorrem:

Observação

Você pode utilizar este exemplo para se conectar a uma instância do MongoDB e interagir com um banco de dados que contém dados de amostra. Para saber mais sobre como se conectar à sua instância do MongoDB e carregar um conjunto de dados de amostra, consulte o guia Exemplos de uso.

1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);
1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);

Observação

Trechos de código idênticos

Os trechos de código JavaScript e TypeScript acima são idênticos. Não existem características específicas do TypeScript do condutor relevantes para este caso de utilização.

Quando você executa este código e, em seguida, faça uma alteração na coleção haikus, como executar uma operação de inserção ou exclusão, você pode fazer query no documento do evento de alteração impresso no seu terminal.

Por exemplo, se você inserir um documento na coleção, o código imprime a seguinte saída:

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

Observação

Receber documentos completos de atualizações

Os eventos de alteração que contêm informações sobre operações de atualização retornam apenas os campos modificados por padrão, em vez do documento atualizado completo. Você pode configurar seu fluxo de alterações para também retornar a versão mais atual do documento definindo o campo fullDocument do objeto de opções para "updateLookup" como segue:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

O exemplo a seguir abre um fluxo de alteração na coleção haikus no banco de dados insertDB. Vamos criar uma função de ouvinte para receber e imprimir eventos de mudança que ocorram na coleção.

Primeiro, abra o change stream na collection e, em seguida, defina um ouvinte no change stream usando o método on(). Depois de definir o ouvinte, gere um evento de alteração realizando uma alteração na collection.

Para gerar o evento de alteração na coleção, vamos usar o método insertOne() para adicionar um novo documento. Uma vez que insertOne() pode ser executado antes da função de ouvinte pode registrar, usamos um temporizador, definido como simulateAsyncPause para aguardar 1 segundo antes de executar a inserção.

Também usamos simulateAsyncPause após a inserção do documento. Isto fornece tempo suficiente para a função de ouvinte receber o evento de alteração e para o ouvinte completar sua execução antes fechar a instância ChangeStream utilizando o método close().

Observação

Razão para incluir temporizadores

Os cronômetros usados neste exemplo são apenas para fins de demonstração. Eles garantem que haja tempo suficiente para se registrar o ouvinte e faça com que o ouvinte processe o evento de mudança antes de finalizar.

1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);
1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);

Observação

Trechos de código idênticos

Os trechos de código JavaScript e TypeScript acima são idênticos. Não existem características específicas do TypeScript do condutor relevantes para este caso de utilização.

Visite os seguintes recursos para obter mais material sobre as classes e métodos mencionados nesta página:

Voltar

Execute um comando