Menu Docs
Página inicial do Docs
/ / /
Node.js
/

Fique atento às mudanças

Você pode observar alterações no MongoDB usando o método watch() nos seguintes objetos:

Para cada objeto, o método watch() abre um change stream para emitir documentos de evento de mudança quando eles ocorrem.

O método watch() recebe opcionalmente uma aggregation pipeline que consiste em uma array de estágios de aggregation como o primeiro parâmetro. Os estágios de aggregation filtram e transformam os eventos de mudança.

No trecho a seguir, o estágio $match corresponde a todos os documentos do evento de alteração com um valor runtime menor que 15, filtrando todos os outros.

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

O método watch() aceita um objeto options como segundo parâmetro. Consulte os links no final desta seção para obter mais informações sobre as configurações que você pode configurar com este objeto.

O método watch() retorna uma instância de um ChangeStream. Você pode ler eventos de fluxos de mudança iterando sobre eles ou ouvindo eventos.

Selecione a aba que corresponde à maneira como você deseja ler eventos a partir do change stream:

A partir da versão 4.12, os objetos ChangeStream são iteráveis assíncronos. Com essa mudança, você pode usar loops for-await para recuperar eventos de um fluxo de mudança aberto:

for await (const change of changeStream) {
console.log("Received change: ", change);
}

Você pode chamar métodos no objeto ChangeStream , como:

  • hasNext() para verificar se há documentos restantes no fluxo

  • next() para solicitar o próximo documento no fluxo

  • close() para fechar o ChangeStream

Você pode anexar funções de ouvinte ao objeto ChangeStream chamando o método on(). Este método é herdado da classe Javascript EventEmitter. Transmita a string "change" como o primeiro parâmetro e sua função de ouvinte como o segundo parâmetro, conforme mostrado abaixo:

changeStream.on("change", (changeEvent) => { /* your listener function */ });

A função de ouvinte aciona quando um evento change é emitido. Você pode especificar a lógica no ouvinte para processar o documento do evento de alteração quando ele for recebido.

Você pode controlar o fluxo de alterações chamando pause() para parar de emitir eventos ou resume() para continuar emitindo eventos.

Para parar de processar eventos de mudança, chame o método close() na ChangeStream instância . Isso fecha o fluxo de alterações e libera recursos.

changeStream.close();

Aviso

Utilizar um ChangeStream no modo EventEmitter e Iterator simultaneamente não é suportado pelo driver e causa um erro. Isso é para evitar comportamentos indefinidos, onde o driver não pode garantir qual consumidor recebe os documentos primeiro.

O exemplo seguinte abre um fluxo de alteração na collection haikus no banco de dados do insertDB e imprime eventos de alteração conforme ocorrem:

Observação

Você pode utilizar este exemplo para se conectar a uma instância do MongoDB e interagir com um banco de dados que contém dados de amostra. Para saber mais sobre como se conectar à sua instância do MongoDB e carregar um conjunto de dados de amostra, consulte o guia Exemplos de uso.

1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8let changeStream;
9async function run() {
10 try {
11 const database = client.db("insertDB");
12 const haikus = database.collection("haikus");
13
14 // Open a Change Stream on the "haikus" collection
15 changeStream = haikus.watch();
16
17 // Print change events
18 for await (const change of changeStream) {
19 console.log("Received change:\n", change);
20 }
21
22 await changeStream.close();
23
24 } finally {
25 await client.close();
26 }
27}
28run().catch(console.dir);
1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8let changeStream;
9async function run() {
10 try {
11 const database = client.db("insertDB");
12 const haikus = database.collection("haikus");
13
14 // Open a Change Stream on the "haikus" collection
15 changeStream = haikus.watch();
16
17 // Print change events
18 for await (const change of changeStream) {
19 console.log("Received change:\n", change);
20 }
21
22 await changeStream.close();
23
24 } finally {
25 await client.close();
26 }
27}
28run().catch(console.dir);

Observação

Trechos de código idênticos

Os trechos de código JavaScript e TypeScript acima são idênticos. Não existem características específicas do TypeScript do condutor relevantes para este caso de utilização.

