Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Junte-se a nós no Amazon Web Services re:Invent 2024! Saiba como usar o MongoDB para casos de uso de AI .
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Idiomaschevron-right
JavaScriptchevron-right

Tutorial de fluxos de alterações e triggers com o Node.js

Lauren Schaefer17 min read • Published Feb 04, 2022 • Updated Aug 24, 2023
Node.jsMongoDBFluxos de alteraçõesJavaScript
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Logotipo do Início rápido em Node.js
Às vezes, você precisa reagir imediatamente às mudanças em seu MongoDB database. Talvez você queira fazer um pedido com um distribuidor sempre que o estoque de um item cair abaixo de um determinado limite. Ou talvez você queira enviar uma notificação por e-mail sempre que o status de um pedido mudar. Independentemente do seu caso de uso específico, sempre que você quiser reagir imediatamente às alterações em seu MongoDB database, fluxos de alterações e triggers são opções fantásticas.
Se você está apenas se juntando a nós nesta série de Início rápido com MongoDB e Node.js, seja bem-vindo! Começamos explicando como conectar-se ao MongoDB e executar cada uma das operações CRUD (criar, ler, atualizar e excluir). Em seguida, abordamos tópicos mais avançados, como o framework de agregação e as transações. O código que escrevemos hoje usará a mesma estrutura do código que criamos na primeira publicação da série, portanto, se você tiver alguma dúvida sobre como começar ou como o código é estruturado, volte a essa postagem.
E, com isso, passemos aos fluxos de alterações e triggers! Este é um resumo do que abordaremos hoje:
Prefere um vídeo a um artigo? Confira o vídeo abaixo que aborda exatamente os mesmos tópicos que discuto neste artigo.
Comece hoje mesmo com um cluster M0 no Atlas. É gratuito para sempre e é a maneira mais fácil de experimentar as etapas desta série de blogs.

O que são os fluxos de alterações?

Os change streams permitem que você receba notificações sobre alterações feitas nos seus bancos de dados e coleções do MongoDB. Ao usar change streams, você pode optar por programar ações que serão executadas automaticamente sempre que ocorrer um evento de alteração.
Os change streams utilizam o framework de agregação, então você pode escolher filtrar eventos de alteração específicos ou transformar os documentos de eventos de alteração.
Por exemplo, digamos que eu queira ser notificado sempre que um novo anúncio no mercado de Sydney, Austrália, for adicionado à coleção ListingsAndReviews. Posso criar um change stream que monitore a coleção ListingsAndReviews e use um pipeline de agregação para fazer a correspondência com os anúncios em que estou interessado.
Vejamos três formas diferentes de implementar este change stream.

configurar

Como em todas as publicações nesta série de Início Rápido do MongoDB e Node.js, você precisa garantir que concluiu as etapas de pré-requisitos descritas na seção Configuração da primeira publicação da série.
É útil ter um script que gere dados de amostra quando estiver testando os change stream. Para ajudá-lo a gerar dados de amostra rapidamente, escrevi changeStreamsTestData.js. Baixe uma cópia do arquivo, atualize a constante uri para refletir suas informações de conexão do Atlas e execute node changeStreamsTestData.js. O script fará o seguinte:
  1. Criar 3 novos anúncios (Vistas para a Opera House, Sala privada em Londres e Casa de praia bonita)
  2. Atualize 2 desses anúncios (Vistas para a Opera House e Casa de praia bonita)
  3. Crie mais 2 anúncios (Villa italiana e Casa no Porto de Sydney)
  4. Excluir um anúncio (Sydney Harbour Home).

Criar um fluxo de alterações

Agora que estamos prontos, vamos explorar três maneiras diferentes de trabalhar com um change stream no Node.js.

Obter uma cópia do modelo Node.js

Para facilitar o acompanhamento desta publicação no blog, criei um modelo inicial para um script do Node.js que acessa um cluster do Atlas.
  1. Baixe uma cópia de template.js.
  2. Abra template.js no seu editor de código favorito.
  3. Atualize o URI de conexão para apontar para o seu cluster do Atlas. Se não tiver certeza de como fazer isso, consulte a primeira publicação desta série.
  4. Salve o arquivo como changeStreams.js.
