バッチ モードでの MongoDB からの読み取り
Overview
MongoDB からデータを読み取るには、 SparkSession
オブジェクトで read()
メソッドを呼び出します。 このメソッドはDataFrameReader
オブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
dataFrame.read.format() | 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodb を使用します。 |
dataFrame.read.option() |
バッチ読み取り構成オプションのリストについては、 バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contacts
からデータを読み取る方法を示しています。
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Tip
DataFrame 型
DataFrame
は、Java API のクラスとして存在しません。 DataFrame を参照するには、 Dataset<Row>
を使用します。
MongoDB からデータを読み取るには、 SparkSession
オブジェクトでread
関数を呼び出します。 この関数はDataFrameReader
オブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
dataFrame.read.format() | 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodb を使用します。 |
dataFrame.read.option() |
バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contacts
からデータを読み取る方法を示しています。
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
MongoDB からデータを読み取るには、 SparkSession
オブジェクトでread
メソッドを呼び出します。 このメソッドはDataFrameReader
オブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
dataFrame.read.format() | 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodb を使用します。 |
dataFrame.read.option() |
バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contacts
からデータを読み取る方法を示しています。
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Tip
DataFrame 型
Data Frame は、 Row
オブジェクトのDataset
によって表されます。 DataFrame
型はDataset[Row]
のエイリアスです。
スキーマ推論
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contacts
に次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contacts
からデータを読み込み、DataFrame のスキーマを推論します。
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
推論されたスキーマを確認するには、次の例に示すように、 Dataset<Row>
オブジェクトでprintSchema()
メソッドを使用します。
dataFrame.printSchema();
DataFrame のデータを表示するには、次の例に示すように、 DataFrame
オブジェクトでshow()
メソッドを使用します。
dataFrame.show();
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contacts
に次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contacts
からデータを読み込み、DataFrame のスキーマを推論します。
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
推論されたスキーマを確認するには、次の例に示すように、 DataFrame
オブジェクトでprintSchema()
関数を使用します。
dataFrame.printSchema()
DataFrame のデータを表示するには、次の例に示すように、 DataFrame
オブジェクトでshow()
関数を使用します。
dataFrame.show()
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contacts
に次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contacts
からデータを読み込み、DataFrame のスキーマを推論します。
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
推論されたスキーマを確認するには、次の例に示すように、 DataFrame
オブジェクトでprintSchema()
メソッドを使用します。
dataFrame.printSchema()
DataFrame のデータを表示するには、次の例に示すように、 DataFrame
オブジェクトでshow()
メソッドを使用します。
dataFrame.show()
スキーマ ヒントで既知のフィールドを指定する
schemaHint
構成オプションを指定することで、スキーマ推論中に使用する既知のフィールド値を含むスキーマを指定できます。 次の Spark 形式のいずれかでschemaHint
オプションを指定できます。
タイプ | 形式 | |||
---|---|---|---|---|
DDL | <field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE> | |||
SQL DDL | STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE> | |||
JSON |
|
次の例は、Spark shell を使用して、各形式でschemaHint
オプションを指定する方法を示しています。 この例では、 "value"
という名前の string 値のフィールドと、 "count"
という名前の整数値のフィールドを指定しています。
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
次の例に示すように、PySpark を使用して Simple string DDL 形式またはJSON形式で schemaHint
オプションを指定することもできます。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
フィルター
DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。
MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。
および
EqualNullセーフ
EqualTo
greaterThan
greaterThanOrEqual
で
IsNull
未満
LegsThanOrEqual
ではない
または
StringContains
StringEndsWith
StringStartsWith
MongoDB コレクションからデータのサブセットを読み取るには、 filter()
を使用します。
次のドキュメントを含むfruit
という名前のコレクションについて考えてみます。
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
まず、デフォルトの MongoDB データソースに接続するためのDataFrame
オブジェクトを設定します。
df = spark.read.format("mongodb").load()
次の例には、 qty
フィールドが10
以上であるレコードのみが含まれます。
df.filter(df['qty'] >= 10).show()
この操作では、次の出力が印刷されます。
+---+----+------+ |_id| qty| type| +---+----+------+ |2.0|10.0|orange| |3.0|15.0|banana| +---+----+------+
DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。
MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。
および
EqualNullセーフ
EqualTo
greaterThan
greaterThanOrEqual
で
IsNull
未満
LegsThanOrEqual
ではない
または
StringContains
StringEndsWith
StringStartsWith
次の例では、期限が100の下の文字をフィルタリングして出力します。
df.filter(df("age") < 100).show()
この操作では、以下の結果が出力されます。
+--------------------+---+-------------+ | _id|age| name| +--------------------+---+-------------+ |[5755d7b4566878c9...| 50|Bilbo Baggins| |[5755d7b4566878c9...| 82| Fíli| |[5755d7b4566878c9...| 77| Kíli| +--------------------+---+-------------+
SQL クエリ
データセットで SQL クエリを実行する前に、データセットの一時ビューを登録する必要があります。
次の操作では、 characters
テーブルを登録し、クエリを実行して 100 以上の文字をすべて検索します。
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show()
は、以下を出力します。
+-------+----+ | name| age| +-------+----+ |Gandalf|1000| | Thorin| 195| | Balin| 178| | Dwalin| 169| | Óin| 167| | Glóin| 158| +-------+----+
DataFrame に対して SQL クエリを実行する前に、一時テーブルを登録する必要があります。
次の例では、 temp
という一時テーブルを登録し、SQL を使用してtype
フィールドに文字e
が含まれるレコードをクエリします。
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
pyspark
shell では、この操作によって次の出力が印刷されます。
+------+----+ | type| qty| +------+----+ | apple| 5.0| |orange|10.0| +------+----+
データセットで SQL クエリを実行する前に、データセットの一時ビューを登録する必要があります。
次の操作では、 characters
テーブルを登録し、クエリを実行して 100 以上の文字をすべて検索します。
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
API ドキュメント
これらの例で使用されている型の詳細については、次の Apache Spark API ドキュメントを参照してください。