Docs Menu
Docs Home
/
Spark 커넥터
/

스트리밍 모드로 MongoDB에 쓰기

MongoDB 에 데이터를 쓰기 (write) 려면 Dataset<Row> 객체 에서 writeStream() 메서드를 호출합니다. 이 메서드는 스트리밍 쓰기 (write) 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamWriter 객체 를 반환합니다.

MongoDB 에 쓰기 (write) 려면 다음 구성 설정을 지정해야 합니다.

설정
설명
writeStream.format()
기본 출력 데이터 소스 의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB 에 쓰기 (write) .
writeStream.option()

MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 체크포인트 디렉토리 등의 스트림 설정을 지정합니다.

쓰기 (write) 스트림 구성 옵션 목록은 스트리밍 쓰기 구성 옵션 가이드 를 참조하세요.

writeStream.outputMode()
스트리밍 DataFrame의 데이터가 스트리밍 싱크에 기록되는 방법을 지정합니다. 지원되는 모든 출력 모드의 목록을 보려면 Java outputMode 설명서를 참조하세요.
writeStream.trigger()

Spark Connector 가 스트리밍 싱크에 결과를 쓰는 빈도를 지정합니다. 구성한 DataStreamReader 에서 생성한 DataStreamWriter 객체 에 대해 이 메서드를 호출합니다.

연속 처리 를 사용하려면 Trigger.Continuous(<time value>) 을 인수로 전달합니다. 여기서 <time value> 은 Spark Connector 가 비동기적으로 체크포인트 하도록 할 빈도입니다. Trigger 클래스의 다른 정적 메서드를 전달하거나 writeStream.trigger()를 호출하지 않는 경우 Spark Connector 는 대신 마이크로 배치 처리 를 사용합니다.

지원되는 모든 처리 정책 목록을 보려면 Java trigger 설명서를 참조하세요.

다음 코드 스니펫은 이전 구성 설정을 사용하여 데이터를 MongoDB 로 스트림 하는 방법을 보여줍니다.

<streaming DataFrame>.writeStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append");

전체 메서드 목록은 Java 구조화된 스트리밍 참조를 확인하세요.

MongoDB 에 데이터를 쓰기 (write) 려면 DataFrame 객체 에서 writeStream 함수를 호출합니다. 이 함수는 스트리밍 쓰기 (write) 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamWriter 객체 를 반환합니다.

MongoDB 에 쓰기 (write) 려면 다음 구성 설정을 지정해야 합니다.

설정
설명
writeStream.format()
기본 출력 데이터 소스 의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB 에 쓰기 (write) .
writeStream.option()

MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 체크포인트 디렉토리 등의 스트림 설정을 지정합니다.

쓰기 (write) 스트림 구성 옵션 목록은 스트리밍 쓰기 구성 옵션 가이드 를 참조하세요.

writeStream.outputMode()
Spark Connector 가 스트리밍 DataFrame을 스트리밍 싱크에 쓰는 방법을 지정합니다. 지원되는 모든 출력 모드의 목록을 보려면 pyspark outputMode 문서를 참조하세요.
writeStream.trigger()

Spark Connector 가 스트리밍 싱크에 결과를 쓰는 빈도를 지정합니다. 구성한 DataStreamReader 에서 생성한 DataStreamWriter 객체 에 대해 이 메서드를 호출합니다.

연속 처리 를 사용하려면 continuous 매개변수를 사용하여 함수에 시간 값을 전달합니다. 다른 명명된 매개변수를 전달하거나 writeStream.trigger() 을(를) 호출하지 않는 경우 Spark Connector 는 대신 마이크로 배치 처리 를 사용합니다.

지원되는 모든 처리 정책 목록을 보려면 pyspark trigger 설명서를 참조하세요.

다음 코드 스니펫은 이전 구성 설정을 사용하여 데이터를 MongoDB 로 스트림 하는 방법을 보여줍니다.

<streaming DataFrame>.writeStream \
.format("mongodb") \
.option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
.option("spark.mongodb.database", <database-name>) \
.option("spark.mongodb.collection", <collection-name>) \
.outputMode("append")

전체 함수 목록은 pyspark 구조화된 스트리밍 참조를 확인하세요.

MongoDB 에 데이터를 쓰기 (write) 려면 DataFrame 객체 에서 write 메서드를 호출합니다. 이 메서드는 스트리밍 쓰기 (write) 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamWriter 객체 를 반환합니다.