Você pode executar esse arquivo executando node changeStreams.js em seu shell. Nesse ponto, o arquivo simplesmente abre e fecha uma conexão com o cluster do Atlas, portanto, nenhuma saída é esperada. Se você vir DeprecationWarnings, poderá ignorá-los para os fins deste post.

Crie uma função auxiliar para fechar o change stream

Independentemente de como monitoramos as mudanças em nosso fluxo de alterações, convém fechar o fluxo de alterações após um determinado período. Vamos criar uma função auxiliar para fazer exatamente isso.
  1. Cole a seguinte função em changeStreams.js.
    1function closeChangeStream(timeInMs = 60000, changeStream) {
    2 return new Promise((resolve) => {
    3 setTimeout(() => {
    4 console.log("Closing the change stream");
    5 resolve(changeStream.close());
    6 }, timeInMs)
    7 })
    8};

Monitore o change stream usando o on() do EventEmitter

A classe ChangeStream do driver Node.js do MongoDB herda da classe de nó integrada EventEmitter. Como resultado, podemos usar a função on() do EventEmitter para adicionar uma função de ouvinte que será chamada sempre que ocorrer uma mudança no fluxo de alterações.

Criar a função

Vamos criar uma função que monitorará as mudanças no change stream usando on()do EventEmitter.
  1. Continuando a trabalhar em changeStreams.js, crie uma função assíncrona denominada monitorListingsUsingEventEmitter. A função deve ter os seguintes parâmetros: um MongoClient conectado, um tempo em ms que indica por quanto tempo o change stream deve ser monitorado e um pipeline de agregação que o change stream usará.
    1async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeline = []){
    2
    3}
  2. Agora precisamos acessar a coleção que vamos monitorar quanto a alterações. Adicione o seguinte código a monitorListingsUsingEventEmitter().
    1const collection = client.db("sample_airbnb").collection("listingsAndReviews");
  3. Agora estamos prontos para criar nosso change stream. Podemos fazer isso usando o watch() de uma coleção. Adicione a seguinte linha abaixo do código existente em monitorListingsUsingEventEmitter().
    1const changeStream = collection.watch(pipeline);
  4. Quando tivermos nosso change stream, poderemos adicionar um ouvinte a ele. Vamos registrar cada evento de alteração no console. Adicione a seguinte linha abaixo do código existente em monitorListingsUsingEventEmitter().
    1changeStream.on('change', (next) => {
    2 console.log(next);
    3});
  5. Poderíamos optar por deixar o change stream aberto indefinidamente. Em vez disso, vamos chamar nossa função auxiliar para definir um cronômetro e fechar o change stream. Adicione a seguinte linha abaixo do código existente em monitorListingsUsingEventEmitter().
    1await closeChangeStream(timeInMs, changeStream);

Chamar a função

