Docs Menu
Docs Home
/
Spark Connector
/

ストリーミング モードでの MongoDB への書込み (write)

MongoDB にデータを書き込むには、 Dataset<Row>オブジェクトで writeStream()メソッドを呼び出します。 このメソッドはDataStreamWriterオブジェクトを返します。このオブジェクトを使用して、ストリーミング書き込み操作の形式やその他の構成設定を指定できます。

MongoDB に書き込むには、次の構成設定を指定する必要があります。

設定
説明
writeStream.format()
基礎の出力データソースの形式を指定します。 MongoDB に書き込むには、 mongodbを使用します。
writeStream.option()

MongoDB配置 接続string 、 MongoDBのデータベースとコレクション、チェックポイント ディレクトリなどのストリーム設定を指定します。

書き込みストリーム構成オプションのリストについては、 ストリーミング書き込みオプションのガイドを参照してください。

writeStream.outputMode()
ストリーミング DataFrame のデータをストリーミング シンクに書き込む方法を指定します。 To view a list of all supported output modes, see the Java outputMode documentation.
writeStream.trigger()

Spark Connectorが結果をストリーミング シンクに書き込む頻度を指定します。 構成したDataStreamReaderから作成したDataStreamWriterオブジェクトで、このメソッドを呼び出します。

継続的な処理を使用するには、 Trigger.Continuous(<time value>)を引数として渡します。ここで、 <time value>は、Spark Connector が非同期にチェックポイントを実行する頻度を表します。 Trigger クラスの他の静的メソッドを渡す場合、または writeStream.trigger() を呼び出しない場合、 Spark Connectorは代わりにマイクロバッチ処理を使用します。

サポートされているすべての処理ポリシーのリストを表示するには、 Java trigger のドキュメント を参照してください。

次のコード スニペットは、前の構成設定を使用して MongoDB にデータをストリーミングする方法を示しています。

<streaming DataFrame>.writeStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append");

メソッドの完全なリストについては、 Java Structured Streaming リファレンスを参照してください。

MongoDB にデータを書き込むには、 DataFrameオブジェクトでwriteStream関数を呼び出します。 この関数はDataStreamWriterオブジェクトを返します。このオブジェクトを使用して、ストリーミング書き込み操作の形式やその他の構成設定を指定できます。

MongoDB に書き込むには、次の構成設定を指定する必要があります。

設定
説明
writeStream.format()
基礎の出力データソースの形式を指定します。 MongoDB に書き込むには、 mongodbを使用します。
writeStream.option()

MongoDB配置接続string 、 MongoDB databaseとコレクション、チェックポイント ディレクトリなどのストリーム設定を指定します。

書込みストリーム構成オプションのリストについては、「ストリーミング書込み構成オプション 」のガイドを参照してください。

writeStream.outputMode()
Spark Connectorがストリーミング DataFrame をストリーミング Sink に書き込む方法を指定します。 サポートされているすべての出力モードのリストを表示するには、 pyspark outputMode のドキュメントを参照してください。
writeStream.trigger()

Spark Connectorが結果をストリーミング シンクに書き込む頻度を指定します。 構成したDataStreamReaderから作成したDataStreamWriterオブジェクトで、このメソッドを呼び出します。

継続的な処理を使用するには、 continuousパラメーターを使用して関数に時間値を渡します。 他の 名前付きパラメータ を渡すか、 writeStream.trigger()を呼び出しない場合、Spark Connector は代わりにマイクロバッチ処理を使用します。

サポートされているすべてのプロセシング ポリシーのリストを表示するには、 pyspark trigger のドキュメント を参照してください。

次のコード スニペットは、前の構成設定を使用して MongoDB にデータをストリーミングする方法を示しています。

<streaming DataFrame>.writeStream \
.format("mongodb") \
.option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
.option("spark.mongodb.database", <database-name>) \
.option("spark.mongodb.collection", <collection-name>) \
.outputMode("append")

関数の完全なリストについては、 pyspark Structured Streaming リファレンスを参照してください。

MongoDB にデータを書き込むには、 DataFrameオブジェクトでwriteメソッドを呼び出します。 このメソッドはDataStreamWriterオブジェクトを返します。このオブジェクトを使用して、ストリーミング書き込み操作の形式やその他の構成設定を指定できます。

