ストリーミング モードでの MongoDB への書込み (write)
MongoDB にデータを書き込むには、 Dataset<Row>
オブジェクトで writeStream()
メソッドを呼び出します。 このメソッドはDataStreamWriter
オブジェクトを返します。このオブジェクトを使用して、ストリーミング書き込み操作の形式やその他の構成設定を指定できます。
MongoDB に書き込むには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
writeStream.format() | 基礎の出力データソースの形式を指定します。 MongoDB に書き込むには、 mongodb を使用します。 |
writeStream.option() | MongoDB配置 接続string 、 MongoDBのデータベースとコレクション、チェックポイント ディレクトリなどのストリーム設定を指定します。 書き込みストリーム構成オプションのリストについては、 ストリーミング書き込みオプションのガイドを参照してください。 |
writeStream.outputMode() | ストリーミング DataFrame のデータをストリーミング シンクに書き込む方法を指定します。 To view a list of all supported output modes, see the Java outputMode documentation. |
writeStream.trigger() | Spark Connectorが結果をストリーミング シンクに書き込む頻度を指定します。 構成した 継続的な処理を使用するには、 サポートされているすべての処理ポリシーのリストを表示するには、 Java trigger のドキュメント を参照してください。 |
次のコード スニペットは、前の構成設定を使用して MongoDB にデータをストリーミングする方法を示しています。
<streaming DataFrame>.writeStream() .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append");
メソッドの完全なリストについては、 Java Structured Streaming リファレンスを参照してください。
MongoDB にデータを書き込むには、 DataFrame
オブジェクトでwriteStream
関数を呼び出します。 この関数はDataStreamWriter
オブジェクトを返します。このオブジェクトを使用して、ストリーミング書き込み操作の形式やその他の構成設定を指定できます。
MongoDB に書き込むには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
writeStream.format() | 基礎の出力データソースの形式を指定します。 MongoDB に書き込むには、 mongodb を使用します。 |
writeStream.option() | MongoDB配置接続string 、 MongoDB databaseとコレクション、チェックポイント ディレクトリなどのストリーム設定を指定します。 書込みストリーム構成オプションのリストについては、「ストリーミング書込み構成オプション 」のガイドを参照してください。 |
writeStream.outputMode() | Spark Connectorがストリーミング DataFrame をストリーミング Sink に書き込む方法を指定します。 サポートされているすべての出力モードのリストを表示するには、 pyspark outputMode のドキュメントを参照してください。 |
writeStream.trigger() | Spark Connectorが結果をストリーミング シンクに書き込む頻度を指定します。 構成した 継続的な処理を使用するには、 サポートされているすべてのプロセシング ポリシーのリストを表示するには、 pyspark trigger のドキュメント を参照してください。 |
次のコード スニペットは、前の構成設定を使用して MongoDB にデータをストリーミングする方法を示しています。
<streaming DataFrame>.writeStream \ .format("mongodb") \ .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \ .option("spark.mongodb.database", <database-name>) \ .option("spark.mongodb.collection", <collection-name>) \ .outputMode("append")
関数の完全なリストについては、 pyspark Structured Streaming リファレンスを参照してください。
MongoDB にデータを書き込むには、 DataFrame
オブジェクトでwrite
メソッドを呼び出します。 このメソッドはDataStreamWriter
オブジェクトを返します。このオブジェクトを使用して、ストリーミング書き込み操作の形式やその他の構成設定を指定できます。
MongoDB に書き込むには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
writeStream.format() | 基礎の出力データソースの形式を指定します。 MongoDB に書き込むには、 mongodb を使用します。 |
writeStream.option() | MongoDB配置接続string 、 MongoDB databaseとコレクション、チェックポイント ディレクトリなどのストリーム設定を指定します。 書込みストリーム構成オプションのリストについては、「ストリーミング書込み構成オプション 」のガイドを参照してください。 |
writeStream.outputMode() | Spark Connectorがストリーミング DataFrame をストリーミング Sink に書き込む方法を指定します。 サポートされているすべての出力モードの一覧を表示するには、 Scala 出力モード のドキュメントを参照してください。 |
writeStream.trigger() | Spark Connectorが結果をストリーミング シンクに書き込む頻度を指定します。 構成した 継続的な処理を使用するには、 サポートされているすべてのプロセシング ポリシーのリストを表示するには、Scalatrigger のドキュメント を参照してください。 |
次のコード スニペットは、前の構成設定を使用して MongoDB にデータをストリーミングする方法を示しています。
<streaming DataFrame>.writeStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append")
メソッドの完全なリストについては、 Scala Structured Streaming リファレンスを参照してください。
例
次の例は、 CSVファイルから MongoDB にデータをストリーミングする方法を示しています。
CSV ファイルから読み取りを行う
DataStreamReader
オブジェクトを作成します。DataStreamWriter
オブジェクトを作成するには、DataStreamReader
で作成したストリーミングDataset<Row>
でwriteStream()
メソッドを呼び出します。format()
メソッドを使用して、基礎のデータ形式としてmongodb
を指定します。ストリームを開始するには、
DataStreamWriter
オブジェクトでstart()
メソッドを呼び出します。
コネクタは CSV ファイルからデータを読み取る際に、指定されたoutputMode
に従ってそのデータを MongoDB に追加します。
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("csv") .option("header", "true") .schema("<csv-schema>") .load("<csv-file-name>") // manipulate your streaming data .writeStream() .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
CSV ファイルから読み取りを行う
DataStreamReader
オブジェクトを作成します。DataStreamWriter
オブジェクトを作成するには、DataStreamReader
で作成したストリーミングDataFrame
でwriteStream
関数を呼び出します。format()
関数を使用して、基礎のデータ形式としてmongodb
を指定します。ストリームを開始するには、
DataStreamWriter
インスタンスでstart()
関数を呼び出します。
コネクタは CSV ファイルからデータを読み取る際に、指定されたoutputMode
に従ってそのデータを MongoDB に追加します。
# create a local SparkSession spark = SparkSession.builder \ .appName("writeExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define a streaming query dataStreamWriter = (spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) # manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/pyspark/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") ) # run the query query = dataStreamWriter.start()
CSV ファイルから読み取りを行う
DataStreamReader
オブジェクトを作成します。DataStreamWriter
オブジェクトを作成するには、DataStreamReader
で作成したストリーミングDataFrame
でwriteStream
メソッドを呼び出します。format()
メソッドを使用して、基礎のデータ形式としてmongodb
を指定します。ストリームを開始するには、
DataStreamWriter
インスタンスでstart()
メソッドを呼び出します。
コネクタは CSV ファイルからデータを読み取る際に、指定されたoutputMode
に従ってそのデータを MongoDB に追加します。
// create a local SparkSession val spark = SparkSession.builder .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define a streaming query val dataStreamWriter = spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) // manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") // run the query val query = dataStreamWriter.start()
API ドキュメント
これらの例で使用されている型の詳細については、次の Apache Spark API ドキュメントを参照してください。