Agora que implementamos nossa função, vamos chamá-la!
  1. Dentro de main() abaixo do comentário que diz Make the appropriate DB calls, chame sua função monitorListingsUsingEventEmitter():
    1await monitorListingsUsingEventEmitter(client);
  2. Salve seu arquivo.
  3. Execute seu script executando node changeStreams.js em seu shell. O fluxo de alterações será aberto por 60 segundos.
  4. Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Uma saída semelhante à seguinte será exibida em seu primeiro shell onde você estiver executando o changeStreams.js.
    1{
    2 _id: { _data: '825DE67A42000000012B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7640004' },
    3 operationType: 'insert',
    4 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1575385666 },
    5 fullDocument: {
    6 _id: 5de67a42113ea7de6472e764,
    7 name: 'Opera House Views',
    8 summary: 'Beautiful apartment with views of the iconic Sydney Opera House',
    9 property_type: 'Apartment',
    10 bedrooms: 1,
    11 bathrooms: 1,
    12 beds: 1,
    13 address: { market: 'Sydney', country: 'Australia' }
    14 },
    15 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    16 documentKey: { _id: 5de67a42113ea7de6472e764 }
    17}
    18{
    19 _id: { _data: '825DE67A42000000022B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7650004' },
    20 operationType: 'insert',
    21 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 2, high_: 1575385666 },
    22 fullDocument: {
    23 _id: 5de67a42113ea7de6472e765,
    24 name: 'Private room in London',
    25 property_type: 'Apartment',
    26 bedrooms: 1,
    27 bathroom: 1
    28 },
    29 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    30 documentKey: { _id: 5de67a42113ea7de6472e765 }
    31}
    32{
    33 _id: { _data: '825DE67A42000000032B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7660004' },
    34 operationType: 'insert',
    35 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 3, high_: 1575385666 },
    36 fullDocument: {
    37 _id: 5de67a42113ea7de6472e766,
    38 name: 'Beautiful Beach House',
    39 summary: 'Enjoy relaxed beach living in this house with a private beach',
    40 bedrooms: 4,
    41 bathrooms: 2.5,
    42 beds: 7,
    43 last_review: 2019-12-03T15:07:46.730Z
    44 },
    45 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    46 documentKey: { _id: 5de67a42113ea7de6472e766 }
    47 }
    48 {
    49 _id: { _data: '825DE67A42000000042B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7640004' },
    50 operationType: 'update',
    51 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 4, high_: 1575385666 },
    52 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    53 documentKey: { _id: 5de67a42113ea7de6472e764 },
    54 updateDescription: {
    55 updatedFields: { beds: 2 },
    56 removedFields: []
    57 }
    58 }
    59 {
    60 _id: { _data: '825DE67A42000000052B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7660004' },
    61 operationType: 'update',
    62 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 5, high_: 1575385666 },
    63 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    64 documentKey: { _id: 5de67a42113ea7de6472e766 },
    65 updateDescription: {
    66 updatedFields: { address: [Object] },
    67 removedFields: []
    68 }
    69 }
    70 {
    71 _id: { _data: '825DE67A42000000062B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7670004' },
    72 operationType: 'insert',
    73 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 6, high_: 1575385666 },
    74 fullDocument: {
    75 _id: 5de67a42113ea7de6472e767,
    76 name: 'Italian Villa',
    77 property_type: 'Entire home/apt',
    78 bedrooms: 6,
    79 bathrooms: 4,
    80 address: { market: 'Cinque Terre', country: 'Italy' }
    81 },
    82 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    83 documentKey: { _id: 5de67a42113ea7de6472e767 }
    84 }
    85 {
    86 _id: { _data: '825DE67A42000000072B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7680004' },
    87 operationType: 'insert',
    88 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 7, high_: 1575385666 },
    89 fullDocument: {
    90 _id: 5de67a42113ea7de6472e768,
    91 name: 'Sydney Harbour Home',
    92 bedrooms: 4,
    93 bathrooms: 2.5,
    94 address: { market: 'Sydney', country: 'Australia' } },
    95 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    96 documentKey: { _id: 5de67a42113ea7de6472e768 }
    97 }
    98 {
    99 _id: { _data: '825DE67A42000000082B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7680004' },
    100 operationType: 'delete',
    101 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 8, high_: 1575385666 },
    102 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    103 documentKey: { _id: 5de67a42113ea7de6472e768 }
    104 }
    Se você executar node changeStreamsTestData.js novamente antes que timer de 60 segundos tenha sido concluído, verá uma saída semelhante.
    Após 60 segundos, o seguinte será exibido:
    1Closing the change stream

Chame a função com um pipeline de agregação

