Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Saiba por que o MongoDB foi selecionado como um líder no 2024 Gartner_Magic Quadrupnt()
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Idiomaschevron-right
Pythonchevron-right

Fluxos de alterações do MongoDB com Python

Naomi Pentrel9 min read • Published Feb 05, 2022 • Updated Sep 23, 2022
MongoDBFluxos de alteraçõesPython
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty

Introdução

Logotipo do QuickStart Python
Change streams permite que você ouça as alterações que ocorrem no seu MongoDB database. No MongoDB 3.6 ou superior, esta funcionalidade permite criar aplicativos que podem responder imediatamente a alterações de dados em tempo real. Neste tutorial, mostraremos como usar change streams com o Python. Você irá:
  • Saber mais sobre change streams
  • Crie um programa que ouça inserções
  • Alterar o programa para ouvir outros tipos de eventos
  • Altere o programa para ouvir notificações específicas
Para acompanhar o processo, você pode criar um ambiente de teste usando as etapas abaixo. Isso é opcional, mas altamente recomendado, pois permitirá que você teste o uso da funcionalidade de change stream com os exemplos fornecidos. Você receberá todos os comandos, mas é necessária alguma familiaridade com o MongoDB.

Saiba mais sobre Change Streams

A capacidade de ouvir alterações específicas nos dados permite que um aplicativo seja muito mais rápido na resposta às mudanças. Se um usuário do seu sistema atualizar as informações, o sistema poderá ouvir e propagar essas alterações imediatamente. Por exemplo, isso pode significar que os usuários não precisam mais clicar em atualizar para ver quando as alterações foram aplicadas. Ou, se as alterações de um usuário em um sistema precisarem da aprovação de alguém, outro sistema poderá ouvir as alterações e enviar notificações solicitando aprovações instantaneamente.
Antes dos change streams, os aplicativos que precisavam saber sobre a adição de novos dados em tempo real tinham que pesquisar dados continuamente ou confiar em outros mecanismos de atualização. Uma técnica comum, embora complexa, para monitorar as mudanças era seguir o Registro de Operações (Oplog) do MongoDB. O Oplog faz parte do sistema de replicação do MongoDB e, como tal, já acompanha modificações no banco de dados, mas não é fácil de usar para a lógica de negócios. Change streams são criados sobre o Oplog, mas fornecem uma API nativa que melhora a eficiência e a usabilidade. Observe que você não pode abrir um change stream em uma coleção em um MongoDB Server autônomo porque o recurso depende do Oplog, que é usado apenas em conjuntos de réplicas.
Ao registrar um fluxo de alterações, você precisa especificar a coleção e quais tipos de alterações deseja ouvir. Você pode fazer isso usando o $match e alguns outros estágios do pipeline de agregação que limitam a quantidade de dados que você receberá. Se seu banco de dados impõe autenticação e autorização, os fluxos de alterações fornecem o mesmo controle de acesso das queries normais.

Teste os recursos do Change Stream

A melhor maneira de entender como os fluxos de alterações operam é trabalhando com eles. Na próxima seção, mostraremos como configurar um servidor e scripts. Depois de concluir a configuração, você terá dois scripts: um script Python ouve as notificações do fluxo de alterações e as imprime. O outro script imita um aplicativo executando operações de inserção, atualização, substituição e exclusão para que você possa ver as notificações na saída do primeiro script. Você também aprenderá a limitar as notificações àquelas em que está interessado.

Configurar o PyMongo

Para começar, configure um ambiente virtual usando o Virtualenv. O Virtualenv permite isolar dependências do seu projeto de outros projetos. Crie um diretório para este projeto e copie o seguinte para um arquivo chamado requires.txt no seu novo diretório:
1pymongo==3.8.0
2dnspython
Para criar e ativar seu ambiente virtual, execute os seguintes comandos em seu terminal:
1virtualenv venv # sets up the environment
2source venv/bin/activate # activates the environment
3pip3 install -r requirements.txt # installs our dependencies
Para facilitar a leitura, presumimos que você esteja executando o Python 3 com os comandos python3 e pip3 . Se você estiver executando o Python 2.7, substitua python e pip por esses comandos.

