Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

기존 컬렉션을 시계열 컬렉션으로 마이그레이션Migrate an existing collection to a time series collection

이 페이지의 내용

  • collection을 time series 컬렉션으로 마이그레이션하기
  • 요약
  • 자세히 알아보기

이 튜토리얼을 따라 MongoDB Kafka Connector를 사용하여 기존 MongoDB collection을 time series 컬렉션 으로 변환하는 방법을 알아보세요.

time series 컬렉션은 time-series 데이터를 효율적으로 저장합니다. time-series 데이터는 시간 간격으로 수행한 측정값, 측정값을 설명하는 메타데이터 및 측정 시간으로 구성됩니다.

를 사용하여 컬렉션에서 Time Series 컬렉션으로 데이터를 변환하려면 MongoDB connector 다음 작업을 수행해야 합니다.

  1. 컬렉션의 모든 문서에 공통된 시간 필드를 식별합니다.

  2. 기존 collection 데이터를 Kafka 주제에 복사하도록 source connector를 구성합니다.

  3. connector를 구성하여 Kafka를 time series 컬렉션에 복사합니다.

이 튜토리얼에서는 이러한 선행 작업을 수행하여 collection에서 time series 컬렉션으로 스톡 데이터를 마이그레이션합니다. time series 컬렉션은 데이터를 보다 효율적으로 저장하고 인덱싱하며 aggregation 연산자를 사용하여 시간 경과에 따른 주식 성과를 분석하는 기능을 유지합니다.

1

Kafka Connector 튜토리얼 설정 의 단계를 완료하여 Confluent Kafka Connect 및 MongoDB 환경을 시작합니다.

2

다음 명령을 실행하여 Docker 환경에서 튜토리얼 MongoDB 복제본 세트에 조작된 주식 기호와 해당 가격이 포함된 샘플 collection을 생성하는 스크립트를 시작합니다.

docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"

데이터 생성기가 실행되기 시작하면 다음과 유사한 데이터가 생성된 것을 볼 수 있습니다.

...
1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15
2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15
3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15
...
3

별도의 터미널 창에서 다음 명령을 사용하여 튜토리얼 설정을 위해 다운로드한 튜토리얼 Docker container에 대해 대화형 shell 세션을 생성합니다.

docker exec -it mongo1 /bin/bash

다음 명령을 사용하여 stock-source.json(이)라는 소스 구성 파일을 만듭니다.

nano stock-source.json

다음 구성 정보를 파일에 붙여넣고 변경 사항을 저장합니다.

{
"name": "mongo-source-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"publish.full.document.only": "true",
"connection.uri": "mongodb://mongo1",
"topic.prefix": "marketdata",
"database": "Stocks",
"collection": "PriceData",
"copy.existing": "true"
}
}

이 구성은 connector가 PriceData MongoDB collection의 기존 데이터를 marketdata.Stocks.PriceData Kafka topic으로 복사하고, 완료되면 해당 collection에 삽입되는 모든 향후 데이터를 복사하도록 지시합니다.

셸에서 다음 명령을 실행하여 만든 구성 파일을 사용하여 싱크 커넥터를 시작합니다.

cx stock-source.json

참고

cx 명령은 튜토리얼 개발 환경에 포함된 맞춤 스크립트입니다. 이 스크립트는 Kafka Connect REST API에 대해 다음과 같은 동일한 요청을 실행하여 새 커넥터를 생성합니다.

curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"

커넥터의 상태를 확인하려면 셸에서 다음 명령을 실행합니다.

status

싱크 커넥터가 성공적으로 시작되면 다음 출력이 표시됩니다.

Kafka topics:
...
The status of the connectors:
source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-source-marketdata"
]
...

소스 connector가 시작되면 다음 명령을 실행하여 Kafka topic이 collection 데이터를 수신했는지 확인합니다.

kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData

출력에는 다음과 유사한 source connector에 의해 게시된 주제 데이터가 표시되어야 합니다.

{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}

CTRL+C 를 입력하여 kafkacat 을 종료할 수 있습니다.

4

Kafka 주제에서 데이터를 읽고 이를 Stocks 데이터베이스의 StockDataMigrate time series 컬렉션에 쓰도록 connector를 구성합니다.

다음 명령을 사용하여 stock-sink.json (이)라는 싱크 구성 파일을 만듭니다.

nano stock-sink.json

다음 구성 정보를 파일에 붙여넣고 변경 사항을 저장합니다.

{
"name": "mongo-sink-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "marketdata.Stocks.PriceData",
"connection.uri": "mongodb://mongo1",
"database": "Stocks",
"collection": "StockDataMigrate",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"timeseries.timefield": "tx_time",
"timeseries.timefield.auto.convert": "true",
"timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}

위의 싱크 connector 구성에서는 시간 필드 날짜 형식 변환기를 사용합니다. 또는 TimestampConverter 단일 메시지 변환(SMT)을 사용하여 tx_time 필드를 String 에서 ISODate 로 변환할 수 있습니다. TimestampConverter SMT를 사용하는 경우 Kafka 주제의 데이터에 대한 스키마를 정의해야 합니다.

SMT를 사용하는 방법에 대한 자세한 내용은 TimestampConverter TimestampConverter 를 참조하세요. Confluent 문서.

shell에서 다음 명령을 실행하여 업데이트한 구성 파일을 사용하여 connector를 시작합니다.

cx stock-sink.json

connector가 주제 데이터 처리를 완료하면 StockDataMigrate time series 컬렉션의 문서에 tx_time ISODate 유형 값을 가진 필드가 포함됩니다.

5

connector가 주제 데이터 처리를 완료하면 StockDataMigrate time series 컬렉션에 PriceData collection의 모든 시장 데이터가 포함되어야 합니다.

MongoDB에서 데이터를 보려면 다음 명령을 실행하여 mongosh 을(를) 사용하여 복제본 세트에 연결합니다.

mongosh "mongodb://mongo1"

프롬프트에서 다음 명령을 입력하여 Stocks.StockDataMigrate MongoDB 네임스페이스의 모든 문서를 조회합니다.

use Stocks
db.StockDataMigrate.find()

다음 문서와 유사한 명령에서 반환된 문서 목록이 표시됩니다.

{
tx_time: ISODate("2022-05-25T21:16:35.983Z"),
_id: ObjectId("628e9..."),
symbol: 'FAV',
price: 18.43,
company_name: 'FUZZY ATTACK VENTURES'
}

이 튜토리얼에서는 MongoDB collection에 주기적으로 데이터를 쓰는 주식 시세 데이터 생성기를 만들었습니다. 데이터를 Kafka 주제로 복사하도록 소스 커넥터를 구성하고 해당 데이터를 새 MongoDB time series 컬렉션에 쓰도록 싱크 커넥터를 구성했습니다.

이 튜토리얼에서 언급된 개념에 대해 자세히 알아보려면 다음 리소스를 참조하세요.

돌아가기

변경 데이터 캡처 핸들러로 데이터 복제하기