Em alguns casos, você não se importará com todos os eventos de alteração que ocorrem em uma coleção. Em vez disso, será uma boa limitar as alterações que você está monitorando. Você pode usar um pipeline de agregação para filtrar as alterações ou transformar os documentos de eventos do fluxo de alterações.
No nosso caso, só nos interessam os novos anúncios no mercado de Sydney, Austrália. Vamos criar um pipeline de agregação para filtrar apenas as alterações na coleção listingsAndReviews.
Para saber mais sobre quais estágios do pipeline de agregação podem ser usados com fluxos de alterações, consulte a documentação oficial do fluxo de alterações.
  1. Dentro de main() e acima de sua chamada existente para monitorListingsUsingEventEmitter(), crie um pipeline de agregação:
    1const pipeline = [
    2 {
    3 '$match': {
    4 'operationType': 'insert',
    5 'fullDocument.address.country': 'Australia',
    6 'fullDocument.address.market': 'Sydney'
    7 },
    8 }
    9 ];
  2. Vamos usar esse pipeline para filtrar as alterações em nosso change stream. Atualize sua chamada existente para monitorListingsUsingEventEmitter() para deixar o change stream aberto por 30 segundos e usar o pipeline.
    1await monitorListingsUsingEventEmitter(client, 30000, pipeline);
  3. Salve seu arquivo.
  4. Execute seu script executando node changeStreams.js em seu shell. O fluxo de alterações será aberto por 30 segundos.
  5. Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Como o fluxo de alterações está usando o pipeline que você acabou de criar, somente os documentos inseridos na coleção listingsAndReviews que estão no mercado de Sydney, Austrália, estarão no fluxo de alterações. Uma saída semelhante à seguinte será exibida em sua primeira shell onde você estiver executando changeStreams.js.
    1{
    2 _id: { _data: '825DE67CED000000012B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67CED150EA2DF172344370004' },
    3 operationType: 'insert',
    4 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1575386349 },
    5 fullDocument: {
    6 _id: 5de67ced150ea2df17234437,
    7 name: 'Opera House Views',
    8 summary: 'Beautiful apartment with views of the iconic Sydney Opera House',
    9 property_type: 'Apartment',
    10 bedrooms: 1,
    11 bathrooms: 1,
    12 beds: 1,
    13 address: { market: 'Sydney', country: 'Australia' }
    14 },
    15 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    16 documentKey: { _id: 5de67ced150ea2df17234437 }
    17}
    18{
    19 _id: { _data: '825DE67CEE000000032B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67CEE150EA2DF1723443B0004' },
    20 operationType: 'insert',
    21 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 3, high_: 1575386350 },
    22 fullDocument: {
    23 _id: 5de67cee150ea2df1723443b,
    24 name: 'Sydney Harbour Home',
    25 bedrooms: 4,
    26 bathrooms: 2.5,
    27 address: { market: 'Sydney', country: 'Australia' }
    28 },
    29 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    30 documentKey: { _id: 5de67cee150ea2df1723443b }
    31}
    Após 30 segundos, o seguinte será exibido:
    1Closing the change stream

Monitore o change stream usando hasNext() do ChangeStream

Na seção acima, usamos on() do EventEmitter para monitorar o change stream. Como alternativa, podemos criar um loop while que aguarda o próximo elemento no change stream usando hasNext() da classe ChangeStream do driver MongoDB Node.js.

Criar a função

Vamos criar uma função que monitorará as alterações no change stream usando o hasNext() do ChangeStream.
  1. Continuando a trabalhar em changeStreams.js, crie uma função assíncrona denominada monitorListingsUsingHasNext. A função deve ter os seguintes parâmetros: um MongoClient conectado, um tempo em ms que indica por quanto tempo o change stream deve ser monitorado e um pipeline de agregação que o change stream usará.
    1async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline = []) {
    2
    3}
  2. Agora precisamos acessar a coleção que vamos monitorar quanto a alterações. Adicione o seguinte código a monitorListingsUsingHasNext().
    1const collection = client.db("sample_airbnb").collection("listingsAndReviews");
  3. Agora estamos prontos para criar nosso change stream. Podemos fazer isso usando o watch() de uma coleção. Adicione a seguinte linha abaixo do código existente em monitorListingsUsingHasNext().
    1const changeStream = collection.watch(pipeline);
  4. Poderíamos optar por deixar o fluxo de alterações aberto indefinidamente. Em vez disso, vamos chamar nossa função auxiliar que definirá um cronômetro e fechará o fluxo de alterações. Adicione a seguinte linha abaixo do código existente em monitorListingsUsingHasNext().
    1closeChangeStream(timeInMs, changeStream);
  5. Agora, vamos criar um loop while que aguardará novas alterações no fluxo de alterações. Podemos usar hasNext() do ChangeStream dentro do loop while. hasNext() aguardará o retorno verdadeiro até que uma nova alteração chegue ao fluxo de alterações. hasNext() lançará um erro assim que o fluxo de alterações for fechado, portanto, cercaremos nosso loop while com um bloco try { }. Se um erro for gerado, verificaremos se o fluxo de alterações está fechado. Se o fluxo de alterações estiver fechado, registraremos essas informações. Caso contrário, algo inesperado aconteceu, então jogaremos o erro. Adicione o seguinte código abaixo do código existente em monitorListingsUsingHasNext().
    1try {
    2 while (await changeStream.hasNext()) {
    3 console.log(await changeStream.next());
    4 }
    5} catch (error) {
    6 if (changeStream.isClosed()) {
    7 console.log("The change stream is closed. Will not wait on any more changes.")
    8 } else {
    9 throw error;
    10 }
    11}

