Menu Docs

Explorar Change Streams do MongoDB

Siga este tutorial para saber como criar um change stream em uma collection do MongoDB e observar os eventos de alteração que ele cria.

1

Conclua as etapas daConfiguração do tutorial doKafka Connector para iniciar o Kafka Connect da Confluent e o ambiente do MongoDB .

2

Crie duas sessões de shell interativas no tutorial Docker Container, cada uma em uma janela separada.

Execute o seguinte comando a partir de um terminal para iniciar uma shell interativa.

docker exec -it mongo1 /bin/bash

Iremos referir-se a este shell interactivo como ChangeStreamShell1 durante este tutorial.

Execute o seguinte comando em um segundo terminal para iniciar uma shell interativa:

docker exec -it mongo1 /bin/bash

Iremos referir-se a este shell interactivo como ChangeStreamShell2 durante este tutorial.

3

Em ChangeStreamShell1, crie um script Python para abrir um fluxo de alteração usando o driver PyMongo.

nano openchangestream.py

Cole o código abaixo no arquivo e salve as alterações:

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
with db.orders.watch() as stream:
print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Execute o roteiro do Python:

python3 openchangestream.py

O script gera a seguinte mensagem após ser iniciado com sucesso:

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
4

Em ChangeStreamShell2, conecte-se ao MongoDB usando o mongosh, o shell do MongoDB, usando o seguinte comando:

mongosh "mongodb://mongo1"

Depois de se conectar com sucesso, você deverá ver o seguinte prompt do shell do MongoDB:

rs0 [direct: primary] test>

No prompt, digite os seguintes comandos:

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

Depois de inserir os comandos anteriores, alterne para ChangeStreamShell1 para visualizar a saída do fluxo de alterações, que deve ser semelhante ao seguinte:

{
"_id": {
"_data": "826264..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650754657,
"i": 1
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "<_id value of document>"
},
"test": 1
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "<_id value of document>"
}
}
}

Para interromper o script, pressione Ctrl+C .

Ao final desta etapa, você acionará e observará com sucesso um evento de fluxo de alterações.

5

Você pode aplicar um filtro a um change stream passando para ele um agregação pipeline.

Em ChangeStreamShell1, crie um novo script Python para abrir um fluxo de alteração filtrado usando o driver PyMongo.

nano pipeline.py

Cole o código abaixo no arquivo e salve as alterações:

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
with db.sensors.watch(pipeline=pipeline) as stream:
print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Execute o roteiro do Python:

python3 pipeline.py

O script gera a seguinte mensagem após ser iniciado com sucesso:

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
6

Retorne à sua sessão do ChangeStreamShell2 , que deve estar conectada ao MongoDB usando mongosh.

No prompt, digite os seguintes comandos:

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

Conforme indicado pela saída do script, o fluxo de alterações cria um evento de alteração porque corresponde ao seguinte pipeline:

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

Tente inserir os seguintes documentos no ChangeStreamShell2 para verificar se o fluxo de alterações só produz eventos quando os documentos corresponderem ao filtro:

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
7

Depois de concluir este tutorial, libere recursos em seu computador interrompendo ou removendo os ativos do Docker. Você pode remover os containers e as imagens do Docker, ou somente os containers. Se você remover os containers e as imagens, será necessário baixá-los novamente para reiniciar seu ambiente de desenvolvimento do Conector Kafka MongoDB, que tem aproximadamente 2,4 GB. Se você remover somente os containers, poderá reutilizar as imagens sem precisar baixar a maioria dos arquivos grandes na pipeline de dados de amostra.

Dica

Mais tutoriais

Se você pretende concluir mais tutoriais do Conector Kafka MongoDB, considere remover apenas os containers. Se você não pretende concluir mais tutoriais do Conector Kafka MongoDB, considere remover containers e imagens.

Selecione a aba que corresponde à tarefa de remoção que deseja executar.

Execute o seguinte comando shell para remover os containers e as imagens do Docker para o ambiente de desenvolvimento:

docker-compose -p mongo-kafka down --rmi all

Execute o seguinte comando shell para remover os containers do Docker, mas mantenha as imagens para o ambiente de desenvolvimento:

docker-compose -p mongo-kafka down

Para reiniciar os containers, siga as mesmas etapas necessárias para iniciá-los na configuração do tutorial.

Neste tutorial, você criou um change stream no MongoDB eobservou a saída. O de MongoDB Kafka origem do connector lê os eventos de alteração de um change stream que você configura e os grava em um Kafka tópico do .

Para saber como configurar um change stream e o Kafka tópico para um de connector origem, consulte o tutorial de Introdução ao de MongoDB Kafka origem do .connector

Leia os seguintes recursos para saber mais sobre os conceitos mencionados neste tutorial: