Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Junte-se a nós no Amazon Web Services re:Invent 2024! Saiba como usar o MongoDB para casos de uso de AI .
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
MongoDBchevron-right

Assine o MongoDB Change Streams via WebSockets

Aaron Bassett3 min read • Published Jan 28, 2022 • Updated Sep 23, 2022
MongoDBFluxos de alteraçõesPython
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Oschange stream permitem que os aplicativos acessem alterações de dados em tempo real sem a complexidade e o risco de afetar o oplog. Os aplicativos podem usar change stream para se inscrever em todas as alterações de dados em uma única collection, um banco de dados ou um sistema inteiro e React imediatamente a elas. Como os change stream usam o framework de aggregation, os aplicativos também podem filtrar por alterações específicas ou transformar as notificações à vontade.

Pré-requisitos

O código de exemplo neste artigo presumirá que você tenha um cluster em execução no MongoDB Atlas, mas funcionará com qualquer versão do MongoDB do 3.6 em diante.
Você também precisará do Python 3.6+ com Motor e Tornado instalados.
1pip install motor
2pip install tornado
3pip install dnspython
4pip install logzero

Criando um servidor WebSocket

Para permitir que os clientes se inscrevam em seu change stream via WebSockets, você deve primeiro criar um servidor WebSocket. Esse servidor WebSocket, escrito em Python e usando o Tornado, faz proxy de quaisquer novos dados do change stream para seus clientes conectados.
1class ChangesHandler(tornado.websocket.WebSocketHandler):
2
3 connected_clients = set()
4
5 def open(self):
6 ChangesHandler.connected_clients.add(self)
7
8 def on_close(self):
9 ChangesHandler.connected_clients.remove(self)
10
11 @classmethod
12 def send_updates(cls, message):
13 for connected_client in cls.connected_clients:
14 connected_client.write_message(message)
15
16 @classmethod
17 def on_change(cls, change):
18 message = f"{change['operationType']}: {change['fullDocument']['name']}"
19 ChangesHandler.send_updates(message)

Abrir e fechar conexões do WebSocket

À medida que os clientes se conectam e desconectam do servidor WebSocket, eles trigger os open on_close métodos e .
1connected_clients = set()
2
3def open(self):
4 ChangesHandler.connected_clients.add(self)
5
6def on_close(self):
7 ChangesHandler.connected_clients.remove(self)
Quando um cliente se conecta, seu servidor armazena uma referência a ele no conjuntoconnected_clients. Isso permite que ele envie novos dados para o cliente quando eles forem recebidos do change stream. Da mesma forma, quando um cliente se desconecta do servidor, ele é removido do conjunto de clientes conectados, de modo que o servidor não tenta enviar atualizações em uma conexão que não existe mais.
Vale a pena observar que o servidor não tem um manipulador on_message. Como os WebSockets são bidirecionais, normalmente um servidor WebSocket tem um métodoon_message. Quando o cliente envia dados para o servidor, ele invoca esse método para tratar a mensagem recebida. No entanto, como você só está usando a conexão WebSocket para enviar dados de change stream para os clientes conectados, sua conexão WebSocket é essencialmente monodirecional e seu servidor não tem um método para tratar os dados de entrada.

Enviando mensagens para clientes conectados

1@classmethod
2def send_updates(cls, message):
3 for connected_client in cls.connected_clients:
4 connected_client.write_message(message)
5
6@classmethod
7def on_change(cls, change):
8 message = f"{change['operationType']}: {change['fullDocument']['name']}"
9 ChangesHandler.send_updates(message)
Quando você tem novos dados do change stream, você os passa para o servidor WebSocket usando o métodoon_change. Esse método formata os dados do change stream em uma string pronta para ser enviada aos clientes conectados. Esse push ocorre no métodosend_updates. Nesse método, você faz um loop de todos os clientes no conjuntoconnected_clients e usa a açãowrite_message para escrever os dados na conexão WebSocket do cliente.

Monitorando um change stream para alterações

No Motor, ascollections do MongoDB têm um métodowatch() que você pode usar para monitorar a collection em busca de quaisquer alterações. Então, sempre que houver uma nova alteração no fluxo, você poderá usar o métodoon_changedo servidor WebSocket para proxy dos dados para os clientes do WebSocket.
1change_stream = None
2
3async def watch(collection):
4 global change_stream
5
6 async with collection.watch(full_document='updateLookup') as change_stream:
7 async for change in change_stream:
8 ChangesHandler.on_change(change)
Essa funçãowatch é anexada ao loop do Tornado como um retorno de chamada.
1def main():
2 client = MotorClient(os.environ["MONGO_SRV"])
3 collection = client["sample_airbnb"]["listingsAndReviews"]
4
5 loop = tornado.ioloop.IOLoop.current()
6 loop.add_callback(watch, collection)
7
8 try:
9 loop.start()
10 except KeyboardInterrupt:
11 pass
12 finally:
13 if change_stream is not None:
14 change_stream.close()

Assinando alterações no navegador via WebSockets

Para este exemplo, seu cliente WebSocket é uma página da Web simples e registra todas as mensagens do WebSocket no console JavaScript do navegador.
1<html>
2 <head>
3 <script>
4 const websocket = new WebSocket('ws://127.0.0.1:8000/socket')
5 websocket.onmessage = function(evt) {
6 console.log(evt.data)
7 }
8 </script>
9 </head>
10 <body></body>
11</html>
Você também pode usar o Tornado para veicular esta página da web.
1class WebpageHandler(tornado.web.RequestHandler):
2 def get(self):
3 self.render("templates/index.html")

Juntando tudo

Screencast mostrando a alteração sendo enviada em tempo real por meio de um WebSocket
Para tentar o exemplo por si mesmo:
  • clone o código de exemplo do Github.
  • instalar os requisitos.
  • Defina as variáveis de ambiente necessárias.
  • Execute o script do Python.
1git clone git@github.com:aaronbassett/mongodb-changestreams-tornado-example.git
2cd mongodb-changestreams-tornado-example
3pip install -r requirements.txt
4export MONGO_SRV=
5python server.py
Depois que o servidor WebSocket estiver em execução em seu terminal, abra uma janela do navegador para localhost:8000 e visualize seu console JavaScript. Em seguida, faça algumas alterações em sua Coleção por meio doCompass ou do MongoDB Shell.

Resumo

Neste artigo, você se inscreveu para todas as alterações em uma única collection. No entanto, você pode usar change streams para assinar todas as alterações de dados em uma única collection, um banco de dados ou até mesmo um sistema inteiro. E, como os change streams usam a framework de aggregation, os aplicativos também podem filtrar por alterações específicas ou transformar as notificações.
Para obter mais informações, consulte a documentação do MotorChangeStream.

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Início rápido

Introdução a pipelines de agregação em Rust


Oct 01, 2024 | 15 min read
Tutorial

Otimizando o desempenho de $lookup usando o poder da indexação


Aug 30, 2024 | 7 min read
Tutorial

Introdução ao MongoDB Kotlin Driver


Sep 09, 2024 | 9 min read
Artigo

ORMs, ODMs e bibliotecas do MongoDB


Aug 28, 2024 | 3 min read
Sumário