Chamar a função

Agora que implementamos nossa função, vamos chamá-la!
  1. Dentro de main(), substitua sua chamada existente para monitorListingsUsingEventEmitter() por uma chamada para seu novo monitorListingsUsingHasNext():
    1await monitorListingsUsingHasNext(client);
  2. Salve seu arquivo.
  3. Execute seu script executando node changeStreams.js em seu shell. O fluxo de alterações será aberto por 60 segundos.
  4. Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Uma saída semelhante ao que vimos anteriormente será exibida em seu primeiro shell no qual você estiver executando changeStreams.js. Se você executar node changeStreamsTestData.js novamente antes que o segundo temporizador 60 seja concluído, verá uma saída semelhante novamente. Após 60 segundos, o seguinte será exibido:
    1Closing the change stream

Chame a função com um pipeline de agregação

Como mencionamos anteriormente, às vezes, você vai querer usar um pipeline de agregação para filtrar as alterações em seu change stream ou transformar os documentos de eventos do change stream. Vamos passar o pipeline de agregação que criamos em uma seção anterior para a nossa nova função.
  1. Atualize sua chamada existente para monitorListingsUsingHasNext() para deixar o fluxo de alterações aberto apenas por 30 segundos e usar o pipeline de agregação.
    1await monitorListingsUsingHasNext(client, 30000, pipeline);
  2. Salve seu arquivo.
  3. Execute seu script executando node changeStreams.js em seu shell. O fluxo de alterações será aberto por 30 segundos.
  4. Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Como o change stream está usando o pipeline que você acabou de criar, somente os documentos inseridos na collectionlistingsAndReviews que estão no mercado de Sydney, Austrália, estarão no change stream.Uma saída semelhante aoque vimos anteriormente ao usar um change stream com um aggregation pipeline será exibida em seu primeiro shell em que você está executando changeStreams.js. Após 30 segundos, o seguinte será exibido:
    1Closing the change stream

Monitore o fluxo de alterações usando a API Stream

Nas duas seções anteriores, usamos o on() do EventEmitter e o hasNext() do ChangeStreams para monitorar as alterações. Vamos examinar uma terceira maneira de monitorar um change stream: usando a API de Streams do Node.

Carregar o Stream Module

Para usar o módulo Stream, precisaremos carregá-lo.
  1. Continuando a trabalhar em changeStreams.js, carregue o módulo Stream na parte superior do arquivo.
    1const stream = require('stream');

Criar a função