MongoDB 에 쓰기 (write) 려면 다음 구성 설정을 지정해야 합니다.

설정
설명
writeStream.format()
기본 출력 데이터 소스 의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB 에 쓰기 (write) .
writeStream.option()

MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 체크포인트 디렉토리 등의 스트림 설정을 지정합니다.

쓰기 (write) 스트림 구성 옵션 목록은 스트리밍 쓰기 구성 옵션 가이드 를 참조하세요.

writeStream.outputMode()
Spark Connector 가 스트리밍 DataFrame을 스트리밍 싱크에 쓰는 방법을 지정합니다. 지원되는 모든 출력 모드의 목록을 보려면 Scala 출력 모드 문서를 참조하세요.
writeStream.trigger()

Spark Connector 가 스트리밍 싱크에 결과를 쓰는 빈도를 지정합니다. 구성한 DataStreamReader 에서 생성한 DataStreamWriter 객체 에 대해 이 메서드를 호출합니다.

연속 처리 를 사용하려면 Trigger.Continuous(<time value>) 을 인수로 전달합니다. 여기서 <time value> 은 Spark Connector 가 비동기적으로 체크포인트 하도록 할 빈도입니다. Trigger 클래스의 다른 정적 메서드를 전달하거나 writeStream.trigger()를 호출하지 않는 경우 Spark Connector 는 대신 마이크로 배치 처리 를 사용합니다.

지원되는 모든 처리 정책 목록을 보려면 Scala trigger 문서를 참조하세요.

다음 코드 스니펫은 이전 구성 설정을 사용하여 데이터를 MongoDB 로 스트림 하는 방법을 보여줍니다.

<streaming DataFrame>.writeStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")

전체 Scala 메서드 목록은 구조화된 스트리밍 참조를 확인하세요.

다음 예제는 CSV 파일에서 MongoDB로 데이터를 스트리밍하는 방법을 보여줍니다.

  1. CSV 파일에서 읽는 DataStreamReader 객체를 만듭니다.

  2. DataStreamWriter 객체 를 만들려면 DataStreamReader 으로 만든 스트리밍 Dataset<Row> 에서 writeStream() 메서드를 호출합니다. format() 메서드를 사용하여 mongodb 을 기본 데이터 형식으로 지정합니다.

  3. DataStreamWriter 객체 에서 start() 메서드를 호출하여 스트림 을 시작합니다.

connector 는 CSV 파일 에서 데이터를 읽을 때 지정한 outputMode 에 따라 해당 데이터를 MongoDB 에 추가합니다.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("csv")
.option("header", "true")
.schema("<csv-schema>")
.load("<csv-file-name>")
// manipulate your streaming data
.writeStream()
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. CSV 파일에서 읽는 DataStreamReader 객체를 만듭니다.

  2. DataStreamWriter 객체 를 만들려면 DataStreamReader 으로 만든 스트리밍 DataFrame 에서 writeStream 함수를 호출합니다. format() 함수를 사용하여 mongodb 을 기본 데이터 형식으로 지정합니다.

  3. DataStreamWriter 인스턴스 에서 start() 함수를 호출하여 스트림 을 시작합니다.

connector 는 CSV 파일 에서 데이터를 읽을 때 지정한 outputMode 에 따라 해당 데이터를 MongoDB 에 추가합니다.

# create a local SparkSession
spark = SparkSession.builder \
.appName("writeExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define a streaming query
dataStreamWriter = (spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
# manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/pyspark/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. CSV 파일에서 읽는 DataStreamReader 객체를 만듭니다.

  2. DataStreamWriter 객체 를 만들려면 DataStreamReader 으로 만든 스트리밍 DataFrame 에서 writeStream 메서드를 호출합니다. format() 메서드를 사용하여 mongodb 을 기본 데이터 형식으로 지정합니다.

  3. DataStreamWriter 인스턴스에서 start() 메서드를 호출하여 스트림을 시작합니다.

connector 는 CSV 파일 에서 데이터를 읽을 때 지정한 outputMode 에 따라 해당 데이터를 MongoDB 에 추가합니다.

// create a local SparkSession
val spark = SparkSession.builder
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define a streaming query
val dataStreamWriter = spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
// manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

이 예시에 사용된 유형에 대해 자세히 알아보려면 다음 Apache Spark API 문서를 참조하세요.

돌아가기

구성