MongoDB に書き込むには、次の構成設定を指定する必要があります。

設定
説明
writeStream.format()
基礎の出力データソースの形式を指定します。 MongoDB に書き込むには、 mongodbを使用します。
writeStream.option()

MongoDB配置接続string 、 MongoDB databaseとコレクション、チェックポイント ディレクトリなどのストリーム設定を指定します。

書込みストリーム構成オプションのリストについては、「ストリーミング書込み構成オプション 」のガイドを参照してください。

writeStream.outputMode()
Spark Connectorがストリーミング DataFrame をストリーミング Sink に書き込む方法を指定します。 サポートされているすべての出力モードの一覧を表示するには、 Scala 出力モード のドキュメントを参照してください。
writeStream.trigger()

Spark Connectorが結果をストリーミング シンクに書き込む頻度を指定します。 構成したDataStreamReaderから作成したDataStreamWriterオブジェクトで、このメソッドを呼び出します。

継続的な処理を使用するには、 Trigger.Continuous(<time value>)を引数として渡します。ここで、 <time value>は、Spark Connector が非同期にチェックポイントを実行する頻度を表します。 Trigger クラスの他の静的メソッドを渡す場合、または writeStream.trigger() を呼び出しない場合、 Spark Connectorは代わりにマイクロバッチ処理を使用します。

サポートされているすべてのプロセシング ポリシーのリストを表示するには、Scalatrigger のドキュメント を参照してください。

次のコード スニペットは、前の構成設定を使用して MongoDB にデータをストリーミングする方法を示しています。

<streaming DataFrame>.writeStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")

メソッドの完全なリストについては、 Scala Structured Streaming リファレンスを参照してください。

次の例は、 CSVファイルから MongoDB にデータをストリーミングする方法を示しています。

  1. CSV ファイルから読み取りを行うDataStreamReaderオブジェクトを作成します。

  2. DataStreamWriterオブジェクトを作成するには、 DataStreamReaderで作成したストリーミングDataset<Row>writeStream()メソッドを呼び出します。 format()メソッドを使用して、基礎のデータ形式としてmongodbを指定します。

  3. ストリームを開始するには、 DataStreamWriterオブジェクトでstart()メソッドを呼び出します。

コネクタは CSV ファイルからデータを読み取る際に、指定されたoutputModeに従ってそのデータを MongoDB に追加します。

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("csv")
.option("header", "true")
.schema("<csv-schema>")
.load("<csv-file-name>")
// manipulate your streaming data
.writeStream()
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. CSV ファイルから読み取りを行うDataStreamReaderオブジェクトを作成します。

  2. DataStreamWriterオブジェクトを作成するには、 DataStreamReaderで作成したストリーミングDataFramewriteStream関数を呼び出します。 format()関数を使用して、基礎のデータ形式としてmongodbを指定します。

  3. ストリームを開始するには、 DataStreamWriterインスタンスでstart()関数を呼び出します。

コネクタは CSV ファイルからデータを読み取る際に、指定されたoutputModeに従ってそのデータを MongoDB に追加します。

# create a local SparkSession
spark = SparkSession.builder \
.appName("writeExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define a streaming query
dataStreamWriter = (spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
# manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/pyspark/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. CSV ファイルから読み取りを行うDataStreamReaderオブジェクトを作成します。

  2. DataStreamWriterオブジェクトを作成するには、 DataStreamReaderで作成したストリーミングDataFramewriteStreamメソッドを呼び出します。 format()メソッドを使用して、基礎のデータ形式としてmongodbを指定します。

  3. ストリームを開始するには、 DataStreamWriterインスタンスでstart()メソッドを呼び出します。

コネクタは CSV ファイルからデータを読み取る際に、指定されたoutputModeに従ってそのデータを MongoDB に追加します。

// create a local SparkSession
val spark = SparkSession.builder
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define a streaming query
val dataStreamWriter = spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
// manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

これらの例で使用されている型の詳細については、次の Apache Spark API ドキュメントを参照してください。

戻る

ストリーミング読み取り構成オプション