Tutorial de fluxos de alterações e triggers com o Node.js
Lauren Schaefer17 min read • Published Feb 04, 2022 • Updated Aug 24, 2023
Avalie esse Início rápido
Às vezes, você precisa reagir imediatamente às mudanças em seu MongoDB database. Talvez você queira fazer um pedido com um distribuidor sempre que o estoque de um item cair abaixo de um determinado limite. Ou talvez você queira enviar uma notificação por e-mail sempre que o status de um pedido mudar. Independentemente do seu caso de uso específico, sempre que você quiser reagir imediatamente às alterações em seu MongoDB database, fluxos de alterações e triggers são opções fantásticas.
Se você está apenas se juntando a nós nesta série de Início rápido com MongoDB e Node.js, seja bem-vindo! Começamos explicando como conectar-se ao MongoDB e executar cada uma das operações CRUD (criar, ler, atualizar e excluir). Em seguida, abordamos tópicos mais avançados, como o framework de agregação e as transações. O código que escrevemos hoje usará a mesma estrutura do código que criamos na primeira publicação da série, portanto, se você tiver alguma dúvida sobre como começar ou como o código é estruturado, volte a essa postagem.
E, com isso, passemos aos fluxos de alterações e triggers! Este é um resumo do que abordaremos hoje:
Prefere um vídeo a um artigo? Confira o vídeo abaixo que aborda exatamente os mesmos tópicos que discuto neste artigo.
Comece hoje mesmo com um cluster M0 no Atlas. É gratuito para sempre e é a maneira mais fácil de experimentar as etapas desta série de blogs.
Os change streams permitem que você receba notificações sobre alterações feitas nos seus bancos de dados e coleções do MongoDB. Ao usar change streams, você pode optar por programar ações que serão executadas automaticamente sempre que ocorrer um evento de alteração.
Os change streams utilizam o framework de agregação, então você pode escolher filtrar eventos de alteração específicos ou transformar os documentos de eventos de alteração.
Por exemplo, digamos que eu queira ser notificado sempre que um novo anúncio no mercado de Sydney, Austrália, for adicionado à coleção ListingsAndReviews. Posso criar um change stream que monitore a coleção ListingsAndReviews e use um pipeline de agregação para fazer a correspondência com os anúncios em que estou interessado.
Vejamos três formas diferentes de implementar este change stream.
Como em todas as publicações nesta série de Início Rápido do MongoDB e Node.js, você precisa garantir que concluiu as etapas de pré-requisitos descritas na seção Configuração da primeira publicação da série.
É útil ter um script que gere dados de amostra quando estiver testando os change stream. Para ajudá-lo a gerar dados de amostra rapidamente, escrevi changeStreamsTestData.js. Baixe uma cópia do arquivo, atualize a constante
uri
para refletir suas informações de conexão do Atlas e execute node changeStreamsTestData.js
. O script fará o seguinte:- Criar 3 novos anúncios (Vistas para a Opera House, Sala privada em Londres e Casa de praia bonita)
- Atualize 2 desses anúncios (Vistas para a Opera House e Casa de praia bonita)
- Crie mais 2 anúncios (Villa italiana e Casa no Porto de Sydney)
- Excluir um anúncio (Sydney Harbour Home).
Agora que estamos prontos, vamos explorar três maneiras diferentes de trabalhar com um change stream no Node.js.
Para facilitar o acompanhamento desta publicação no blog, criei um modelo inicial para um script do Node.js que acessa um cluster do Atlas.
- Abra
template.js
no seu editor de código favorito. - Atualize o URI de conexão para apontar para o seu cluster do Atlas. Se não tiver certeza de como fazer isso, consulte a primeira publicação desta série.
- Salve o arquivo como
changeStreams.js
.
Você pode executar esse arquivo executando
node changeStreams.js
em seu shell. Nesse ponto, o arquivo simplesmente abre e fecha uma conexão com o cluster do Atlas, portanto, nenhuma saída é esperada. Se você vir DeprecationWarnings, poderá ignorá-los para os fins deste post.Independentemente de como monitoramos as mudanças em nosso fluxo de alterações, convém fechar o fluxo de alterações após um determinado período. Vamos criar uma função auxiliar para fazer exatamente isso.
- Cole a seguinte função em
changeStreams.js
.1 function closeChangeStream(timeInMs = 60000, changeStream) { 2 return new Promise((resolve) => { 3 setTimeout(() => { 4 console.log("Closing the change stream"); 5 resolve(changeStream.close()); 6 }, timeInMs) 7 }) 8 };
A classe ChangeStream do driver Node.js do MongoDB herda da classe de nó integrada EventEmitter. Como resultado, podemos usar a função on() do EventEmitter para adicionar uma função de ouvinte que será chamada sempre que ocorrer uma mudança no fluxo de alterações.
Vamos criar uma função que monitorará as mudanças no change stream usando
on()
do EventEmitter.- Continuando a trabalhar em
changeStreams.js
, crie uma função assíncrona denominadamonitorListingsUsingEventEmitter
. A função deve ter os seguintes parâmetros: um MongoClient conectado, um tempo em ms que indica por quanto tempo o change stream deve ser monitorado e um pipeline de agregação que o change stream usará.1 async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeline = []){ 2 3 } - Agora precisamos acessar a coleção que vamos monitorar quanto a alterações. Adicione o seguinte código a
monitorListingsUsingEventEmitter()
.1 const collection = client.db("sample_airbnb").collection("listingsAndReviews"); - Quando tivermos nosso change stream, poderemos adicionar um ouvinte a ele. Vamos registrar cada evento de alteração no console. Adicione a seguinte linha abaixo do código existente em
monitorListingsUsingEventEmitter()
.1 changeStream.on('change', (next) => { 2 console.log(next); 3 }); - Poderíamos optar por deixar o change stream aberto indefinidamente. Em vez disso, vamos chamar nossa função auxiliar para definir um cronômetro e fechar o change stream. Adicione a seguinte linha abaixo do código existente em
monitorListingsUsingEventEmitter()
.1 await closeChangeStream(timeInMs, changeStream);
Agora que implementamos nossa função, vamos chamá-la!
- Dentro de
main()
abaixo do comentário que dizMake the appropriate DB calls
, chame sua funçãomonitorListingsUsingEventEmitter()
:1 await monitorListingsUsingEventEmitter(client); - Salve seu arquivo.
- Execute seu script executando
node changeStreams.js
em seu shell. O fluxo de alterações será aberto por 60 segundos. - Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Uma saída semelhante à seguinte será exibida em seu primeiro shell onde você estiver executando o
changeStreams.js
.1 { 2 _id: { _data: '825DE67A42000000012B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7640004' }, 3 operationType: 'insert', 4 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1575385666 }, 5 fullDocument: { 6 _id: 5de67a42113ea7de6472e764, 7 name: 'Opera House Views', 8 summary: 'Beautiful apartment with views of the iconic Sydney Opera House', 9 property_type: 'Apartment', 10 bedrooms: 1, 11 bathrooms: 1, 12 beds: 1, 13 address: { market: 'Sydney', country: 'Australia' } 14 }, 15 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 16 documentKey: { _id: 5de67a42113ea7de6472e764 } 17 } 18 { 19 _id: { _data: '825DE67A42000000022B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7650004' }, 20 operationType: 'insert', 21 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 2, high_: 1575385666 }, 22 fullDocument: { 23 _id: 5de67a42113ea7de6472e765, 24 name: 'Private room in London', 25 property_type: 'Apartment', 26 bedrooms: 1, 27 bathroom: 1 28 }, 29 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 30 documentKey: { _id: 5de67a42113ea7de6472e765 } 31 } 32 { 33 _id: { _data: '825DE67A42000000032B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7660004' }, 34 operationType: 'insert', 35 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 3, high_: 1575385666 }, 36 fullDocument: { 37 _id: 5de67a42113ea7de6472e766, 38 name: 'Beautiful Beach House', 39 summary: 'Enjoy relaxed beach living in this house with a private beach', 40 bedrooms: 4, 41 bathrooms: 2.5, 42 beds: 7, 43 last_review: 2019-12-03T15:07:46.730Z 44 }, 45 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 46 documentKey: { _id: 5de67a42113ea7de6472e766 } 47 } 48 { 49 _id: { _data: '825DE67A42000000042B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7640004' }, 50 operationType: 'update', 51 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 4, high_: 1575385666 }, 52 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 53 documentKey: { _id: 5de67a42113ea7de6472e764 }, 54 updateDescription: { 55 updatedFields: { beds: 2 }, 56 removedFields: [] 57 } 58 } 59 { 60 _id: { _data: '825DE67A42000000052B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7660004' }, 61 operationType: 'update', 62 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 5, high_: 1575385666 }, 63 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 64 documentKey: { _id: 5de67a42113ea7de6472e766 }, 65 updateDescription: { 66 updatedFields: { address: [Object] }, 67 removedFields: [] 68 } 69 } 70 { 71 _id: { _data: '825DE67A42000000062B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7670004' }, 72 operationType: 'insert', 73 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 6, high_: 1575385666 }, 74 fullDocument: { 75 _id: 5de67a42113ea7de6472e767, 76 name: 'Italian Villa', 77 property_type: 'Entire home/apt', 78 bedrooms: 6, 79 bathrooms: 4, 80 address: { market: 'Cinque Terre', country: 'Italy' } 81 }, 82 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 83 documentKey: { _id: 5de67a42113ea7de6472e767 } 84 } 85 { 86 _id: { _data: '825DE67A42000000072B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7680004' }, 87 operationType: 'insert', 88 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 7, high_: 1575385666 }, 89 fullDocument: { 90 _id: 5de67a42113ea7de6472e768, 91 name: 'Sydney Harbour Home', 92 bedrooms: 4, 93 bathrooms: 2.5, 94 address: { market: 'Sydney', country: 'Australia' } }, 95 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 96 documentKey: { _id: 5de67a42113ea7de6472e768 } 97 } 98 { 99 _id: { _data: '825DE67A42000000082B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7680004' }, 100 operationType: 'delete', 101 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 8, high_: 1575385666 }, 102 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 103 documentKey: { _id: 5de67a42113ea7de6472e768 } 104 } Se você executarnode changeStreamsTestData.js
novamente antes que timer de 60 segundos tenha sido concluído, verá uma saída semelhante.Após 60 segundos, o seguinte será exibido:1 Closing the change stream
Em alguns casos, você não se importará com todos os eventos de alteração que ocorrem em uma coleção. Em vez disso, será uma boa limitar as alterações que você está monitorando. Você pode usar um pipeline de agregação para filtrar as alterações ou transformar os documentos de eventos do fluxo de alterações.
No nosso caso, só nos interessam os novos anúncios no mercado de Sydney, Austrália. Vamos criar um pipeline de agregação para filtrar apenas as alterações na coleção
listingsAndReviews
.Para saber mais sobre quais estágios do pipeline de agregação podem ser usados com fluxos de alterações, consulte a documentação oficial do fluxo de alterações.
- Dentro de
main()
e acima de sua chamada existente paramonitorListingsUsingEventEmitter()
, crie um pipeline de agregação:1 const pipeline = [ 2 { 3 '$match': { 4 'operationType': 'insert', 5 'fullDocument.address.country': 'Australia', 6 'fullDocument.address.market': 'Sydney' 7 }, 8 } 9 ]; - Vamos usar esse pipeline para filtrar as alterações em nosso change stream. Atualize sua chamada existente para
monitorListingsUsingEventEmitter()
para deixar o change stream aberto por 30 segundos e usar o pipeline.1 await monitorListingsUsingEventEmitter(client, 30000, pipeline); - Salve seu arquivo.
- Execute seu script executando
node changeStreams.js
em seu shell. O fluxo de alterações será aberto por 30 segundos. - Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Como o fluxo de alterações está usando o pipeline que você acabou de criar, somente os documentos inseridos na coleção
listingsAndReviews
que estão no mercado de Sydney, Austrália, estarão no fluxo de alterações. Uma saída semelhante à seguinte será exibida em sua primeira shell onde você estiver executandochangeStreams.js
.1 { 2 _id: { _data: '825DE67CED000000012B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67CED150EA2DF172344370004' }, 3 operationType: 'insert', 4 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1575386349 }, 5 fullDocument: { 6 _id: 5de67ced150ea2df17234437, 7 name: 'Opera House Views', 8 summary: 'Beautiful apartment with views of the iconic Sydney Opera House', 9 property_type: 'Apartment', 10 bedrooms: 1, 11 bathrooms: 1, 12 beds: 1, 13 address: { market: 'Sydney', country: 'Australia' } 14 }, 15 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 16 documentKey: { _id: 5de67ced150ea2df17234437 } 17 } 18 { 19 _id: { _data: '825DE67CEE000000032B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67CEE150EA2DF1723443B0004' }, 20 operationType: 'insert', 21 clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 3, high_: 1575386350 }, 22 fullDocument: { 23 _id: 5de67cee150ea2df1723443b, 24 name: 'Sydney Harbour Home', 25 bedrooms: 4, 26 bathrooms: 2.5, 27 address: { market: 'Sydney', country: 'Australia' } 28 }, 29 ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' }, 30 documentKey: { _id: 5de67cee150ea2df1723443b } 31 } Após 30 segundos, o seguinte será exibido:1 Closing the change stream
Na seção acima, usamos
on()
do EventEmitter para monitorar o change stream. Como alternativa, podemos criar um loop while
que aguarda o próximo elemento no change stream usando hasNext() da classe ChangeStream do driver MongoDB Node.js.Vamos criar uma função que monitorará as alterações no change stream usando o
hasNext()
do ChangeStream.- Continuando a trabalhar em
changeStreams.js
, crie uma função assíncrona denominadamonitorListingsUsingHasNext
. A função deve ter os seguintes parâmetros: um MongoClient conectado, um tempo em ms que indica por quanto tempo o change stream deve ser monitorado e um pipeline de agregação que o change stream usará.1 async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline = []) { 2 3 } - Agora precisamos acessar a coleção que vamos monitorar quanto a alterações. Adicione o seguinte código a
monitorListingsUsingHasNext()
.1 const collection = client.db("sample_airbnb").collection("listingsAndReviews"); - Poderíamos optar por deixar o fluxo de alterações aberto indefinidamente. Em vez disso, vamos chamar nossa função auxiliar que definirá um cronômetro e fechará o fluxo de alterações. Adicione a seguinte linha abaixo do código existente em
monitorListingsUsingHasNext()
.1 closeChangeStream(timeInMs, changeStream); - Agora, vamos criar um loop
while
que aguardará novas alterações no fluxo de alterações. Podemos usar hasNext() do ChangeStream dentro do loopwhile
.hasNext()
aguardará o retorno verdadeiro até que uma nova alteração chegue ao fluxo de alterações.hasNext()
lançará um erro assim que o fluxo de alterações for fechado, portanto, cercaremos nosso loopwhile
com um blocotry { }
. Se um erro for gerado, verificaremos se o fluxo de alterações está fechado. Se o fluxo de alterações estiver fechado, registraremos essas informações. Caso contrário, algo inesperado aconteceu, então jogaremos o erro. Adicione o seguinte código abaixo do código existente emmonitorListingsUsingHasNext()
.1 try { 2 while (await changeStream.hasNext()) { 3 console.log(await changeStream.next()); 4 } 5 } catch (error) { 6 if (changeStream.isClosed()) { 7 console.log("The change stream is closed. Will not wait on any more changes.") 8 } else { 9 throw error; 10 } 11 }
Agora que implementamos nossa função, vamos chamá-la!
- Dentro de
main()
, substitua sua chamada existente paramonitorListingsUsingEventEmitter()
por uma chamada para seu novomonitorListingsUsingHasNext()
:1 await monitorListingsUsingHasNext(client); - Salve seu arquivo.
- Execute seu script executando
node changeStreams.js
em seu shell. O fluxo de alterações será aberto por 60 segundos. - Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Uma saída semelhante ao que vimos anteriormente será exibida em seu primeiro shell no qual você estiver executando
changeStreams.js
. Se você executarnode changeStreamsTestData.js
novamente antes que o segundo temporizador 60 seja concluído, verá uma saída semelhante novamente. Após 60 segundos, o seguinte será exibido:1 Closing the change stream
Como mencionamos anteriormente, às vezes, você vai querer usar um pipeline de agregação para filtrar as alterações em seu change stream ou transformar os documentos de eventos do change stream. Vamos passar o pipeline de agregação que criamos em uma seção anterior para a nossa nova função.
- Atualize sua chamada existente para
monitorListingsUsingHasNext()
para deixar o fluxo de alterações aberto apenas por 30 segundos e usar o pipeline de agregação.1 await monitorListingsUsingHasNext(client, 30000, pipeline); - Salve seu arquivo.
- Execute seu script executando
node changeStreams.js
em seu shell. O fluxo de alterações será aberto por 30 segundos. - Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Como o change stream está usando o pipeline que você acabou de criar, somente os documentos inseridos na collection
listingsAndReviews
que estão no mercado de Sydney, Austrália, estarão no change stream.Uma saída semelhante aoque vimos anteriormente ao usar um change stream com um aggregation pipeline será exibida em seu primeiro shell em que você está executandochangeStreams.js
. Após 30 segundos, o seguinte será exibido:1 Closing the change stream
Nas duas seções anteriores, usamos o
on()
do EventEmitter e o hasNext()
do ChangeStreams para monitorar as alterações. Vamos examinar uma terceira maneira de monitorar um change stream: usando a API de Streams do Node.Para usar o módulo Stream, precisaremos carregá-lo.
- Continuando a trabalhar em
changeStreams.js
, carregue o módulo Stream na parte superior do arquivo.1 const stream = require('stream');
Vamos criar uma função que monitorará as mudanças no fluxo de alterações usando a API Stream.
- Continuando a trabalhar em
changeStreams.js
, crie uma função assíncrona denominadamonitorListingsUsingStreamAPI
. A função deve ter os seguintes parâmetros: um MongoClient conectado, um tempo em ms que indica por quanto tempo o change stream deve ser monitorado e um pipeline de agregação que o change stream usará.1 async function monitorListingsUsingStreamAPI(client, timeInMs = 60000, pipeline = []) { 2 3 } - Agora precisamos acessar a coleção que vamos monitorar quanto a alterações. Adicione o seguinte código a
monitorListingsUsingStreamAPI()
.1 const collection = client.db("sample_airbnb").collection("listingsAndReviews"); - Agora estamos prontos para monitorar nosso change stream.Stream() do ChangeStream retornará um Node Readable stream. Chamaremos pipe() do Readable para extrair os dados do fluxo e gravá-los no console.
1 changeStream.stream().pipe( 2 new stream.Writable({ 3 objectMode: true, 4 write: function (doc, _, cb) { 5 console.log(doc); 6 cb(); 7 } 8 }) 9 ); - Poderíamos optar por deixar o fluxo de alterações aberto indefinidamente. Em vez disso, vamos chamar nossa função auxiliar que definirá um cronômetro e fechará o fluxo de alterações. Adicione a seguinte linha abaixo do código existente em
monitorListingsUsingStreamAPI()
.1 await closeChangeStream(timeInMs, changeStream);
Agora que implementamos nossa função, vamos chamá-la!
- Dentro de
main()
, substitua sua chamada existente paramonitorListingsUsingHasNext()
por uma chamada para seu novomonitorListingsUsingStreamAPI()
:1 await monitorListingsUsingStreamAPI(client); - Salve seu arquivo.
- Execute seu script executando
node changeStreams.js
em seu shell. O fluxo de alterações será aberto por 60 segundos. - Uma saída semelhante ao que vimos anteriormente será exibida em seu primeiro shell no qual você estiver executando
changeStreams.js
. Se você executarnode changeStreamsTestData.js
novamente antes que o segundo temporizador 60 seja concluído, verá uma saída semelhante novamente. Após 60 segundos, o seguinte será exibido:1 Closing the change stream
Como mencionamos anteriormente, às vezes, você vai querer usar um pipeline de agregação para filtrar as alterações em seu change stream ou transformar os documentos de eventos do change stream. Vamos passar o pipeline de agregação que criamos em uma seção anterior para a nossa nova função.
- Atualize sua chamada existente para
monitorListingsUsingStreamAPI()
para deixar o fluxo de alterações aberto apenas por 30 segundos e usar o pipeline de agregação.1 await monitorListingsUsingStreamAPI(client, 30000, pipeline); - Salve seu arquivo.
- Execute seu script executando
node changeStreams.js
em seu shell. O fluxo de alterações será aberto por 30 segundos. - Crie e atualize dados de amostra executando o nó changeStreamsTestData.js em um novo shell. Como o change stream está usando o pipeline que você acabou de criar, somente os documentos inseridos na collection
listingsAndReviews
que estão no mercado de Sydney, Austrália, estarão no change stream.Uma saída semelhante aoque vimos anteriormente ao usar um change stream com um aggregation pipeline será exibida em seu primeiro shell em que você está executandochangeStreams.js
. Após 30 segundos, o seguinte será exibido:1 Closing the change stream
Em algum momento, seu aplicativo provavelmente perderá a conexão com o change stream. Talvez ocorra um erro de rede e uma conexão entre o aplicativo e o banco de dados seja interrompida. Ou talvez seu aplicativo falhe e precise ser reiniciado (mas você é um desenvolvedor 10x e isso nunca aconteceria com você, certo?).
Nesses casos, talvez você queira retomar o fluxo de alterações de onde parou anteriormente para não perder nenhum dos eventos de alteração.
Cada documento de evento de change stream contém um token de retomada. O driver do Node.js armazena automaticamente o token de retomada no
_id
do documento do evento de change stream.O aplicativo pode passar o token de retomada ao criar um novo fluxo de alterações. O fluxo de alterações incluirá todos os eventos que aconteceram após o evento associado ao token de retomada fornecido.
O driver do MongoDB Node.js tentará restabelecer automaticamente as conexões no caso de erros ou eleições transitórias de rede. Nesses casos, o driver usará sua cópia em cache do token de retomada mais recente para que nenhum evento de fluxo de alterações seja perdido.
No caso de uma falha ou reinicialização do aplicativo, o aplicativo precisará passar o token de retomada ao criar o fluxo de alterações para garantir que nenhum evento de fluxo de alterações seja perdido. Lembre-se de que o driver perderá sua cópia em cache do token de retomada mais recente quando o aplicativo for reiniciado, portanto, seu aplicativo deve armazenar o token de retomada.
Para obter mais informações e código de exemplo para retomar fluxos de alteração, consulte a documentação oficial.
Os change streams permitem que você reaja imediatamente às alterações em seu banco de dados. Se você quiser estar constantemente monitorando as alterações em seu banco de dados, garantir que seu aplicativo que está monitorando o change stream esteja sempre ativo e não perca nenhum evento é possível... mas pode ser desafiador. É aqui que entram os triggers do MongoDB Atlas.
O MongoDB permite triggers no Atlas. Os triggers do Atlas permitem executar funções em tempo real com base em eventos do banco de dados (assim como fluxos de alterações) ou em intervalos agendados (como uma tarefa cronológica). Os triggers do Atlas têm algumas grandes vantagens:
- Não se preocupe em programar o fluxo de alterações. Você simplesmente programa a função que será executada quando o evento do banco de dados for acionado.
- Você não precisa se preocupar em gerenciar o servidor onde seu código de change stream está sendo executado. O Atlas toma conta do gerenciamento do servidor para você.
- Você obtém uma interface do usuário útil para configurar seu trigger, o que significa que tem menos código para escrever.
Os triggers do Atlas têm algumas restrições. A maior elas que encontrei foi que as funções não suportavam importações de módulos (ou seja, importar e exigir). Isso mudou, e agora você pode carregar dependências externas que pode usar em suas funções. Consulte Carregar dependências externas para obter mais informações. Para saber mais sobre funções e suas restrições, consulte a documentação oficial das Funções de Realm.
Assim como fizemos nas seções anteriores, vamos procurar novos anúncios no mercado de Sydney, Austrália. Em vez de trabalhar localmente em um editor de código para criar e monitorar um change stream, criaremos um trigger na interface web do Atlas.
Vamos criar um trigger do Atlas que monitora a coleção
listingsAndReviews
e chama uma função sempre que um novo anúncio for adicionado ao mercado de Sydney, Austrália.- Na seção Armazenamento de dados do painel de navegação esquerdo, clique em Triggers.
- Clique em Adicionar trigger. O assistente de Adicionar trigger será exibido.
- Na caixa de seleção Link Data Source(s), selecione o cluster que contém o banco de dados
sample_airbnb
e clique em Link. As alterações serão implantadas. A implantação pode levar um ou dois minutos. Role a tela até a parte superior da página para ver o status. - Na caixa de seleção Selecionar um cluster..., selecione o cluster que contém o banco de dados
sample_airbnb
. - Na caixa de seleção Selecionar um nome de banco de dados..., selecione sample_airbnb.
- Na caixa de seleção Selecionar um nome de coleção..., selecione listingsAndReviews.
- Na seção Tipo de operação, marque a caixa ao lado de Inserir.
- Na caixa Código de função, substitua o código comentado por uma chamada para registrar o evento de alteração. O código agora deve ter a seguinte aparência:
1 exports = function(changeEvent) { 2 console.log(JSON.stringify(changeEvent.fullDocument)); 3 }; - Podemos criar uma instrução $match para filtrar nossos eventos de alteração, assim como fizemos anteriormente com o pipeline de agregação que passamos para o fluxo de alterações em nosso script Node.js. Expanda a seção ADVANCED (OPTIONAL) na parte inferior da página e cole o seguinte na caixa de código Match Expression.
1 { 2 "fullDocument.address.country": "Australia", 3 "fullDocument.address.market": "Sydney" 4 } - Clique em Salvar. O trigger será ativado. Desse ponto em diante, a função para registrar o evento de alteração será chamada sempre que um novo documento no mercado de Sydney, Austrália, for inserido na coleção
listingsAndReviews
.
Agora que configuramos o trigger, vamos criar dados de amostra que acionarão o trigger.
- Retorne ao shell na sua máquina local.
Quando você criou o trigger, o MongoDB Atlas criou automaticamente um aplicativo Realm para você denominado Triggers_RealmApp.
Atualmente, a função associada ao seu trigger não faz muito. Ela simplesmente imprime o documento do evento de alteração. Vamos ver os resultados nos registros do aplicativo Realm associados ao seu trigger.
- Retorne ao seu navegador onde você está visualizando seu trigger no Atlas.
- Na barra de navegação na parte superior da página, clique em Realm.
- No painel Aplicativos, clique em Triggers_RealmApp. O aplicativo Realm Triggers_RealmApp será aberto.
- Na seção GERENCIAR do painel de navegação esquerdo, clique em Logs. Duas entradas serão exibidas no painel Logs: uma para cada anúncio no mercado de Sydney, Austrália, que foi inserido na coleção.
- Clique na seta no início de cada linha no painel Logs para expandir a entrada do log. Aqui você pode ver o documento completo que foi inserido.
Se você inserir mais listagens no mercado de Sydney, Austrália, poderá atualizar a página Logs para ver os eventos de alteração.
Hoje vimos quatro maneiras diferentes de realizar a mesma tarefa de reagir imediatamente a alterações no banco de dados. Começamos escrevendo um script Node.js que monitorou um fluxo de alterações usando a classe integrada EventEmitter do Node.js. Em seguida, atualizamos o script Node.js para monitorar um fluxo de alterações usando a classe ChangeStream do driver Node.js. do MongoDB. Em seguida, atualizamos o script Node.js para monitorar um fluxo de alterações usando a API Stream. Por fim, criamos um trigger do Atlas para monitorar as alterações. Nos quatro casos, conseguimos usar $match para filtrar os eventos do fluxo de alterações.
Esta publicação incluiu muitos trechos de código criados sobre o código escrito na primeira publicação desta série Início rápido do MongoDB e Node.js. Para obter uma cópia completa do código usado na postagem de hoje, visite o Repositório GitHub de início rápido em Node.js.
Todos os exemplos que exploramos hoje faziam coisas relativamente simples sempre que um evento era acionado: eles registravam os eventos de alteração. Os fluxos de alterações e triggers se tornam realmente poderosos quando você começa a fazer mais em resposta a eventos de alteração. Por exemplo, você pode querer disparar alarmes, enviar e-mails, fazer pedidos, atualizar outros sistemas ou fazer outras coisas incríveis.
Esta é a última publicação da série de início rápido do Node.js e do MongoDB (pelo menos por enquanto!). Espero que você tenha aproveitado! Se tiver ideias de outros tópicos que gostaria de ver abordados, me diga na MongoDB Community.