Fluxos de alterações do MongoDB com Python
Naomi Pentrel9 min read • Published Feb 05, 2022 • Updated Sep 23, 2022
Avalie esse Início rápido
Change streams permite que você ouça as alterações que ocorrem no seu MongoDB database. No MongoDB 3.6 ou superior, esta funcionalidade permite criar aplicativos que podem responder imediatamente a alterações de dados em tempo real. Neste tutorial, mostraremos como usar change streams com o Python. Você irá:
- Saber mais sobre change streams
- Crie um programa que ouça inserções
- Alterar o programa para ouvir outros tipos de eventos
- Altere o programa para ouvir notificações específicas
Para acompanhar o processo, você pode criar um ambiente de teste usando as etapas abaixo. Isso é opcional, mas altamente recomendado, pois permitirá que você teste o uso da funcionalidade de change stream com os exemplos fornecidos. Você receberá todos os comandos, mas é necessária alguma familiaridade com o MongoDB.
A capacidade de ouvir alterações específicas nos dados permite que um aplicativo seja muito mais rápido na resposta às mudanças. Se um usuário do seu sistema atualizar as informações, o sistema poderá ouvir e propagar essas alterações imediatamente. Por exemplo, isso pode significar que os usuários não precisam mais clicar em atualizar para ver quando as alterações foram aplicadas. Ou, se as alterações de um usuário em um sistema precisarem da aprovação de alguém, outro sistema poderá ouvir as alterações e enviar notificações solicitando aprovações instantaneamente.
Antes dos change streams, os aplicativos que precisavam saber sobre a adição de novos dados em tempo real tinham que pesquisar dados continuamente ou confiar em outros mecanismos de atualização. Uma técnica comum, embora complexa, para monitorar as mudanças era seguir o Registro de Operações (Oplog) do MongoDB. O Oplog faz parte do sistema de replicação do MongoDB e, como tal, já acompanha modificações no banco de dados, mas não é fácil de usar para a lógica de negócios. Change streams são criados sobre o Oplog, mas fornecem uma API nativa que melhora a eficiência e a usabilidade. Observe que você não pode abrir um change stream em uma coleção em um MongoDB Server autônomo porque o recurso depende do Oplog, que é usado apenas em conjuntos de réplicas.
Ao registrar um fluxo de alterações, você precisa especificar a coleção e quais tipos de alterações deseja ouvir. Você pode fazer isso usando o
$match
e alguns outros estágios do pipeline de agregação que limitam a quantidade de dados que você receberá. Se seu banco de dados impõe autenticação e autorização, os fluxos de alterações fornecem o mesmo controle de acesso das queries normais.A melhor maneira de entender como os fluxos de alterações operam é trabalhando com eles. Na próxima seção, mostraremos como configurar um servidor e scripts. Depois de concluir a configuração, você terá dois scripts: um script Python ouve as notificações do fluxo de alterações e as imprime. O outro script imita um aplicativo executando operações de inserção, atualização, substituição e exclusão para que você possa ver as notificações na saída do primeiro script. Você também aprenderá a limitar as notificações àquelas em que está interessado.
Para começar, configure um ambiente virtual usando o Virtualenv. O Virtualenv permite isolar dependências do seu projeto de outros projetos. Crie um diretório para este projeto e copie o seguinte para um arquivo chamado requires.txt no seu novo diretório:
1 pymongo==3.8.0 2 dnspython
Para criar e ativar seu ambiente virtual, execute os seguintes comandos em seu terminal:
1 virtualenv venv # sets up the environment 2 source venv/bin/activate # activates the environment 3 pip3 install -r requirements.txt # installs our dependencies
Para facilitar a leitura, presumimos que você esteja executando o Python 3 com os comandos python3 e pip3 . Se você estiver executando o Python 2.7, substitua python e pip por esses comandos.
Analisaremos duas opções para configurar um conjunto de réplicas de teste do MongoDB para nos conectarmos. Se você tiver o MongoDB 3.6 ou posterior instalado e se sentir confortável em fazer alterações em sua configuração local, escolha essa opção, siga o guia no apêndice e pule para a próxima seção.
Se você não tiver o MongoDB instalado, preferir não misturar com sua configuração local ou se você acabou de começar a usar o MongoDB, recomendamos que você configure um MongoDB Atlas cluster; há uma camada grátis que oferece um conjunto de réplica de três nós, ideal para experimentar e aprender. Basta seguir estas etapas até obter a string de conexão do URI na etapa 8. Pegue essa string de conexão URI, insira a senha onde está escrito
<password>
e adicione-a ao seu ambiente executando1 export CHANGE_STREAM_DB="mongodb+srv://user:<password>@example-xkfzv.mongodb.net/test?retryWrites=true"
no seu terminal. A string que você usa como valor será diferente.
Antes de continuar, teste rapidamente sua configuração. Crie um arquivo
test.py
com o seguinte conteúdo:1 import os 2 import pymongo 3 4 client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB']) 5 print(client.changestream.collection.insert_one({"hello": "world"}).inserted_id)
Ao executar
python3 test.py
você deverá ver um ObjectId
sendo impresso.Agora que você confirmou sua configuração, vamos criar o pequeno programa que escutará as alterações no banco de dados usando um change stream. Crie um arquivo
change_streams.py
diferente com o seguinte conteúdo:1 import os 2 import pymongo 3 from bson.json_util import dumps 4 5 client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB']) 6 change_stream = client.changestream.collection.watch() 7 for change in change_stream: 8 print(dumps(change)) 9 print('') # for readability only
Execute
python3 change_streams.py
, você notará que o programa não imprime nada e apenas espera que as operações aconteçam na coleção especificada. Enquanto mantém o programa change_streams
em execução, abra outra janela do terminal e execute python3 test.py
. Você terá que executar o mesmo comando de exportação executado na seção Configurar seu cluster para adicionar a variável de ambiente à nova janela do terminal.Ao verificar a janela do terminal que está executando o programa
change_streams
, você verá que a operação de inserção foi registrada. Deve ser como a saída abaixo, mas com um ObjectId
diferente e com um valor diferente para $binary
.1 ➜ python3 change_streams.py 2 {"_id": {"_data": {"$binary": "glsIjGUAAAABRmRfaWQAZFsIjGXiJuWPOIv2PgBaEAQIaEd7r8VFkazelcuRgfgeBA==", "$type": "00"}}, "operationType": "insert", "fullDocument": {"_id": {"$oid": "5b088c65e226e58f388bf63e"}, "hello": "world"}, "ns": {"db": "changestream", "coll": "collection"}, "documentKey": {"_id": {"$oid": "5b088c65e226e58f388bf63e"}}}
Você pode ouvir quatro tipos de eventos baseados em documentos:
- inserir
- Atualização
- Substituir
- Excluir
Dependendo do tipo de evento, a estrutura do documento que você receberá será ligeiramente diferente, mas você sempre receberá o seguinte:
1 { 2 _id: <resume_token>, 3 operationType: "<type>", 4 ns: {db: "<db name>", coll: "<collection name>"}, 5 documentKey: { <unique identifier> } 6 }
No caso de inserções e substituições, o
fullDocument
também é fornecido por padrão. No caso de operações de atualização, o campo extra fornecido é updateDescription
e dá o delta do documento (ou seja, a diferença entre o documento antes e depois da operação). Por padrão, as operações de atualização incluem apenas o delta entre o documento antes e depois da operação. Para obter o documento completo a cada atualização, você pode passar "updateLookup" para a opção de documento completo. Se uma operação de atualização alterar vários documentos, haverá uma notificação para cada documento atualizado. Essa transformação ocorre para garantir que as declarações no oplog sejam idempotentes.Outro tipo de evento que pode ser recebido é o evento de invalidação. Isso informa ao driver que o change stream não é mais válido. Em seguida, o driver fechará o stream. Possíveis razões para isso incluem a coleção ter sido descartada ou renomeada.
Para ver isso em ação, atualize seu
test.py
e execute-o ao mesmo tempo em que executa o programa change_stream
:1 import os 2 import pymongo 3 4 client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB']) 5 client.changestream.collection.insert_one({"_id": 1, "hello": "world"}) 6 client.changestream.collection.update_one({"_id": 1}, {"$set": {"hello": "mars"}}) 7 client.changestream.collection.replace_one({"_id": 1} , {"bye": "world"}) 8 client.changestream.collection.delete_one({"_id": 1}) 9 client.changestream.collection.drop()
A saída deve ser semelhante a:
1 ➜ python3 change_streams.py 2 {"fullDocument": {"_id": 1, "hello": "world"}, "documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAABRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "insert"} 3 4 {"documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAACRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "updateDescription": {"removedFields": [], "updatedFields": {"hello": "mars"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "update"} 5 6 {"fullDocument": {"bye": "world", "_id": 1}, "documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAADRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "replace"} 7 8 {"documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAAERh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "delete"} 9 10 {"_id": {"_data": {"$binary": "glsIjuEAAAAFFFoQBAhoR3uvxUWRrN6Vy5GB+B4E", "$type": "00"}}, "operationType": "invalidate"}
Até agora, seu programa tem escutado todas as operações. Em um aplicativo real, isso seria excessivo e, muitas vezes, desnecessário, pois cada parte do aplicativo geralmente deseja ouvir apenas operações específicas. Para limitar a quantidade de operações, você pode usar determinados estágios de agregação ao configurar o stream. Esses estágios são:
$match
, $project
, $addfields
, $replaceRoot
e $redact
. Todos os outros estágios de agregação não estão disponíveis.Você pode testar essa funcionalidade alterando seu arquivo
change_stream.py
com o código abaixo e executando o script test.py
. A saída agora deve conter apenas notificações de inserção.1 import os 2 import pymongo 3 from bson.json_util import dumps 4 5 client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB']) 6 change_stream = client.changestream.collection.watch([{ 7 '$match': { 8 'operationType': { '$in': ['insert'] } 9 } 10 }]) 11 12 for change in change_stream: 13 print(dumps(change)) 14 print('')
Você também pode combinar os campos do documento e, assim, limitar o fluxo a determinados
DocumentIds
ou a documentos que tenham um determinado campo de documento etc.Não importa se sua rede é ótima, haverá situações com falhas de conexão. Para garantir que nenhuma alteração seja perdida nesses casos, é necessário adicionar algum código para armazenar e manipular
resumeToken
s. Cada evento contém um resumeToken
, por exemplo:1 "_id": {"_data": {"$binary": "glsIj84AAAACRh5faWQAKwIAWhAEvyfcy4djS8CUKRZ8tvWuOgQ=", "$type": "00"}}
Quando ocorre uma falha, o driver deve fazer automaticamente uma tentativa de reconexão. O aplicativo precisa lidar com outras tentativas, conforme necessário. Isso significa que o aplicativo deve se encarregar de sempre persistir o
resumeToken
.Para tentar se conectar novamente, o
resumeToken
deve ser passado para o campo opcional resumeAfter ao criar o novo fluxo de alterações. Isso não garante que sempre possamos retomar o fluxo de alterações. O oplog do MongoDB é uma coleção limitada que mantém um registro contínuo das operações mais recentes. A retomada de um fluxo de alterações só é possível se o oplog ainda não tiver sido rolado (ou seja, se as alterações nas quais estamos interessados ainda estiverem no oplog).- Change Streams na produção: se você planeja usar change streams na produção, leia as recomendações do MongoDB.
- Ordenação e rollbacks: o MongoDB garante que os eventos recebidos estarão na ordem em que ocorreram (fornecendo assim uma ordenação total das alterações nos fragmentos se você usar fragmentos). Além disso, apenas mudanças duráveis, ou seja, a maioria das mudanças confirmadas, serão enviadas aos ouvintes. Isso significa que os ouvintes não precisam considerar rollbacks em seus aplicativos.
- Leitura de secundários: change streams podem ser abertos em qualquer nó portador de dados em um cluster, independentemente de ser primário ou secundário. No entanto, geralmente não é recomendável ler a partir de secundários, pois failovers podem levar a um aumento da carga e falhas nessa configuração.
- Atualizações com a opção fullDocument: a opção fullDocument para operações de atualização não garante que o documento retornado não inclua mais alterações. Em contraste com os deltas de documentos que têm a garantia de serem enviados em ordem com notificações de atualização, não há garantia de que o fullDocument retornado represente o documento como era exatamente após a operação.
updateLookup
pesquisa a versão atual do documento. Se as alterações ocorrerem rapidamente, é possível que o documento tenha sido alterado antes da conclusão doupdateLookup
. Isso significa que o fullDocument pode não representar o documento no momento do evento, dando a impressão de que os eventos ocorreram em uma ordem diferente. - Impacto no desempenho: até 1.000 fluxos de alteração simultâneos para cada nó são aceitos com impacto insignificante no desempenho geral. No entanto, em clusters fragmentados, a garantia de ordenação total pode fazer com que os tempos de resposta do fluxo de alteração sejam mais lentos.
- WiredTiger: change streams são um recurso do MongoDB 3.6 e posterior. Não está disponível para versões mais antigas, armazenamento MMAPv1 ou replicações pré-pv1.
Se você ainda não configurou seu cluster gratuito no MongoDB Atlas, agora é um ótimo momento para fazer isso. Você tem todas as instruções nesta publicação no blog.
Antes de configurar as instâncias, confirme que você está executando a versão 3.6 ou posterior do MongoDB Server (mongod) e do MongoDB shell (mongo). Você pode fazer isso executando
mongod --version
e mongo --version
. Se algum deles não atender aos nossos requisitos,atualize para uma versão mais recente antes de continuar.A seguir, você configurará um conjunto de réplicas de nó único chamado
test-change-streams
. Para um conjunto de réplicas de produção, são recomendados pelo menos três nós.- Execute os seguintes comandos no seu terminal para criar um diretório para os arquivos do banco de dados e iniciar o processo mongod na porta
27017
:1 mkdir -p /data/test-change-streams 2 mongod --replSet test-change-streams --logpath "mongodb.log" --dbpath /data/test-change-streams --port 27017 --fork - Agora abra um mongo shell na porta
27017
:1 mongo --port 27017 - No mongo shell que você acabou de abrir, configure seu conjunto de réplicas:
1 config = { 2 _id: "test-change-streams", 3 members: [{ _id : 0, host : "localhost:27017"}] 4 }; 5 rs.initiate(config); - Ainda dentro do mongo shell, agora você pode verificar se seu conjunto de réplicas está funcionando executando:
rs.status();
. A saída deve indicar que seu nó se tornou primário. Pode levar alguns segundos para mostrar isso, portanto, se você não estiver vendo isso imediatamente, execute o comando novamente após alguns segundos.