Docs Menu
Docs Home
/
Spark Connector
/

ストリーミング モードでの MongoDB からの読み取り

項目一覧

  • Overview
  • API ドキュメント

MongoDB databaseからストリームを読み取る場合、MongoDB Spark Connector はマイクロバッチ処理連続処理の両方をサポートします。 デフォルトの処理エンジンであるマイクロバッチ処理は、厳密に 1 回のフォールトトレランスを保証し、エンドツーエンドのレイテンシを 100 ミリ秒未満に実現します。 継続的な処理は、Spark バージョン 2.3 で導入された実験的な機能であり、エンドツーエンドのレイテンシを 1 ミリ秒未満で実現し、少なくとも 1 回の保証で終了します。

継続的処理の詳細については、 Spark のドキュメントを参照してください。

注意

connector は、MongoDB 配置の変更ストリームから読み取りを行います。 変更ストリーム上で変更イベントを生成するには、データベースに対してアップデート操作を実行します。

Change Streams変更ストリームの詳細については、MongoDB マニュアルの 「 ストリーム」 を参照してください。

MongoDB からデータを読み取るには、 SparkSessionオブジェクトで readStream()メソッドを呼び出します。 このメソッドはDataStreamReaderオブジェクトを返します。このオブジェクトを使用して、ストリーミング読み取り操作の形式やその他の構成設定を指定できます。

MongoDB から読み取るには、次の構成設定を指定する必要があります。

設定
説明
readStream.format()
基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodbを使用します。
readStream.option()

MongoDB配置接続string 、 MongoDB database とコレクション、および 集計パイプライン ステージ などのストリーム設定を指定します。

読み取りストリーム構成オプションのリストについては、 ストリーミング読み取り構成オプションのガイドを参照してください。

readStream.schema()
入力スキーマを指定します。

次のコード スニペットは、前述の構成設定を使用して、MongoDB からストリーミングされたデータを継続的に処理する方法を示しています。 connectorはすべての新しいデータを既存のデータに追加し、1 秒ごとに 1 回チェックポイントを非同期に/tmp/checkpointDirに非同期に書込みます。 Trigger.Continuousパラメータをtrigger()メソッドに渡すと、継続的な処理が可能になります。

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

注意

Spark では、ストリーミング クエリでstart()メソッドを呼び出すまでストリーミングは開始されません。

メソッドの完全なリストについては、 Java Structured Streaming リファレンスを参照してください。

MongoDB からデータを読み取るには、 SparkSessionオブジェクトでreadStream関数を呼び出します。 この関数はDataStreamReaderオブジェクトを返します。このオブジェクトを使用して、ストリーミング読み取り操作の形式やその他の構成設定を指定できます。

MongoDB から読み取るには、次の構成設定を指定する必要があります。

設定
説明
readStream.format()
基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodbを使用します。
readStream.option()

MongoDB配置接続string 、 MongoDB database とコレクション、および 集計パイプライン ステージ などのストリーム設定を指定します。

読み取りストリーム構成オプションのリストについては、「ストリーミング読み取り構成オプション 」のガイドを参照してください。

readStream.schema()
入力スキーマを指定します。

次のコード スニペットは、前述の構成設定を使用して、MongoDB からストリーミングされたデータを継続的に処理する方法を示しています。 connectorはすべての新しいデータを既存のデータに追加し、1 秒ごとに 1 回チェックポイントを非同期に/tmp/checkpointDirに非同期に書込みます。 continuousパラメータをtrigger()メソッドに渡すと、継続的な処理が可能になります。

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

注意

Spark では、ストリーミング クエリでstart()メソッドを呼び出すまでストリーミングは開始されません。

メソッドの完全なリストについては、 pyspark Structured Streaming リファレンスを参照してください。

MongoDB からデータを読み取るには、 SparkSessionオブジェクトでreadStreamメソッドを呼び出します。 このメソッドはDataStreamReaderオブジェクトを返します。このオブジェクトを使用して、ストリーミング読み取り操作の形式やその他の構成設定を指定できます。

