스트리밍 모드로 MongoDB에 쓰기
MongoDB 에 데이터를 쓰기 (write) 려면 Dataset<Row>
객체 에서 writeStream()
메서드를 호출합니다. 이 메서드는 스트리밍 쓰기 (write) 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamWriter
객체 를 반환합니다.
MongoDB 에 쓰기 (write) 려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
writeStream.format() | 기본 출력 데이터 소스 의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB 에 쓰기 (write) . |
writeStream.option() | MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 체크포인트 디렉토리 등의 스트림 설정을 지정합니다. 쓰기 (write) 스트림 구성 옵션 목록은 스트리밍 쓰기 구성 옵션 가이드 를 참조하세요. |
writeStream.outputMode() | 스트리밍 DataFrame의 데이터가 스트리밍 싱크에 기록되는 방법을 지정합니다. 지원되는 모든 출력 모드의 목록을 보려면 Java outputMode 설명서를 참조하세요. |
writeStream.trigger() | Spark Connector 가 스트리밍 싱크에 결과를 쓰는 빈도를 지정합니다. 구성한 연속 처리 를 사용하려면 지원되는 모든 처리 정책 목록을 보려면 Java trigger 설명서를 참조하세요. |
다음 코드 스니펫은 이전 구성 설정을 사용하여 데이터를 MongoDB 로 스트림 하는 방법을 보여줍니다.
<streaming DataFrame>.writeStream() .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append");
MongoDB 에 데이터를 쓰기 (write) 려면 DataFrame
객체 에서 writeStream
함수를 호출합니다. 이 함수는 스트리밍 쓰기 (write) 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamWriter
객체 를 반환합니다.
MongoDB 에 쓰기 (write) 려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
writeStream.format() | 기본 출력 데이터 소스 의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB 에 쓰기 (write) . |
writeStream.option() | MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 체크포인트 디렉토리 등의 스트림 설정을 지정합니다. 쓰기 (write) 스트림 구성 옵션 목록은 스트리밍 쓰기 구성 옵션 가이드 를 참조하세요. |
writeStream.outputMode() | Spark Connector 가 스트리밍 DataFrame을 스트리밍 싱크에 쓰는 방법을 지정합니다. 지원되는 모든 출력 모드의 목록을 보려면 pyspark outputMode 문서를 참조하세요. |
writeStream.trigger() | Spark Connector 가 스트리밍 싱크에 결과를 쓰는 빈도를 지정합니다. 구성한 연속 처리 를 사용하려면 지원되는 모든 처리 정책 목록을 보려면 pyspark trigger 설명서를 참조하세요. |
다음 코드 스니펫은 이전 구성 설정을 사용하여 데이터를 MongoDB 로 스트림 하는 방법을 보여줍니다.
<streaming DataFrame>.writeStream \ .format("mongodb") \ .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \ .option("spark.mongodb.database", <database-name>) \ .option("spark.mongodb.collection", <collection-name>) \ .outputMode("append")
전체 함수 목록은 pyspark 구조화된 스트리밍 참조를 확인하세요.
MongoDB 에 데이터를 쓰기 (write) 려면 DataFrame
객체 에서 write
메서드를 호출합니다. 이 메서드는 스트리밍 쓰기 (write) 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataStreamWriter
객체 를 반환합니다.
MongoDB 에 쓰기 (write) 려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
writeStream.format() | 기본 출력 데이터 소스 의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB 에 쓰기 (write) . |
writeStream.option() | MongoDB 배포서버 연결 string, MongoDB 데이터베이스 및 컬렉션, 체크포인트 디렉토리 등의 스트림 설정을 지정합니다. 쓰기 (write) 스트림 구성 옵션 목록은 스트리밍 쓰기 구성 옵션 가이드 를 참조하세요. |
writeStream.outputMode() | Spark Connector 가 스트리밍 DataFrame을 스트리밍 싱크에 쓰는 방법을 지정합니다. 지원되는 모든 출력 모드의 목록을 보려면 Scala 출력 모드 문서를 참조하세요. |
writeStream.trigger() | Spark Connector 가 스트리밍 싱크에 결과를 쓰는 빈도를 지정합니다. 구성한 연속 처리 를 사용하려면 지원되는 모든 처리 정책 목록을 보려면 Scala trigger 문서를 참조하세요. |
다음 코드 스니펫은 이전 구성 설정을 사용하여 데이터를 MongoDB 로 스트림 하는 방법을 보여줍니다.
<streaming DataFrame>.writeStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append")
예시
다음 예제는 CSV 파일에서 MongoDB로 데이터를 스트리밍하는 방법을 보여줍니다.
CSV 파일에서 읽는
DataStreamReader
객체를 만듭니다.DataStreamWriter
객체 를 만들려면DataStreamReader
으로 만든 스트리밍Dataset<Row>
에서writeStream()
메서드를 호출합니다.format()
메서드를 사용하여mongodb
을 기본 데이터 형식으로 지정합니다.DataStreamWriter
객체 에서start()
메서드를 호출하여 스트림 을 시작합니다.
connector 는 CSV 파일 에서 데이터를 읽을 때 지정한 outputMode
에 따라 해당 데이터를 MongoDB 에 추가합니다.
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("csv") .option("header", "true") .schema("<csv-schema>") .load("<csv-file-name>") // manipulate your streaming data .writeStream() .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
CSV 파일에서 읽는
DataStreamReader
객체를 만듭니다.DataStreamWriter
객체 를 만들려면DataStreamReader
으로 만든 스트리밍DataFrame
에서writeStream
함수를 호출합니다.format()
함수를 사용하여mongodb
을 기본 데이터 형식으로 지정합니다.DataStreamWriter
인스턴스 에서start()
함수를 호출하여 스트림 을 시작합니다.
connector 는 CSV 파일 에서 데이터를 읽을 때 지정한 outputMode
에 따라 해당 데이터를 MongoDB 에 추가합니다.
# create a local SparkSession spark = SparkSession.builder \ .appName("writeExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define a streaming query dataStreamWriter = (spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) # manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/pyspark/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") ) # run the query query = dataStreamWriter.start()
CSV 파일에서 읽는
DataStreamReader
객체를 만듭니다.DataStreamWriter
객체 를 만들려면DataStreamReader
으로 만든 스트리밍DataFrame
에서writeStream
메서드를 호출합니다.format()
메서드를 사용하여mongodb
을 기본 데이터 형식으로 지정합니다.DataStreamWriter
인스턴스에서start()
메서드를 호출하여 스트림을 시작합니다.
connector 는 CSV 파일 에서 데이터를 읽을 때 지정한 outputMode
에 따라 해당 데이터를 MongoDB 에 추가합니다.
// create a local SparkSession val spark = SparkSession.builder .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define a streaming query val dataStreamWriter = spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) // manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") // run the query val query = dataStreamWriter.start()
API 문서
이 예시에 사용된 유형에 대해 자세히 알아보려면 다음 Apache Spark API 문서를 참조하세요.