Docs Menu
Docs Home
/
Spark 커넥터
/

스트리밍 모드로 MongoDB에서 읽기

이 페이지의 내용

  • 개요
  • 예시
  • API 문서

MongoDB 데이터베이스에서 스트림을 읽을 때, MongoDB Spark Connector는 마이크로 배치 처리연속 처리 를 모두 지원합니다. 기본 처리 엔진인 마이크로 배치 처리는 정확히 한 번의 내결함성을 보장하여 100밀리초의 짧은 지연 시간을 달성합니다. 연속 처리는 Spark 버전 2.3에 도입된 실험적 기능으로, 최소 한 번은 보장하면서 엔드 투 엔드 지연 시간을 1밀리초만큼 낮게 달성합니다.

연속 처리 에 학습 보려면 Spark 문서를 참조하세요.

참고

커넥터는 MongoDB 배포의 변경 스트림에서 읽습니다. 변경 스트림에서 변경 이벤트를 생성하려면 데이터베이스에서 업데이트 작업을 수행합니다.

변경 스트림에 대해 자세히 알아보려면 MongoDB 매뉴얼에서 변경 스트림을 참조하세요.

MongoDB 에서 데이터를 읽으려면 SparkSession 객체 에서 readStream() 메서드를 호출합니다. 이 메서드는 스트리밍 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamReader 객체 를 반환합니다.

MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.

설정
설명
readStream.format()
기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb을(를) 사용하여 MongoDB에서 읽습니다.
readStream.option()

MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 집계 파이프라인 단계 등의 스트림 설정을 지정합니다.

읽기 스트림 구성 옵션 목록은 스트리밍 읽기 구성 옵션 가이드 를 참조하세요.

readStream.schema()
입력 스키마를 지정합니다.

다음 코드 스니펫은 앞의 구성 설정을 사용하여 MongoDB에서 스트리밍된 데이터를 지속적으로 처리하는 방법을 보여줍니다. connector는 기존 데이터에 모든 새 데이터를 추가하고 초당 한 번씩 체크포인트를 /tmp/checkpointDir 에 비동기적으로 씁니다. Trigger.Continuous 매개변수를 trigger() 메서드에 전달하면 연속 처리가 가능해집니다.

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

참고

스트리밍 쿼리에서 start() 메서드를 호출할 때까지 Spark는 스트리밍을 시작하지 않습니다.

전체 메서드 목록은 Java 구조화된 스트리밍 참조를 확인하세요.

MongoDB에서 데이터를 읽으려면 SparkSession 객체에서 readStream 함수를 호출합니다. 이 함수는 스트리밍 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamReader 객체를 반환합니다.

MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.

설정
설명
readStream.format()
기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb을(를) 사용하여 MongoDB에서 읽습니다.
readStream.option()

MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 집계 파이프라인 단계 등의 스트림 설정을 지정합니다.

읽기 스트림 구성 옵션 목록은 스트리밍 읽기 구성 옵션 가이드를 참조하세요.

readStream.schema()
입력 스키마를 지정합니다.

다음 코드 스니펫은 앞의 구성 설정을 사용하여 MongoDB에서 스트리밍된 데이터를 지속적으로 처리하는 방법을 보여줍니다. connector는 기존 데이터에 모든 새 데이터를 추가하고 초당 한 번씩 체크포인트를 /tmp/checkpointDir 에 비동기적으로 씁니다. continuous 매개변수를 trigger() 메서드에 전달하면 연속 처리가 가능해집니다.

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

참고

스트리밍 쿼리에서 start() 메서드를 호출할 때까지 Spark는 스트리밍을 시작하지 않습니다.

전체 메서드 목록은 pyspark 구조화된 스트리밍 참조를 확인하세요.

MongoDB에서 데이터를 읽으려면 SparkSession 객체에서 readStream 메서드를 호출합니다. 이 메서드는 스트리밍 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamReader 객체를 반환합니다.

MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.

설정
설명
readStream.format()
기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb을(를) 사용하여 MongoDB에서 읽습니다.
readStream.option()

MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 집계 파이프라인 단계 등의 스트림 설정을 지정합니다.