Configure seu cluster

Analisaremos duas opções para configurar um conjunto de réplicas de teste do MongoDB para nos conectarmos. Se você tiver o MongoDB 3.6 ou posterior instalado e se sentir confortável em fazer alterações em sua configuração local, escolha essa opção, siga o guia no apêndice e pule para a próxima seção.
Se você não tiver o MongoDB instalado, preferir não misturar com sua configuração local ou se você acabou de começar a usar o MongoDB, recomendamos que você configure um MongoDB Atlas cluster; há uma camada grátis que oferece um conjunto de réplica de três nós, ideal para experimentar e aprender. Basta seguir estas etapas até obter a string de conexão do URI na etapa 8. Pegue essa string de conexão URI, insira a senha onde está escrito <password>e adicione-a ao seu ambiente executando
1export CHANGE_STREAM_DB="mongodb+srv://user:<password>@example-xkfzv.mongodb.net/test?retryWrites=true"
no seu terminal. A string que você usa como valor será diferente.

Ouça inserções de um aplicativo

Antes de continuar, teste rapidamente sua configuração. Crie um arquivo test.py com o seguinte conteúdo:
1import os
2import pymongo
3
4client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB'])
5print(client.changestream.collection.insert_one({"hello": "world"}).inserted_id)
Ao executar python3 test.py você deverá ver um ObjectId sendo impresso.
Agora que você confirmou sua configuração, vamos criar o pequeno programa que escutará as alterações no banco de dados usando um change stream. Crie um arquivo change_streams.py diferente com o seguinte conteúdo:
1import os
2import pymongo
3from bson.json_util import dumps
4
5client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB'])
6change_stream = client.changestream.collection.watch()
7for change in change_stream:
8 print(dumps(change))
9 print('') # for readability only
Execute python3 change_streams.py, você notará que o programa não imprime nada e apenas espera que as operações aconteçam na coleção especificada. Enquanto mantém o programa change_streams em execução, abra outra janela do terminal e execute python3 test.py. Você terá que executar o mesmo comando de exportação executado na seção Configurar seu cluster para adicionar a variável de ambiente à nova janela do terminal.
Ao verificar a janela do terminal que está executando o programa change_streams, você verá que a operação de inserção foi registrada. Deve ser como a saída abaixo, mas com um ObjectId diferente e com um valor diferente para $binary.
1➜ python3 change_streams.py
2{"_id": {"_data": {"$binary": "glsIjGUAAAABRmRfaWQAZFsIjGXiJuWPOIv2PgBaEAQIaEd7r8VFkazelcuRgfgeBA==", "$type": "00"}}, "operationType": "insert", "fullDocument": {"_id": {"$oid": "5b088c65e226e58f388bf63e"}, "hello": "world"}, "ns": {"db": "changestream", "coll": "collection"}, "documentKey": {"_id": {"$oid": "5b088c65e226e58f388bf63e"}}}

Ouça diferentes tipos de eventos

Você pode ouvir quatro tipos de eventos baseados em documentos:
  • inserir
  • Atualização
  • Substituir
  • Excluir
