ストリーミング モードでの MongoDB からの読み取り
Overview
MongoDB databaseからストリームを読み取る場合、MongoDB Spark Connector はマイクロバッチ処理と連続処理の両方をサポートします。 デフォルトの処理エンジンであるマイクロバッチ処理は、厳密に 1 回のフォールトトレランスを保証し、エンドツーエンドのレイテンシを 100 ミリ秒未満に実現します。 継続的な処理は、Spark バージョン 2.3 で導入された実験的な機能であり、エンドツーエンドのレイテンシを 1 ミリ秒未満で実現し、少なくとも 1 回の保証で終了します。
継続的処理の詳細については、 Spark のドキュメントを参照してください。
注意
connector は、MongoDB 配置の変更ストリームから読み取りを行います。 変更ストリーム上で変更イベントを生成するには、データベースに対してアップデート操作を実行します。
Change Streams変更ストリームの詳細については、MongoDB マニュアルの 「 ストリーム」 を参照してください。
MongoDB からデータを読み取るには、 SparkSession
オブジェクトで readStream()
メソッドを呼び出します。 このメソッドはDataStreamReader
オブジェクトを返します。このオブジェクトを使用して、ストリーミング読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
readStream.format() | 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodb を使用します。 |
readStream.option() | MongoDB配置接続string 、 MongoDB database とコレクション、および 集計パイプライン ステージ などのストリーム設定を指定します。 読み取りストリーム構成オプションのリストについては、 ストリーミング読み取り構成オプションのガイドを参照してください。 |
readStream.schema() | 入力スキーマを指定します。 |
次のコード スニペットは、前述の構成設定を使用して、MongoDB からストリーミングされたデータを継続的に処理する方法を示しています。 connectorはすべての新しいデータを既存のデータに追加し、1 秒ごとに 1 回チェックポイントを非同期に/tmp/checkpointDir
に非同期に書込みます。 Trigger.Continuous
パラメータをtrigger()
メソッドに渡すと、継続的な処理が可能になります。
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream() .format("mongodb") .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream() .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append"); StreamingQuery query = dataStreamWriter.start();
注意
Spark では、ストリーミング クエリでstart()
メソッドを呼び出すまでストリーミングは開始されません。
メソッドの完全なリストについては、 Java Structured Streaming リファレンスを参照してください。
MongoDB からデータを読み取るには、 SparkSession
オブジェクトでreadStream
関数を呼び出します。 この関数はDataStreamReader
オブジェクトを返します。このオブジェクトを使用して、ストリーミング読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
readStream.format() | 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodb を使用します。 |
readStream.option() | MongoDB配置接続string 、 MongoDB database とコレクション、および 集計パイプライン ステージ などのストリーム設定を指定します。 読み取りストリーム構成オプションのリストについては、「ストリーミング読み取り構成オプション 」のガイドを参照してください。 |
readStream.schema() | 入力スキーマを指定します。 |
次のコード スニペットは、前述の構成設定を使用して、MongoDB からストリーミングされたデータを継続的に処理する方法を示しています。 connectorはすべての新しいデータを既存のデータに追加し、1 秒ごとに 1 回チェックポイントを非同期に/tmp/checkpointDir
に非同期に書込みます。 continuous
パラメータをtrigger()
メソッドに渡すと、継続的な処理が可能になります。
streamingDataFrame = (<local SparkSession>.readStream .format("mongodb") .load() ) dataStreamWriter = (streamingDataFrame.writeStream .trigger(continuous="1 second") .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") ) query = dataStreamWriter.start()
注意
Spark では、ストリーミング クエリでstart()
メソッドを呼び出すまでストリーミングは開始されません。
メソッドの完全なリストについては、 pyspark Structured Streaming リファレンスを参照してください。
MongoDB からデータを読み取るには、 SparkSession
オブジェクトでreadStream
メソッドを呼び出します。 このメソッドはDataStreamReader
オブジェクトを返します。このオブジェクトを使用して、ストリーミング読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
readStream.format() | 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodb を使用します。 |
readStream.option() | MongoDB配置接続string 、 MongoDB database とコレクション、および 集計パイプライン ステージ などのストリーム設定を指定します。 読み取りストリーム構成オプションのリストについては、「ストリーミング読み取り構成オプション 」のガイドを参照してください。 |
readStream.schema() | 入力スキーマを指定します。 |
次のコード スニペットは、前述の構成設定を使用して、MongoDB からストリーミングされたデータを継続的に処理する方法を示しています。 connectorはすべての新しいデータを既存のデータに追加し、1 秒ごとに 1 回チェックポイントを非同期に/tmp/checkpointDir
に非同期に書込みます。 Trigger.Continuous
パラメータをtrigger()
メソッドに渡すと、継続的な処理が可能になります。
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream .format("mongodb") .load() val dataStreamWriter = streamingDataFrame.writeStream .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") val query = dataStreamWriter.start()
注意
Spark では、ストリーミング クエリでstart()
メソッドを呼び出すまでストリーミングは開始されません。
メソッドの完全なリストについては、 Scala Structured Streaming リファレンスを参照してください。
例
次の例は、MongoDB からコンソールにデータをストリーミングする方法を示しています。
MongoDB から読み取る
DataStreamReader
オブジェクトを作成します。DataStreamReader
で作成したストリーミングDataset
オブジェクトでwriteStream()
メソッドを呼び出して、DataStreamWriter
オブジェクトを作成します。format()
メソッドを使用して形式console
を指定します。ストリームを開始するには、
DataStreamWriter
インスタンスでstart()
メソッドを呼び出します。
新しいデータが MongoDB に挿入されると、MongoDB は指定されたoutputMode
に従ってそのデータをコンソールにストリーミングします。
重要
大規模なデータセットをコンソールにストリーミングしないでください。 コンソールへのストリーミングはメモリを集中的に消費するため、テスト目的のみで使用します。
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType() .add("company_symbol", DataTypes.StringType) .add("company_name", DataTypes.StringType) .add("price", DataTypes.DoubleType) .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("mongodb") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .schema(readSchema) .load() // manipulate your streaming data .writeStream() .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
MongoDB から読み取る
DataStreamReader
オブジェクトを作成します。DataStreamReader
で作成したストリーミングDataFrame
でwriteStream()
メソッドを呼び出して、DataStreamWriter
オブジェクトを作成します。format()
メソッドを使用して形式console
を指定します。ストリームを開始するには、
DataStreamWriter
インスタンスでstart()
メソッドを呼び出します。
新しいデータが MongoDB に挿入されると、MongoDB は指定されたoutputMode
に従ってそのデータをコンソールにストリーミングします。
重要
大規模なデータセットをコンソールにストリーミングしないでください。 コンソールへのストリーミングはメモリを集中的に消費するため、テスト目的のみで使用します。
# create a local SparkSession spark = SparkSession.builder \ .appName("readExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define the schema of the source collection readSchema = (StructType() .add('company_symbol', StringType()) .add('company_name', StringType()) .add('price', DoubleType()) .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option('spark.mongodb.database', <database-name>) .option('spark.mongodb.collection', <collection-name>) .schema(readSchema) .load() # manipulate your streaming data .writeStream .format("console") .trigger(continuous="1 second") .outputMode("append") ) # run the query query = dataStreamWriter.start()
MongoDB から読み取る
DataStreamReader
オブジェクトを作成します。DataStreamReader
を使用して作成したストリーミングDataFrame
オブジェクトでwriteStream()
メソッドを呼び出して、DataStreamWriter
オブジェクトを作成します。format()
メソッドを使用して形式console
を指定します。ストリームを開始するには、
DataStreamWriter
インスタンスでstart()
メソッドを呼び出します。
新しいデータが MongoDB に挿入されると、MongoDB は指定されたoutputMode
に従ってそのデータをコンソールにストリーミングします。
重要
大規模なデータセットをコンソールにストリーミングしないでください。 コンソールへのストリーミングはメモリを集中的に消費するため、テスト目的のみで使用します。
// create a local SparkSession val spark = SparkSession.builder .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define the schema of the source collection val readSchema = StructType() .add("company_symbol", StringType()) .add("company_name", StringType()) .add("price", DoubleType()) .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .schema(readSchema) .load() // manipulate your streaming data .writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append") // run the query val query = dataStreamWriter.start()
重要
変更ストリームのスキーマの推論
change.stream.publish.full.document.only
オプションをtrue
に設定すると、Spark Connector はスキャンされたドキュメントのスキーマを使用してDataFrame
のスキーマを推論します。 オプションをfalse
に設定する場合は、スキーマを指定する必要があります。
この設定の詳細と、変更ストリーム構成オプションの完全なリストについては、読み取り構成オプションのガイドを参照してください。
API ドキュメント
これらの例で使用されている型の詳細については、次の Apache Spark API ドキュメントを参照してください。