읽기 스트림 구성 옵션 목록은 스트리밍 읽기 구성 옵션 가이드를 참조하세요.

readStream.schema()
입력 스키마를 지정합니다.

다음 코드 스니펫은 앞의 구성 설정을 사용하여 MongoDB에서 스트리밍된 데이터를 지속적으로 처리하는 방법을 보여줍니다. connector는 기존 데이터에 모든 새 데이터를 추가하고 초당 한 번씩 체크포인트를 /tmp/checkpointDir 에 비동기적으로 씁니다. Trigger.Continuous 매개변수를 trigger() 메서드에 전달하면 연속 처리가 가능해집니다.

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

참고

스트리밍 쿼리에서 start() 메서드를 호출할 때까지 Spark는 스트리밍을 시작하지 않습니다.

전체 Scala 메서드 목록은 구조화된 스트리밍 참조를 확인하세요.

다음 예제는 MongoDB에서 콘솔로 데이터를 스트리밍하는 방법을 보여줍니다.

  1. MongoDB에서 읽는 DataStreamReader 객체를 만듭니다.

  2. DataStreamReader 으로 생성한 스트리밍 Dataset 객체에서 writeStream() 메서드를 호출하여 DataStreamWriter 객체를 생성합니다. format() 메서드를 사용하여 console 형식을 지정합니다.

  3. DataStreamWriter 인스턴스에서 start() 메서드를 호출하여 스트림을 시작합니다.

새 데이터가 MongoDB에 삽입되면 MongoDB는 지정한 outputMode 에 따라 해당 데이터를 콘솔로 스트리밍합니다.

중요

대용량 데이터 세트를 콘솔로 스트리밍하지 않도록 합니다. 콘솔로의 스트리밍은 메모리 집약적이며 테스트 목적으로만 사용됩니다.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. MongoDB에서 읽는 DataStreamReader 객체를 만듭니다.

  2. DataStreamReader 로 생성한 스트리밍 DataFrame 에서 writeStream() 메서드를 호출하여 DataStreamWriter 객체를 생성합니다. format() 메서드를 사용하여 console 형식을 지정합니다.

  3. DataStreamWriter 인스턴스에서 start() 메서드를 호출하여 스트림을 시작합니다.

새 데이터가 MongoDB에 삽입되면 MongoDB는 지정한 outputMode 에 따라 해당 데이터를 콘솔로 스트리밍합니다.

중요

대용량 데이터 세트를 콘솔로 스트리밍하지 않도록 합니다. 콘솔로의 스트리밍은 메모리 집약적이며 테스트 목적으로만 사용됩니다.

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. MongoDB에서 읽는 DataStreamReader 객체를 만듭니다.

  2. DataStreamReader 를 사용하여 생성한 스트리밍 DataFrame 객체에서 writeStream() 메서드를 호출하여 DataStreamWriter 객체를 생성합니다. format() 메서드를 사용하여 console 형식을 지정합니다.

  3. DataStreamWriter 인스턴스에서 start() 메서드를 호출하여 스트림을 시작합니다.

새 데이터가 MongoDB에 삽입되면 MongoDB는 지정한 outputMode 에 따라 해당 데이터를 콘솔로 스트리밍합니다.

중요

대용량 데이터 세트를 콘솔로 스트리밍하지 않도록 합니다. 콘솔로의 스트리밍은 메모리 집약적이며 테스트 목적으로만 사용됩니다.

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

중요

변경 스트림의 스키마 추론하기

change.stream.publish.full.document.only 옵션을 true 로 설정하면 Spark Connector는 스캔한 문서의 스키마를 사용하여 DataFrame 의 스키마를 추론합니다. 옵션을 false 으로 설정하는 경우 스키마를 지정해야 합니다.

이 설정에 대한 자세한 내용과 change stream 구성 옵션의 전체 목록을 보려면 읽기 구성 옵션 가이드를 참조하세요.

이 예시에 사용된 유형에 대해 자세히 알아보려면 다음 Apache Spark API 문서를 참조하세요.

돌아가기

스트리밍 모드

이 페이지의 내용