MongoDB から読み取るには、次の構成設定を指定する必要があります。

設定
説明
readStream.format()
基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodbを使用します。
readStream.option()

MongoDB配置接続string 、 MongoDB database とコレクション、および 集計パイプライン ステージ などのストリーム設定を指定します。

読み取りストリーム構成オプションのリストについては、「ストリーミング読み取り構成オプション 」のガイドを参照してください。

readStream.schema()
入力スキーマを指定します。

次のコード スニペットは、前述の構成設定を使用して、MongoDB からストリーミングされたデータを継続的に処理する方法を示しています。 connectorはすべての新しいデータを既存のデータに追加し、1 秒ごとに 1 回チェックポイントを非同期に/tmp/checkpointDirに非同期に書込みます。 Trigger.Continuousパラメータをtrigger()メソッドに渡すと、継続的な処理が可能になります。

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

注意

Spark では、ストリーミング クエリでstart()メソッドを呼び出すまでストリーミングは開始されません。

メソッドの完全なリストについては、 Scala Structured Streaming リファレンスを参照してください。

次の例は、MongoDB からコンソールにデータをストリーミングする方法を示しています。

  1. MongoDB から読み取るDataStreamReaderオブジェクトを作成します。

  2. DataStreamReaderで作成したストリーミングDatasetオブジェクトでwriteStream()メソッドを呼び出して、 DataStreamWriterオブジェクトを作成します。 format()メソッドを使用して形式consoleを指定します。

  3. ストリームを開始するには、 DataStreamWriterインスタンスでstart()メソッドを呼び出します。

新しいデータが MongoDB に挿入されると、MongoDB は指定されたoutputModeに従ってそのデータをコンソールにストリーミングします。

重要

大規模なデータセットをコンソールにストリーミングしないでください。 コンソールへのストリーミングはメモリを集中的に消費するため、テスト目的のみで使用します。

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. MongoDB から読み取るDataStreamReaderオブジェクトを作成します。

  2. DataStreamReaderで作成したストリーミングDataFramewriteStream()メソッドを呼び出して、 DataStreamWriterオブジェクトを作成します。 format()メソッドを使用して形式consoleを指定します。

  3. ストリームを開始するには、 DataStreamWriterインスタンスでstart()メソッドを呼び出します。

新しいデータが MongoDB に挿入されると、MongoDB は指定されたoutputModeに従ってそのデータをコンソールにストリーミングします。

重要

大規模なデータセットをコンソールにストリーミングしないでください。 コンソールへのストリーミングはメモリを集中的に消費するため、テスト目的のみで使用します。

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. MongoDB から読み取るDataStreamReaderオブジェクトを作成します。

  2. DataStreamReaderを使用して作成したストリーミングDataFrameオブジェクトでwriteStream()メソッドを呼び出して、 DataStreamWriterオブジェクトを作成します。 format()メソッドを使用して形式consoleを指定します。

  3. ストリームを開始するには、 DataStreamWriterインスタンスでstart()メソッドを呼び出します。

新しいデータが MongoDB に挿入されると、MongoDB は指定されたoutputModeに従ってそのデータをコンソールにストリーミングします。

重要

大規模なデータセットをコンソールにストリーミングしないでください。 コンソールへのストリーミングはメモリを集中的に消費するため、テスト目的のみで使用します。

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

重要

変更ストリームのスキーマの推論

change.stream.publish.full.document.onlyオプションをtrueに設定すると、Spark Connector はスキャンされたドキュメントのスキーマを使用してDataFrameのスキーマを推論します。 オプションをfalseに設定する場合は、スキーマを指定する必要があります。

この設定の詳細と、変更ストリーム構成オプションの完全なリストについては、読み取り構成オプションのガイドを参照してください。

これらの例で使用されている型の詳細については、次の Apache Spark API ドキュメントを参照してください。

戻る

ストリーミング モード