Quando você executa este código e, em seguida, faça uma alteração na coleção haikus, como executar uma operação de inserção ou exclusão, você pode fazer query no documento do evento de alteração impresso no seu terminal.

Por exemplo, se você inserir um documento na coleção, o código imprime a seguinte saída:

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

Observação

Receber documentos completos de atualizações

Os eventos de alteração que contêm informações sobre operações de atualização retornam apenas os campos modificados por padrão, em vez do documento atualizado completo. Você pode configurar seu fluxo de alterações para também retornar a versão mais atual do documento definindo o campo fullDocument do objeto de opções para "updateLookup" como segue:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

O exemplo a seguir abre um fluxo de alteração na coleção haikus no banco de dados insertDB. Vamos criar uma função de ouvinte para receber e imprimir eventos de mudança que ocorram na coleção.

Primeiro, abra o change stream na collection e, em seguida, defina um ouvinte no change stream usando o método on(). Depois de definir o ouvinte, gere um evento de alteração realizando uma alteração na collection.

Para gerar o evento de alteração na coleção, vamos usar o método insertOne() para adicionar um novo documento. Uma vez que insertOne() pode ser executado antes da função de ouvinte pode registrar, usamos um temporizador, definido como simulateAsyncPause para aguardar 1 segundo antes de executar a inserção.

Também usamos simulateAsyncPause após a inserção do documento. Isto fornece tempo suficiente para a função de ouvinte receber o evento de alteração e para o ouvinte completar sua execução antes fechar a instância ChangeStream utilizando o método close().

Observação

Razão para incluir temporizadores

Os cronômetros usados neste exemplo são apenas para fins de demonstração. Eles garantem que haja tempo suficiente para se registrar o ouvinte e faça com que o ouvinte processe o evento de mudança antes de finalizar.

1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8const simulateAsyncPause = () =>
9 new Promise(resolve => {
10 setTimeout(() => resolve(), 1000);
11 });
12
13let changeStream;
14async function run() {
15 try {
16 const database = client.db("insertDB");
17 const haikus = database.collection("haikus");
18
19 // open a Change Stream on the "haikus" collection
20 changeStream = haikus.watch();
21
22 // set up a listener when change events are emitted
23 changeStream.on("change", next => {
24 // process any change event
25 console.log("received a change to the collection: \t", next);
26 });
27
28 await simulateAsyncPause();
29
30 await myColl.insertOne({
31 title: "Record of a Shriveled Datum",
32 content: "No bytes, no problem. Just insert a document, in MongoDB",
33 });
34
35 await simulateAsyncPause();
36
37 await changeStream.close();
38
39 console.log("closed the change stream");
40 } finally {
41 await client.close();
42 }
43}
44run().catch(console.dir);
1import { MongoClient } from "mongodb";
2
3// Replace the uri string with your MongoDB deployment's connection string.
4const uri = "<connection string uri>";
5
6const client = new MongoClient(uri);
7
8const simulateAsyncPause = () =>
9 new Promise(resolve => {
10 setTimeout(() => resolve(), 1000);
11 });
12
13let changeStream;
14async function run() {
15 try {
16 const database = client.db("insertDB");
17 const haikus = database.collection("haikus");
18
19 // open a Change Stream on the "haikus" collection
20 changeStream = haikus.watch();
21
22 // set up a listener when change events are emitted
23 changeStream.on("change", next => {
24 // process any change event
25 console.log("received a change to the collection: \t", next);
26 });
27
28 await simulateAsyncPause();
29
30 await myColl.insertOne({
31 title: "Record of a Shriveled Datum",
32 content: "No bytes, no problem. Just insert a document, in MongoDB",
33 });
34
35 await simulateAsyncPause();
36
37 await changeStream.close();
38
39 console.log("closed the change stream");
40 } finally {
41 await client.close();
42 }
43}
44run().catch(console.dir);

Observação

Trechos de código idênticos

Os trechos de código JavaScript e TypeScript acima são idênticos. Não existem características específicas do TypeScript do condutor relevantes para este caso de utilização.

Visite os seguintes recursos para obter material adicional sobre as classes e métodos mencionados nesta página:

Voltar

Execute um comando