Vamos criar uma função que monitorará as mudanças no fluxo de alterações usando a API Stream.
  1. Continuando a trabalhar em changeStreams.js, crie uma função assíncrona denominada monitorListingsUsingStreamAPI. A função deve ter os seguintes parâmetros: um MongoClient conectado, um tempo em ms que indica por quanto tempo o change stream deve ser monitorado e um pipeline de agregação que o change stream usará.
    1async function monitorListingsUsingStreamAPI(client, timeInMs = 60000, pipeline = []) {
    2
    3}
  2. Agora precisamos acessar a coleção que vamos monitorar quanto a alterações. Adicione o seguinte código a monitorListingsUsingStreamAPI().
    1const collection = client.db("sample_airbnb").collection("listingsAndReviews");
  3. Agora estamos prontos para criar nosso change stream. Podemos fazer isso usando o watch() de uma coleção. Adicione a seguinte linha abaixo do código existente em monitorListingsUsingStreamAPI().
    1const changeStream = collection.watch(pipeline);
  4. Agora estamos prontos para monitorar nosso change stream.Stream() do ChangeStream retornará um Node Readable stream. Chamaremos pipe() do Readable para extrair os dados do fluxo e gravá-los no console.
    1changeStream.stream().pipe(
    2 new stream.Writable({
    3 objectMode: true,
    4 write: function (doc, _, cb) {
    5 console.log(doc);
    6 cb();
    7 }
    8 })
    9);
  5. Poderíamos optar por deixar o fluxo de alterações aberto indefinidamente. Em vez disso, vamos chamar nossa função auxiliar que definirá um cronômetro e fechará o fluxo de alterações. Adicione a seguinte linha abaixo do código existente em monitorListingsUsingStreamAPI().
    1await closeChangeStream(timeInMs, changeStream);

Chamar a função

Agora que implementamos nossa função, vamos chamá-la!
  1. Dentro de main(), substitua sua chamada existente para monitorListingsUsingHasNext() por uma chamada para seu novo monitorListingsUsingStreamAPI():
    1await monitorListingsUsingStreamAPI(client);
  2. Salve seu arquivo.
  3. Execute seu script executando node changeStreams.js em seu shell. O fluxo de alterações será aberto por 60 segundos.
  4. Uma saída semelhante ao que vimos anteriormente será exibida em seu primeiro shell no qual você estiver executando changeStreams.js. Se você executar node changeStreamsTestData.js novamente antes que o segundo temporizador 60 seja concluído, verá uma saída semelhante novamente. Após 60 segundos, o seguinte será exibido:
    1Closing the change stream

Chame a função com um pipeline de agregação

Como mencionamos anteriormente, às vezes, você vai querer usar um pipeline de agregação para filtrar as alterações em seu change stream ou transformar os documentos de eventos do change stream. Vamos passar o pipeline de agregação que criamos em uma seção anterior para a nossa nova função.
  1. Atualize sua chamada existente para monitorListingsUsingStreamAPI() para deixar o fluxo de alterações aberto apenas por 30 segundos e usar o pipeline de agregação.
    1await monitorListingsUsingStreamAPI(client, 30000, pipeline);
  2. Salve seu arquivo.
  3. Execute seu script executando node changeStreams.js em seu shell. O fluxo de alterações será aberto por 30 segundos.
  4. Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Como o change stream está usando o pipeline que você acabou de criar, somente os documentos inseridos na collectionlistingsAndReviews que estão no mercado de Sydney, Austrália, estarão no change stream.Uma saída semelhante aoque vimos anteriormente ao usar um change stream com um aggregation pipeline será exibida em seu primeiro shell em que você está executando changeStreams.js. Após 30 segundos, o seguinte será exibido:
    1Closing the change stream

Retomar um change stream

Em algum momento, seu aplicativo provavelmente perderá a conexão com o change stream. Talvez ocorra um erro de rede e uma conexão entre o aplicativo e o banco de dados seja interrompida. Ou talvez seu aplicativo falhe e precise ser reiniciado (mas você é um desenvolvedor 10x e isso nunca aconteceria com você, certo?).
Nesses casos, talvez você queira retomar o fluxo de alterações de onde parou anteriormente para não perder nenhum dos eventos de alteração.
Cada documento de evento de change stream contém um token de retomada. O driver do Node.js armazena automaticamente o token de retomada no _id do documento do evento de change stream.
O aplicativo pode passar o token de retomada ao criar um novo fluxo de alterações. O fluxo de alterações incluirá todos os eventos que aconteceram após o evento associado ao token de retomada fornecido.
O driver do MongoDB Node.js tentará restabelecer automaticamente as conexões no caso de erros ou eleições transitórias de rede. Nesses casos, o driver usará sua cópia em cache do token de retomada mais recente para que nenhum evento de fluxo de alterações seja perdido.
No caso de uma falha ou reinicialização do aplicativo, o aplicativo precisará passar o token de retomada ao criar o fluxo de alterações para garantir que nenhum evento de fluxo de alterações seja perdido. Lembre-se de que o driver perderá sua cópia em cache do token de retomada mais recente quando o aplicativo for reiniciado, portanto, seu aplicativo deve armazenar o token de retomada.
Para obter mais informações e código de exemplo para retomar fluxos de alteração, consulte a documentação oficial.

O que são os MongoDB Atlas Triggers?

Os change streams permitem que você reaja imediatamente às alterações em seu banco de dados. Se você quiser estar constantemente monitorando as alterações em seu banco de dados, garantir que seu aplicativo que está monitorando o change stream esteja sempre ativo e não perca nenhum evento é possível... mas pode ser desafiador. É aqui que entram os triggers do MongoDB Atlas.
O MongoDB permite triggers no Atlas. Os triggers do Atlas permitem executar funções em tempo real com base em eventos do banco de dados (assim como fluxos de alterações) ou em intervalos agendados (como uma tarefa cronológica). Os triggers do Atlas têm algumas grandes vantagens:
  • Não se preocupe em programar o fluxo de alterações. Você simplesmente programa a função que será executada quando o evento do banco de dados for acionado.
  • Você não precisa se preocupar em gerenciar o servidor onde seu código de change stream está sendo executado. O Atlas toma conta do gerenciamento do servidor para você.
  • Você obtém uma interface do usuário útil para configurar seu trigger, o que significa que tem menos código para escrever.
Os triggers do Atlas têm algumas restrições. A maior elas que encontrei foi que as funções não suportavam importações de módulos (ou seja, importar e exigir). Isso mudou, e agora você pode carregar dependências externas que pode usar em suas funções. Consulte Carregar dependências externas para obter mais informações. Para saber mais sobre funções e suas restrições, consulte a documentação oficial das Funções de Realm.

Criar um MongoDB Atlas trigger

Assim como fizemos nas seções anteriores, vamos procurar novos anúncios no mercado de Sydney, Austrália. Em vez de trabalhar localmente em um editor de código para criar e monitorar um change stream, criaremos um trigger na interface web do Atlas.

Criar um gatilho

Vamos criar um trigger do Atlas que monitora a coleção listingsAndReviews e chama uma função sempre que um novo anúncio for adicionado ao mercado de Sydney, Austrália.
  1. Navegue até seu projeto no Atlas.
  2. Na seção Armazenamento de dados do painel de navegação esquerdo, clique em Triggers.
  3. Clique em Adicionar trigger. O assistente de Adicionar trigger será exibido.
  4. Na caixa de seleção Link Data Source(s), selecione o cluster que contém o banco de dados sample_airbnb e clique em Link. As alterações serão implantadas. A implantação pode levar um ou dois minutos. Role a tela até a parte superior da página para ver o status.
  5. Na caixa de seleção Selecionar um cluster..., selecione o cluster que contém o banco de dados sample_airbnb.
  6. Na caixa de seleção Selecionar um nome de banco de dados..., selecione sample_airbnb.
  7. Na caixa de seleção Selecionar um nome de coleção..., selecione listingsAndReviews.
  8. Na seção Tipo de operação, marque a caixa ao lado de Inserir.
  9. Na caixa Código de função, substitua o código comentado por uma chamada para registrar o evento de alteração. O código agora deve ter a seguinte aparência:
    1exports = function(changeEvent) {
    2 console.log(JSON.stringify(changeEvent.fullDocument));
    3};
  10. Podemos criar uma instrução $match para filtrar nossos eventos de alteração, assim como fizemos anteriormente com o pipeline de agregação que passamos para o fluxo de alterações em nosso script Node.js. Expanda a seção ADVANCED (OPTIONAL) na parte inferior da página e cole o seguinte na caixa de código Match Expression.
    1{
    2 "fullDocument.address.country": "Australia",
    3 "fullDocument.address.market": "Sydney"
    4}
  11. Clique em Salvar. O trigger será ativado. Desse ponto em diante, a função para registrar o evento de alteração será chamada sempre que um novo documento no mercado de Sydney, Austrália, for inserido na coleção listingsAndReviews.

