Explorar Change Streams do MongoDB
Nesta página
Siga este tutorial para saber como criar um change stream em uma collection do MongoDB e observar os eventos de alteração que ele cria.
Explorar Change Streams
Concluir a configuração do tutorial
Conclua as etapas daConfiguração do tutorial doKafka Connector para iniciar o Kafka Connect da Confluent e o ambiente do MongoDB .
Conecte-se ao Docker Container
Crie duas sessões de shell interativas no tutorial Docker Container, cada uma em uma janela separada.
Execute o seguinte comando a partir de um terminal para iniciar uma shell interativa.
docker exec -it mongo1 /bin/bash
Iremos referir-se a este shell interactivo como ChangeStreamShell1 durante este tutorial.
Execute o seguinte comando em um segundo terminal para iniciar uma shell interativa:
docker exec -it mongo1 /bin/bash
Iremos referir-se a este shell interactivo como ChangeStreamShell2 durante este tutorial.
Abrir um fluxo de alterações
Em ChangeStreamShell1, crie um script Python para abrir um fluxo de alteração usando o driver PyMongo.
nano openchangestream.py
Cole o código abaixo no arquivo e salve as alterações:
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
Execute o roteiro do Python:
python3 openchangestream.py
O script gera a seguinte mensagem após ser iniciado com sucesso:
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
trigger um evento de alteração
Em ChangeStreamShell2, conecte-se ao MongoDB usando o mongosh
, o shell do MongoDB, usando o seguinte comando:
mongosh "mongodb://mongo1"
Depois de se conectar com sucesso, você deverá ver o seguinte prompt do shell do MongoDB:
rs0 [direct: primary] test>
No prompt, digite os seguintes comandos:
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
Depois de inserir os comandos anteriores, alterne para ChangeStreamShell1 para visualizar a saída do fluxo de alterações, que deve ser semelhante ao seguinte:
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
Para interromper o script, pressione Ctrl+C
.
Ao final desta etapa, você acionará e observará com sucesso um evento de fluxo de alterações.
Abrir um fluxo de alterações filtrado
Você pode aplicar um filtro a um change stream passando para ele um agregação pipeline.
Em ChangeStreamShell1, crie um novo script Python para abrir um fluxo de alteração filtrado usando o driver PyMongo.
nano pipeline.py
Cole o código abaixo no arquivo e salve as alterações:
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
Execute o roteiro do Python:
python3 pipeline.py
O script gera a seguinte mensagem após ser iniciado com sucesso:
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
Observe o change stream filtrado
Retorne à sua sessão do ChangeStreamShell2 , que deve estar conectada ao MongoDB usando mongosh
.
No prompt, digite os seguintes comandos:
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
Conforme indicado pela saída do script, o fluxo de alterações cria um evento de alteração porque corresponde ao seguinte pipeline:
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
Tente inserir os seguintes documentos no ChangeStreamShell2 para verificar se o fluxo de alterações só produz eventos quando os documentos corresponderem ao filtro:
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(Opcional) Interromper os containers do Docker
Depois de concluir este tutorial, libere recursos em seu computador interrompendo ou removendo os ativos do Docker. Você pode remover os containers e as imagens do Docker, ou somente os containers. Se você remover os containers e as imagens, será necessário baixá-los novamente para reiniciar seu ambiente de desenvolvimento do Conector Kafka MongoDB, que tem aproximadamente 2,4 GB. Se você remover somente os containers, poderá reutilizar as imagens sem precisar baixar a maioria dos arquivos grandes na pipeline de dados de amostra.
Dica
Mais tutoriais
Se você pretende concluir mais tutoriais do Conector Kafka MongoDB, considere remover apenas os containers. Se você não pretende concluir mais tutoriais do Conector Kafka MongoDB, considere remover containers e imagens.
Selecione a aba que corresponde à tarefa de remoção que deseja executar.
Execute o seguinte comando shell para remover os containers e as imagens do Docker para o ambiente de desenvolvimento:
docker-compose -p mongo-kafka down --rmi all
Execute o seguinte comando shell para remover os containers do Docker, mas mantenha as imagens para o ambiente de desenvolvimento:
docker-compose -p mongo-kafka down
Para reiniciar os containers, siga as mesmas etapas necessárias para iniciá-los na configuração do tutorial.
Resumo
Neste tutorial, você criou um change stream no MongoDB eobservou a saída. O de MongoDB Kafka origem do connector lê os eventos de alteração de um change stream que você configura e os grava em um Kafka tópico do .
Para saber como configurar um change stream e o Kafka tópico para um de connector origem, consulte o tutorial de Introdução ao de MongoDB Kafka origem do .connector
Saiba mais
Leia os seguintes recursos para saber mais sobre os conceitos mencionados neste tutorial: