스트리밍 모드로 MongoDB에서 읽기
개요
MongoDB 데이터베이스에서 스트림을 읽을 때, MongoDB Spark Connector는 마이크로 배치 처리 와 연속 처리 를 모두 지원합니다. 기본 처리 엔진인 마이크로 배치 처리는 정확히 한 번의 내결함성을 보장하여 100밀리초의 짧은 지연 시간을 달성합니다. 연속 처리는 Spark 버전 2.3에 도입된 실험적 기능으로, 최소 한 번은 보장하면서 엔드 투 엔드 지연 시간을 1밀리초만큼 낮게 달성합니다.
연속 처리 에 학습 보려면 Spark 문서를 참조하세요.
참고
커넥터는 MongoDB 배포의 변경 스트림에서 읽습니다. 변경 스트림에서 변경 이벤트를 생성하려면 데이터베이스에서 업데이트 작업을 수행합니다.
변경 스트림에 대해 자세히 알아보려면 MongoDB 매뉴얼에서 변경 스트림을 참조하세요.
MongoDB 에서 데이터를 읽으려면 SparkSession
객체 에서 readStream()
메서드를 호출합니다. 이 메서드는 스트리밍 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamReader
객체 를 반환합니다.
MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
readStream.format() | 기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB에서 읽습니다. |
readStream.option() | MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 집계 파이프라인 단계 등의 스트림 설정을 지정합니다. 읽기 스트림 구성 옵션 목록은 스트리밍 읽기 구성 옵션 가이드 를 참조하세요. |
readStream.schema() | 입력 스키마를 지정합니다. |
다음 코드 스니펫은 앞의 구성 설정을 사용하여 MongoDB에서 스트리밍된 데이터를 지속적으로 처리하는 방법을 보여줍니다. connector는 기존 데이터에 모든 새 데이터를 추가하고 초당 한 번씩 체크포인트를 /tmp/checkpointDir
에 비동기적으로 씁니다. Trigger.Continuous
매개변수를 trigger()
메서드에 전달하면 연속 처리가 가능해집니다.
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream() .format("mongodb") .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream() .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append"); StreamingQuery query = dataStreamWriter.start();
참고
스트리밍 쿼리에서 start()
메서드를 호출할 때까지 Spark는 스트리밍을 시작하지 않습니다.
MongoDB에서 데이터를 읽으려면 SparkSession
객체에서 readStream
함수를 호출합니다. 이 함수는 스트리밍 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamReader
객체를 반환합니다.
MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
readStream.format() | 기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB에서 읽습니다. |
readStream.option() | MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 집계 파이프라인 단계 등의 스트림 설정을 지정합니다. 읽기 스트림 구성 옵션 목록은 스트리밍 읽기 구성 옵션 가이드를 참조하세요. |
readStream.schema() | 입력 스키마를 지정합니다. |
다음 코드 스니펫은 앞의 구성 설정을 사용하여 MongoDB에서 스트리밍된 데이터를 지속적으로 처리하는 방법을 보여줍니다. connector는 기존 데이터에 모든 새 데이터를 추가하고 초당 한 번씩 체크포인트를 /tmp/checkpointDir
에 비동기적으로 씁니다. continuous
매개변수를 trigger()
메서드에 전달하면 연속 처리가 가능해집니다.
streamingDataFrame = (<local SparkSession>.readStream .format("mongodb") .load() ) dataStreamWriter = (streamingDataFrame.writeStream .trigger(continuous="1 second") .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") ) query = dataStreamWriter.start()
참고
스트리밍 쿼리에서 start()
메서드를 호출할 때까지 Spark는 스트리밍을 시작하지 않습니다.
MongoDB에서 데이터를 읽으려면 SparkSession
객체에서 readStream
메서드를 호출합니다. 이 메서드는 스트리밍 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamReader
객체를 반환합니다.
MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
readStream.format() | 기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB에서 읽습니다. |
readStream.option() | MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 집계 파이프라인 단계 등의 스트림 설정을 지정합니다. 읽기 스트림 구성 옵션 목록은 스트리밍 읽기 구성 옵션 가이드를 참조하세요. |
readStream.schema() | 입력 스키마를 지정합니다. |
다음 코드 스니펫은 앞의 구성 설정을 사용하여 MongoDB에서 스트리밍된 데이터를 지속적으로 처리하는 방법을 보여줍니다. connector는 기존 데이터에 모든 새 데이터를 추가하고 초당 한 번씩 체크포인트를 /tmp/checkpointDir
에 비동기적으로 씁니다. Trigger.Continuous
매개변수를 trigger()
메서드에 전달하면 연속 처리가 가능해집니다.
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream .format("mongodb") .load() val dataStreamWriter = streamingDataFrame.writeStream .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") val query = dataStreamWriter.start()
참고
스트리밍 쿼리에서 start()
메서드를 호출할 때까지 Spark는 스트리밍을 시작하지 않습니다.
예시
다음 예제는 MongoDB에서 콘솔로 데이터를 스트리밍하는 방법을 보여줍니다.
MongoDB에서 읽는
DataStreamReader
객체를 만듭니다.DataStreamReader
으로 생성한 스트리밍Dataset
객체에서writeStream()
메서드를 호출하여DataStreamWriter
객체를 생성합니다.format()
메서드를 사용하여console
형식을 지정합니다.DataStreamWriter
인스턴스에서start()
메서드를 호출하여 스트림을 시작합니다.
새 데이터가 MongoDB에 삽입되면 MongoDB는 지정한 outputMode
에 따라 해당 데이터를 콘솔로 스트리밍합니다.
중요
대용량 데이터 세트를 콘솔로 스트리밍하지 않도록 합니다. 콘솔로의 스트리밍은 메모리 집약적이며 테스트 목적으로만 사용됩니다.
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType() .add("company_symbol", DataTypes.StringType) .add("company_name", DataTypes.StringType) .add("price", DataTypes.DoubleType) .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("mongodb") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .schema(readSchema) .load() // manipulate your streaming data .writeStream() .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
MongoDB에서 읽는
DataStreamReader
객체를 만듭니다.DataStreamReader
로 생성한 스트리밍DataFrame
에서writeStream()
메서드를 호출하여DataStreamWriter
객체를 생성합니다.format()
메서드를 사용하여console
형식을 지정합니다.DataStreamWriter
인스턴스에서start()
메서드를 호출하여 스트림을 시작합니다.
새 데이터가 MongoDB에 삽입되면 MongoDB는 지정한 outputMode
에 따라 해당 데이터를 콘솔로 스트리밍합니다.
중요
대용량 데이터 세트를 콘솔로 스트리밍하지 않도록 합니다. 콘솔로의 스트리밍은 메모리 집약적이며 테스트 목적으로만 사용됩니다.
# create a local SparkSession spark = SparkSession.builder \ .appName("readExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define the schema of the source collection readSchema = (StructType() .add('company_symbol', StringType()) .add('company_name', StringType()) .add('price', DoubleType()) .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option('spark.mongodb.database', <database-name>) .option('spark.mongodb.collection', <collection-name>) .schema(readSchema) .load() # manipulate your streaming data .writeStream .format("console") .trigger(continuous="1 second") .outputMode("append") ) # run the query query = dataStreamWriter.start()
MongoDB에서 읽는
DataStreamReader
객체를 만듭니다.DataStreamReader
를 사용하여 생성한 스트리밍DataFrame
객체에서writeStream()
메서드를 호출하여DataStreamWriter
객체를 생성합니다.format()
메서드를 사용하여console
형식을 지정합니다.DataStreamWriter
인스턴스에서start()
메서드를 호출하여 스트림을 시작합니다.
새 데이터가 MongoDB에 삽입되면 MongoDB는 지정한 outputMode
에 따라 해당 데이터를 콘솔로 스트리밍합니다.
중요
대용량 데이터 세트를 콘솔로 스트리밍하지 않도록 합니다. 콘솔로의 스트리밍은 메모리 집약적이며 테스트 목적으로만 사용됩니다.
// create a local SparkSession val spark = SparkSession.builder .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define the schema of the source collection val readSchema = StructType() .add("company_symbol", StringType()) .add("company_name", StringType()) .add("price", DoubleType()) .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .schema(readSchema) .load() // manipulate your streaming data .writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append") // run the query val query = dataStreamWriter.start()
중요
변경 스트림의 스키마 추론하기
change.stream.publish.full.document.only
옵션을 true
로 설정하면 Spark Connector는 스캔한 문서의 스키마를 사용하여 DataFrame
의 스키마를 추론합니다. 옵션을 false
으로 설정하는 경우 스키마를 지정해야 합니다.
이 설정에 대한 자세한 내용과 change stream 구성 옵션의 전체 목록을 보려면 읽기 구성 옵션 가이드를 참조하세요.
API 문서
이 예시에 사용된 유형에 대해 자세히 알아보려면 다음 Apache Spark API 문서를 참조하세요.