Acione o trigger

Agora que configuramos o trigger, vamos criar dados de amostra que acionarão o trigger.
  1. Retorne ao shell na sua máquina local.
  2. Crie e atualize dados de amostra executando nodechangeStreamsTestData.js em um novo shell.

Ver os resultados do trigger

Quando você criou o trigger, o MongoDB Atlas criou automaticamente um aplicativo Realm para você denominado Triggers_RealmApp.
Atualmente, a função associada ao seu trigger não faz muito. Ela simplesmente imprime o documento do evento de alteração. Vamos ver os resultados nos registros do aplicativo Realm associados ao seu trigger.
  1. Retorne ao seu navegador onde você está visualizando seu trigger no Atlas.
  2. Na barra de navegação na parte superior da página, clique em Realm.
  3. No painel Aplicativos, clique em Triggers_RealmApp. O aplicativo Realm Triggers_RealmApp será aberto.
  4. Na seção GERENCIAR do painel de navegação esquerdo, clique em Logs. Duas entradas serão exibidas no painel Logs: uma para cada anúncio no mercado de Sydney, Austrália, que foi inserido na coleção.
  5. Clique na seta no início de cada linha no painel Logs para expandir a entrada do log. Aqui você pode ver o documento completo que foi inserido.
Se você inserir mais listagens no mercado de Sydney, Austrália, poderá atualizar a página Logs para ver os eventos de alteração.

Encerrando

Hoje vimos quatro maneiras diferentes de realizar a mesma tarefa de reagir imediatamente a alterações no banco de dados. Começamos escrevendo um script Node.js que monitorou um fluxo de alterações usando a classe integrada EventEmitter do Node.js. Em seguida, atualizamos o script Node.js para monitorar um fluxo de alterações usando a classe ChangeStream do driver Node.js. do MongoDB. Em seguida, atualizamos o script Node.js para monitorar um fluxo de alterações usando a API Stream. Por fim, criamos um trigger do Atlas para monitorar as alterações. Nos quatro casos, conseguimos usar $match para filtrar os eventos do fluxo de alterações.
Esta publicação incluiu muitos trechos de código criados sobre o código escrito na primeira publicação desta série Início rápido do MongoDB e Node.js. Para obter uma cópia completa do código usado na postagem de hoje, visite o Repositório GitHub de início rápido em Node.js.
Todos os exemplos que exploramos hoje faziam coisas relativamente simples sempre que um evento era acionado: eles registravam os eventos de alteração. Os fluxos de alterações e triggers se tornam realmente poderosos quando você começa a fazer mais em resposta a eventos de alteração. Por exemplo, você pode querer disparar alarmes, enviar e-mails, fazer pedidos, atualizar outros sistemas ou fazer outras coisas incríveis.
Esta é a última publicação da série de início rápido do Node.js e do MongoDB (pelo menos por enquanto!). Espero que você tenha aproveitado! Se tiver ideias de outros tópicos que gostaria de ver abordados, me diga na MongoDB Community.

Recursos adicionais


Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Adicione memória ao seu aplicativo JavaScript RAG usando MongoDB e LangChain


Sep 18, 2024 | 9 min read
Início rápido

Início rápido: tipos de dados BSON - ObjectId


Sep 23, 2022 | 3 min read
Artigo

Usar o AWS Rekognition para analisar e marcar imagens carregadas


Sep 11, 2024 | 1 min read
Tutorial

Adição de notificações em tempo real ao Ghost CMS usando MongoDB e eventos enviados pelo servidor


Aug 14, 2023 | 7 min read
Sumário