Migrar uma coleção existente para uma coleção de série temporal
Siga este tutorial para saber como converter uma collection MongoDB existente em uma time-series collection usando o Kafka Connector do MongoDB.
A coleção de séries temporais armazena dados de time-series de forma eficiente. Os dados de time-series consistem em medições feitas em intervalos de tempo, metadados que descrevem a medição e o tempo da medição.
Para converter dados de uma collection do MongoDB para uma collection de séries temporais usando o connector, você deve executar as seguintes tarefas:
Identifique o campo de tempo comum a todos os documento na collection.
Configure um connector de origem para copiar os dados de collection existentes para um tópico do Kafka.
Configure um conector de pia para copiar os dados do tópico Kafka para a coleção de séries temporais.
Neste tutorial, você executa as tarefas anteriores para migrar dados de estoque de uma collection para uma time series collection. A time series collection armazena e indexa os dados de forma mais eficiente e mantém a capacidade de analisar o desempenho das ações ao longo do tempo usando operadores de agregação.
Migrar uma coleção para uma coleção de série temporal
Concluir a configuração do tutorial
Conclua as etapas daConfiguração do tutorial doKafka Connector para iniciar o Kafka Connect da Confluent e o ambiente do MongoDB .
Gerar dados de amostra
Execute o seguinte comando para iniciar um script no seu Docker que gera uma collection de amostras contendo símbolos de ações fabricados e seus preços em seu tutorial MongoDB replica set:
docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"
Depois que o gerador de dados começar a ser executado, você deverá ver os dados gerados que se assemelham aos seguintes:
... 1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15 2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15 3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15 ...
Configurar o conector de origem
Em uma janela de terminal separada, crie uma sessão de shell interativa no tutorial Docker container baixado para a configuração do tutorial usando o seguinte comando:
docker exec -it mongo1 /bin/bash
Crie um arquivo de configuração de origem denominado stock-source.json
com o seguinte comando:
nano stock-source.json
Cole as seguintes informações de configuração no arquivo e salve suas alterações:
{ "name": "mongo-source-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "publish.full.document.only": "true", "connection.uri": "mongodb://mongo1", "topic.prefix": "marketdata", "database": "Stocks", "collection": "PriceData", "copy.existing": "true" } }
Essa configuração instrui o connector a copiar os dados existentes da collection PriceData
MongoDB para o tópico marketdata.Stocks.PriceData
do Kafka e, quando concluído, quaisquer dados futuros inseridos nessa collection.
Execute o seguinte comando na shell para iniciar o conector de origem utilizando o arquivo de configuração que você criou:
cx stock-source.json
Observação
O comando cx
é um roteiro personalizado incluído no ambiente de desenvolvimento do tutorial. Este roteiro executa a seguinte solicitação equivalente à API REST do Kafka Connect para criar um novo conector:
curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"
Execute o seguinte comando na shell para verificar o status dos conectores:
status
Se o conector de origem foi iniciado com sucesso, você deverá ver a seguinte saída:
Kafka topics: ... The status of the connectors: source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-source-marketdata" ] ...
Depois que o connector for iniciado, confirme se o Kafka recebeu os dados da collection executando o seguinte comando:
kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData
A saída deve mostrar dados de tópico conforme publicados pelo connector de origem, semelhante ao seguinte:
{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}
Você pode sair de kafkacat
digitando CTRL+C
.
Configurar o conector do coletor
Configure um connector para ler dados do tópico Kafka e registrá-los em uma time-series collection denominada StockDataMigrate
em um reconhecimento de data center denominado Stocks
.
Crie um arquivo de configuração do coletor chamado stock-sink.json
com o seguinte comando:
nano stock-sink.json
Cole as seguintes informações de configuração no arquivo e salve suas alterações:
{ "name": "mongo-sink-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "marketdata.Stocks.PriceData", "connection.uri": "mongodb://mongo1", "database": "Stocks", "collection": "StockDataMigrate", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "timeseries.timefield": "tx_time", "timeseries.timefield.auto.convert": "true", "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'" } }
Dica
A configuração do conector do coletor acima usa o conversor de formato de data do campo de tempo. Como alternativa, você pode usar a Transformação de mensagem única (SMT) TimestampConverter
para converter o campo tx_time
de String
para ISODate
. Ao usar o TimestampConverter
SMT, você deve definir um esquema para os dados no tópico Kafka.
Para obter informações sobre como usar o TimestampConverter
SMT, consulte o TimestampConverter Documentação do Confluent.
Execute o seguinte comando no shell para iniciar o connector do coletor usando o arquivo de configuração que você atualizou:
cx stock-sink.json
Depois que o connector do coletor terminar de processar os dados do tópico, o documento na collection de time-series StockDataMigrate
conterão o campo tx_time
com um valor do tipo ISODate
.
Verifique os dados da coleção de séries temporais
Depois que o connector do coletor concluir o processamento dos dados do tópico, a time-series collection StockDataMigrate
deverá conter todos os dados de mercado da sua collection PriceData
.
Para visualizar os dados no MongoDB, execute o seguinte comando para conectar ao seu conjunto de réplica utilizando o mongosh
:
mongosh "mongodb://mongo1"
No prompt, digite os seguintes comandos para recuperar todos os documentos no namespace MongoDB do Stocks.StockDataMigrate
:
use Stocks db.StockDataMigrate.find()
Você deve ver uma lista de documentos retornados do comando que se assemelham ao seguinte documento:
{ tx_time: ISODate("2022-05-25T21:16:35.983Z"), _id: ObjectId("628e9..."), symbol: 'FAV', price: 18.43, company_name: 'FUZZY ATTACK VENTURES' }
Resumo
Neste tutorial, você criou um gerador de dados de código de ações que gravava dados periodicamente em uma collection do MongoDB. Você configurou um connector para copiar os dados em um Kafka e configurou um connector para gravar esses dados em uma nova coleção de séries temporais MongoDB.
Saiba mais
Leia os seguintes recursos para saber mais sobre os conceitos mencionados neste tutorial: