Assine o MongoDB Change Streams via WebSockets
Aaron Bassett3 min read • Published Jan 28, 2022 • Updated Sep 23, 2022
Avalie esse Tutorial
Oschange stream permitem que os aplicativos acessem alterações de dados em tempo real sem a complexidade e o risco de afetar o oplog. Os aplicativos podem usar change stream para se inscrever em todas as alterações de dados em uma única collection, um banco de dados ou um sistema inteiro e React imediatamente a elas. Como os change stream usam o framework de aggregation, os aplicativos também podem filtrar por alterações específicas ou transformar as notificações à vontade.
O código de exemplo neste artigo presumirá que você tenha um cluster em execução no MongoDB Atlas, mas funcionará com qualquer versão do MongoDB do 3.6 em diante.
1 pip install motor 2 pip install tornado 3 pip install dnspython 4 pip install logzero
Para permitir que os clientes se inscrevam em seu change stream via WebSockets, você deve primeiro criar um servidor WebSocket. Esse servidor WebSocket, escrito em Python e usando o Tornado, faz proxy de quaisquer novos dados do change stream para seus clientes conectados.
1 class ChangesHandler(tornado.websocket.WebSocketHandler): 2 3 connected_clients = set() 4 5 def open(self): 6 ChangesHandler.connected_clients.add(self) 7 8 def on_close(self): 9 ChangesHandler.connected_clients.remove(self) 10 11 12 def send_updates(cls, message): 13 for connected_client in cls.connected_clients: 14 connected_client.write_message(message) 15 16 17 def on_change(cls, change): 18 message = f"{change['operationType']}: {change['fullDocument']['name']}" 19 ChangesHandler.send_updates(message)
À medida que os clientes se conectam e desconectam do servidor WebSocket, eles trigger os
open
on_close
métodos e .1 connected_clients = set() 2 3 def open(self): 4 ChangesHandler.connected_clients.add(self) 5 6 def on_close(self): 7 ChangesHandler.connected_clients.remove(self)
Quando um cliente se conecta, seu servidor armazena uma referência a ele no conjunto
connected_clients
. Isso permite que ele envie novos dados para o cliente quando eles forem recebidos do change stream. Da mesma forma, quando um cliente se desconecta do servidor, ele é removido do conjunto de clientes conectados, de modo que o servidor não tenta enviar atualizações em uma conexão que não existe mais.Vale a pena observar que o servidor não tem um manipulador
on_message
. Como os WebSockets são bidirecionais, normalmente um servidor WebSocket tem um métodoon_message
. Quando o cliente envia dados para o servidor, ele invoca esse método para tratar a mensagem recebida. No entanto, como você só está usando a conexão WebSocket para enviar dados de change stream para os clientes conectados, sua conexão WebSocket é essencialmente monodirecional e seu servidor não tem um método para tratar os dados de entrada.1 2 def send_updates(cls, message): 3 for connected_client in cls.connected_clients: 4 connected_client.write_message(message) 5 6 7 def on_change(cls, change): 8 message = f"{change['operationType']}: {change['fullDocument']['name']}" 9 ChangesHandler.send_updates(message)
Quando você tem novos dados do change stream, você os passa para o servidor WebSocket usando o método
on_change
. Esse método formata os dados do change stream em uma string pronta para ser enviada aos clientes conectados. Esse push ocorre no métodosend_updates
. Nesse método, você faz um loop de todos os clientes no conjuntoconnected_clients
e usa a açãowrite_message
para escrever os dados na conexão WebSocket do cliente.No Motor, ascollections do MongoDB têm um método
watch()
que você pode usar para monitorar a collection em busca de quaisquer alterações. Então, sempre que houver uma nova alteração no fluxo, você poderá usar o métodoon_change
do servidor WebSocket para proxy dos dados para os clientes do WebSocket.1 change_stream = None 2 3 async def watch(collection): 4 global change_stream 5 6 async with collection.watch(full_document='updateLookup') as change_stream: 7 async for change in change_stream: 8 ChangesHandler.on_change(change)
Essa função
watch
é anexada ao loop do Tornado como um retorno de chamada.1 def main(): 2 client = MotorClient(os.environ["MONGO_SRV"]) 3 collection = client["sample_airbnb"]["listingsAndReviews"] 4 5 loop = tornado.ioloop.IOLoop.current() 6 loop.add_callback(watch, collection) 7 8 try: 9 loop.start() 10 except KeyboardInterrupt: 11 pass 12 finally: 13 if change_stream is not None: 14 change_stream.close()
Para este exemplo, seu cliente WebSocket é uma página da Web simples e registra todas as mensagens do WebSocket no console JavaScript do navegador.
1 <html> 2 <head> 3 <script> 4 const websocket = new WebSocket('ws://127.0.0.1:8000/socket') 5 websocket.onmessage = function(evt) { 6 console.log(evt.data) 7 } 8 </script> 9 </head> 10 <body></body> 11 </html>
Você também pode usar o Tornado para veicular esta página da web.
1 class WebpageHandler(tornado.web.RequestHandler): 2 def get(self): 3 self.render("templates/index.html")
Para tentar o exemplo por si mesmo:
- instalar os requisitos.
- Defina as variáveis de ambiente necessárias.
- Execute o script do Python.
1 git clone git@github.com:aaronbassett/mongodb-changestreams-tornado-example.git 2 cd mongodb-changestreams-tornado-example 3 pip install -r requirements.txt 4 export MONGO_SRV= 5 python server.py
Depois que o servidor WebSocket estiver em execução em seu terminal, abra uma janela do navegador para localhost:8000 e visualize seu console JavaScript. Em seguida, faça algumas alterações em sua Coleção por meio doCompass ou do MongoDB Shell.
Neste artigo, você se inscreveu para todas as alterações em uma única collection. No entanto, você pode usar change streams para assinar todas as alterações de dados em uma única collection, um banco de dados ou até mesmo um sistema inteiro. E, como os change streams usam a framework de aggregation, os aplicativos também podem filtrar por alterações específicas ou transformar as notificações.