Menu Docs
Página inicial do Docs
/
MongoDB Kafka Connector
/

Migrar uma coleção existente para uma coleção de série temporal

Nesta página

  • Migrar uma coleção para uma coleção de série temporal
  • Resumo
  • Saiba mais

Siga este tutorial para saber como converter uma collection MongoDB existente em uma time-series collection usando o Kafka Connector do MongoDB.

A coleção de séries temporais armazena dados de time-series de forma eficiente. Os dados de time-series consistem em medições feitas em intervalos de tempo, metadados que descrevem a medição e o tempo da medição.

Para converter dados de uma coleção MongoDB para uma coleção de séries temporais utilizando o conector, você precisa executar as seguintes tarefas:

  1. Identifique o campo de tempo comum a todos os documento na collection.

  2. Configure um connector de origem para copiar os dados de collection existentes para um tópico do Kafka.

  3. Configure um conector de pia para copiar os dados do tópico Kafka para a coleção de séries temporais.

Neste tutorial, você executa as tarefas anteriores para migrar dados de estoque de uma collection para uma time series collection. A time series collection armazena e indexa os dados de forma mais eficiente e mantém a capacidade de analisar o desempenho das ações ao longo do tempo usando operadores de agregação.

1
2

Execute o seguinte comando para iniciar um script no seu Docker que gera uma collection de amostras contendo símbolos de ações fabricados e seus preços em seu tutorial MongoDB replica set:

docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"

Depois que o gerador de dados começar a ser executado, você deverá ver os dados gerados que se assemelham aos seguintes:

...
1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15
2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15
3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15
...
3

Em uma janela de terminal separada, crie uma sessão de shell interativa no tutorial Docker container baixado para a configuração do tutorial usando o seguinte comando:

docker exec -it mongo1 /bin/bash

Crie um arquivo de configuração de origem denominado stock-source.json com o seguinte comando:

nano stock-source.json

Cole as seguintes informações de configuração no arquivo e salve suas alterações:

{
"name": "mongo-source-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"publish.full.document.only": "true",
"connection.uri": "mongodb://mongo1",
"topic.prefix": "marketdata",
"database": "Stocks",
"collection": "PriceData",
"copy.existing": "true"
}
}

Essa configuração instrui o connector a copiar os dados existentes da collection PriceData MongoDB para o tópico marketdata.Stocks.PriceData do Kafka e, quando concluído, quaisquer dados futuros inseridos nessa collection.

Execute o seguinte comando na shell para iniciar o conector de origem utilizando o arquivo de configuração que você criou:

cx stock-source.json

Observação

O comando cx é um roteiro personalizado incluído no ambiente de desenvolvimento do tutorial. Este roteiro executa a seguinte solicitação equivalente à API REST do Kafka Connect para criar um novo conector:

curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"

Execute o seguinte comando na shell para verificar o status dos conectores:

status

Se o conector de origem foi iniciado com sucesso, você deverá ver a seguinte saída:

Kafka topics:
...
The status of the connectors:
source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-source-marketdata"
]
...

Depois que o connector for iniciado, confirme se o Kafka recebeu os dados da collection executando o seguinte comando:

kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData

A saída deve mostrar dados de tópico conforme publicados pelo connector de origem, semelhante ao seguinte:

{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}

Você pode sair de kafkacat digitando CTRL+C.

4

Configure um connector para ler dados do tópico Kafka e registrá-los em uma time-series collection denominada StockDataMigrate em um reconhecimento de data center denominado Stocks.

Crie um arquivo de configuração do coletor chamado stock-sink.json com o seguinte comando:

nano stock-sink.json

Cole as seguintes informações de configuração no arquivo e salve suas alterações:

{
"name": "mongo-sink-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "marketdata.Stocks.PriceData",
"connection.uri": "mongodb://mongo1",
"database": "Stocks",
"collection": "StockDataMigrate",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"timeseries.timefield": "tx_time",
"timeseries.timefield.auto.convert": "true",
"timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}

Dica

A configuração do conector do coletor acima usa o conversor de formato de data do campo de tempo. Como alternativa, você pode usar a Transformação de mensagem única (SMT) TimestampConverter para converter o campo tx_time de String para ISODate . Ao usar o TimestampConverter SMT, você deve definir um esquema para os dados no tópico Kafka.

Para obter informações sobre como usar o TimestampConverter SMT, consulte o TimestampConverter Documentação do Confluent.

Execute o seguinte comando no shell para iniciar o connector do coletor usando o arquivo de configuração que você atualizou:

cx stock-sink.json

Depois que o connector do coletor terminar de processar os dados do tópico, o documento na collection de time-series StockDataMigrate conterão o campo tx_time com um valor do tipo ISODate .

5

Depois que o connector do coletor concluir o processamento dos dados do tópico, a time-series collection StockDataMigrate deverá conter todos os dados de mercado da sua collection PriceData .

Para visualizar os dados no MongoDB, execute o seguinte comando para conectar ao seu conjunto de réplica utilizando o mongosh:

mongosh "mongodb://mongo1"

No prompt, digite os seguintes comandos para recuperar todos os documentos no namespace MongoDB do Stocks.StockDataMigrate:

use Stocks
db.StockDataMigrate.find()

Você deve ver uma lista de documentos retornados do comando que se assemelham ao seguinte documento:

{
tx_time: ISODate("2022-05-25T21:16:35.983Z"),
_id: ObjectId("628e9..."),
symbol: 'FAV',
price: 18.43,
company_name: 'FUZZY ATTACK VENTURES'
}

Neste tutorial, você criou um gerador de dados de código de ações que gravava dados periodicamente em uma collection do MongoDB. Você configurou um connector para copiar os dados em um Kafka e configurou um connector para gravar esses dados em uma nova coleção de séries temporais MongoDB.

Leia os seguintes recursos para saber mais sobre os conceitos mencionados neste tutorial:

Voltar

Replicar dados com um manipulador de captura de dados de alteração