Dependendo do tipo de evento, a estrutura do documento que você receberá será ligeiramente diferente, mas você sempre receberá o seguinte:
1{
2 _id: <resume_token>,
3 operationType: "<type>",
4 ns: {db: "<db name>", coll: "<collection name>"},
5 documentKey: { <unique identifier> }
6}
No caso de inserções e substituições, o fullDocument também é fornecido por padrão. No caso de operações de atualização, o campo extra fornecido é updateDescription e dá o delta do documento (ou seja, a diferença entre o documento antes e depois da operação). Por padrão, as operações de atualização incluem apenas o delta entre o documento antes e depois da operação. Para obter o documento completo a cada atualização, você pode passar "updateLookup" para a opção de documento completo. Se uma operação de atualização alterar vários documentos, haverá uma notificação para cada documento atualizado. Essa transformação ocorre para garantir que as declarações no oplog sejam idempotentes.
Outro tipo de evento que pode ser recebido é o evento de invalidação. Isso informa ao driver que o change stream não é mais válido. Em seguida, o driver fechará o stream. Possíveis razões para isso incluem a coleção ter sido descartada ou renomeada.
Para ver isso em ação, atualize seu test.py e execute-o ao mesmo tempo em que executa o programa change_stream:
1import os
2import pymongo
3
4client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB'])
5client.changestream.collection.insert_one({"_id": 1, "hello": "world"})
6client.changestream.collection.update_one({"_id": 1}, {"$set": {"hello": "mars"}})
7client.changestream.collection.replace_one({"_id": 1} , {"bye": "world"})
8client.changestream.collection.delete_one({"_id": 1})
9client.changestream.collection.drop()
A saída deve ser semelhante a:
1➜ python3 change_streams.py
2{"fullDocument": {"_id": 1, "hello": "world"}, "documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAABRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "insert"}
3
4{"documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAACRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "updateDescription": {"removedFields": [], "updatedFields": {"hello": "mars"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "update"}
5
6{"fullDocument": {"bye": "world", "_id": 1}, "documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAADRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "replace"}
7
8{"documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAAERh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "delete"}
9
10{"_id": {"_data": {"$binary": "glsIjuEAAAAFFFoQBAhoR3uvxUWRrN6Vy5GB+B4E", "$type": "00"}}, "operationType": "invalidate"}

Ouça notificações específicas

Até agora, seu programa tem escutado todas as operações. Em um aplicativo real, isso seria excessivo e, muitas vezes, desnecessário, pois cada parte do aplicativo geralmente deseja ouvir apenas operações específicas. Para limitar a quantidade de operações, você pode usar determinados estágios de agregação ao configurar o stream. Esses estágios são: $match, $project, $addfields, $replaceRoot e $redact. Todos os outros estágios de agregação não estão disponíveis.
Você pode testar essa funcionalidade alterando seu arquivo change_stream.py com o código abaixo e executando o script test.py. A saída agora deve conter apenas notificações de inserção.
1import os
2import pymongo
3from bson.json_util import dumps
4
5client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB'])
6change_stream = client.changestream.collection.watch([{
7 '$match': {
8 'operationType': { '$in': ['insert'] }
9 }
10}])
11
12for change in change_stream:
13 print(dumps(change))
14 print('')
Você também pode combinar os campos do documento e, assim, limitar o fluxo a determinados DocumentIds ou a documentos que tenham um determinado campo de documento etc.

Retome seus fluxos de alterações

Não importa se sua rede é ótima, haverá situações com falhas de conexão. Para garantir que nenhuma alteração seja perdida nesses casos, é necessário adicionar algum código para armazenar e manipular resumeTokens. Cada evento contém um resumeToken, por exemplo:
1"_id": {"_data": {"$binary": "glsIj84AAAACRh5faWQAKwIAWhAEvyfcy4djS8CUKRZ8tvWuOgQ=", "$type": "00"}}
Quando ocorre uma falha, o driver deve fazer automaticamente uma tentativa de reconexão. O aplicativo precisa lidar com outras tentativas, conforme necessário. Isso significa que o aplicativo deve se encarregar de sempre persistir o resumeToken.
Para tentar se conectar novamente, o resumeToken deve ser passado para o campo opcional resumeAfter ao criar o novo fluxo de alterações. Isso não garante que sempre possamos retomar o fluxo de alterações. O oplog do MongoDB é uma coleção limitada que mantém um registro contínuo das operações mais recentes. A retomada de um fluxo de alterações só é possível se o oplog ainda não tiver sido rolado (ou seja, se as alterações nas quais estamos interessados ainda estiverem no oplog).

Caveats

  • Change Streams na produção: se você planeja usar change streams na produção, leia as recomendações do MongoDB.
  • Ordenação e rollbacks: o MongoDB garante que os eventos recebidos estarão na ordem em que ocorreram (fornecendo assim uma ordenação total das alterações nos fragmentos se você usar fragmentos). Além disso, apenas mudanças duráveis, ou seja, a maioria das mudanças confirmadas, serão enviadas aos ouvintes. Isso significa que os ouvintes não precisam considerar rollbacks em seus aplicativos.
  • Leitura de secundários: change streams podem ser abertos em qualquer nó portador de dados em um cluster, independentemente de ser primário ou secundário. No entanto, geralmente não é recomendável ler a partir de secundários, pois failovers podem levar a um aumento da carga e falhas nessa configuração.
  • Atualizações com a opção fullDocument: a opção fullDocument para operações de atualização não garante que o documento retornado não inclua mais alterações. Em contraste com os deltas de documentos que têm a garantia de serem enviados em ordem com notificações de atualização, não há garantia de que o fullDocument retornado represente o documento como era exatamente após a operação. updateLookup pesquisa a versão atual do documento. Se as alterações ocorrerem rapidamente, é possível que o documento tenha sido alterado antes da conclusão do updateLookup. Isso significa que o fullDocument pode não representar o documento no momento do evento, dando a impressão de que os eventos ocorreram em uma ordem diferente.
  • Impacto no desempenho: até 1.000 fluxos de alteração simultâneos para cada nó são aceitos com impacto insignificante no desempenho geral. No entanto, em clusters fragmentados, a garantia de ordenação total pode fazer com que os tempos de resposta do fluxo de alteração sejam mais lentos.
  • WiredTiger: change streams são um recurso do MongoDB 3.6 e posterior. Não está disponível para versões mais antigas, armazenamento MMAPv1 ou replicações pré-pv1.

Saiba mais

Para saber mais sobre isso, consulte a documentação sobre fluxo de alterações.
Se você estiver interessado em mais dicas do MongoDB, siga-nos no Twitter @mongodb.

Apêndice

Como configurar um cluster na nuvem

Se você ainda não configurou seu cluster gratuito no MongoDB Atlas, agora é um ótimo momento para fazer isso. Você tem todas as instruções nesta publicação no blog.

Como configurar um cluster local

Antes de configurar as instâncias, confirme que você está executando a versão 3.6 ou posterior do MongoDB Server (mongod) e do MongoDB shell (mongo). Você pode fazer isso executando mongod --version e mongo --version. Se algum deles não atender aos nossos requisitos,atualize para uma versão mais recente antes de continuar.
A seguir, você configurará um conjunto de réplicas de nó único chamado test-change-streams. Para um conjunto de réplicas de produção, são recomendados pelo menos três nós.
  1. Execute os seguintes comandos no seu terminal para criar um diretório para os arquivos do banco de dados e iniciar o processo mongod na porta 27017:
    1mkdir -p /data/test-change-streams
    2mongod --replSet test-change-streams --logpath "mongodb.log" --dbpath /data/test-change-streams --port 27017 --fork
  2. Agora abra um mongo shell na porta 27017:
    1mongo --port 27017
  3. No mongo shell que você acabou de abrir, configure seu conjunto de réplicas:
    1config = {
    2 _id: "test-change-streams",
    3 members: [{ _id : 0, host : "localhost:27017"}]
    4};
    5rs.initiate(config);
  4. Ainda dentro do mongo shell, agora você pode verificar se seu conjunto de réplicas está funcionando executando: rs.status();. A saída deve indicar que seu nó se tornou primário. Pode levar alguns segundos para mostrar isso, portanto, se você não estiver vendo isso imediatamente, execute o comando novamente após alguns segundos.
  5. EXECUTAR
    1export CHANGE_STREAM_DB=mongodb://localhost:27017
    em seu shell e continue.

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Confissões de um PyMongoArrowholic: usando Atlas Vector Search e PyMongoArrow para pesquisar semanticamente itens de moda de luxo


Aug 09, 2024 | 9 min read
Tutorial

RAG com Atlas Vector Search, LangChain e OpenAI


Sep 18, 2024 | 10 min read
Início rápido

Início rápido 2: pesquisa vetorial com MongoDB e OpenAI


May 06, 2024 | 12 min read
Tutorial

Sorva, Swig e Pesquise Com Playwright, OpenAI e MongoDB Atlas Search


Oct 01, 2024 | 12 min read
Sumário