バッチ モードでの MongoDB からの読み取り
Overview
MongoDB からデータを読み取るには、 SparkSession
オブジェクトでread()
メソッドを呼び出します。 このメソッドはDataFrameReader
オブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
| 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 |
|
バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contacts
からデータを読み取る方法を示しています。
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Tip
DataFrame 型
DataFrame
は Java API にクラスとして存在しません。 DataFrame を参照するには、 Dataset<Row>
を使用します。
MongoDB からデータを読み取るには、 SparkSession
オブジェクトでread
関数を呼び出します。 この関数はDataFrameReader
オブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
| 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 |
|
バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contacts
からデータを読み取る方法を示しています。
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
MongoDB からデータを読み取るには、 SparkSession
オブジェクトでread
メソッドを呼び出します。 このメソッドはDataFrameReader
オブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
---|---|
| 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 |
|
バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contacts
からデータを読み取る方法を示しています。
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Tip
DataFrame 型
Data Frame は、 Row
オブジェクトのDataset
によって表されます。 DataFrame
型はDataset[Row]
のエイリアスです。
スキーマ推論
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contacts
に次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contacts
からデータを読み込み、DataFrame のスキーマを推論します。
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
推論されたスキーマを確認するには、次の例に示すように、 Dataset<Row>
オブジェクトでprintSchema()
メソッドを使用します。
dataFrame.printSchema();
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
DataFrame のデータを表示するには、次の例に示すように、 DataFrame
オブジェクトでshow()
メソッドを使用します。
dataFrame.show();
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contacts
に次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contacts
からデータを読み込み、DataFrame のスキーマを推論します。
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
推論されたスキーマを確認するには、次の例に示すように、 DataFrame
オブジェクトでprintSchema()
関数を使用します。
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
DataFrame のデータを表示するには、次の例に示すように、 DataFrame
オブジェクトでshow()
関数を使用します。
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contacts
に次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contacts
からデータを読み込み、DataFrame のスキーマを推論します。
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
推論されたスキーマを確認するには、次の例に示すように、 DataFrame
オブジェクトでprintSchema()
メソッドを使用します。
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
DataFrame のデータを表示するには、次の例に示すように、 DataFrame
オブジェクトでshow()
メソッドを使用します。
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
スキーマ ヒントで既知のフィールドを指定する
schemaHint
構成オプションを指定することで、スキーマ推論中に使用する既知のフィールド値を含むスキーマを指定できます。 次の Spark 形式のいずれかでschemaHint
オプションを指定できます。
タイプ | 形式 | |||
---|---|---|---|---|
DDL |
| |||
SQL DDL |
| |||
JSON |
|
次の例は、Spark shell を使用して、各形式でschemaHint
オプションを指定する方法を示しています。 この例では、 "value"
という名前の string 値のフィールドと、 "count"
という名前の整数値のフィールドを指定しています。
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
次の例に示すように、PySpark を使用して Simple string DDL 形式またはJSON形式で schemaHint
オプションを指定することもできます。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
フィルター
DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。
MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。
および
EqualNullセーフ
EqualTo
greaterThan
greaterThanOrEqual
In
IsNull
未満
LegsThanOrEqual
ではない
または
StringContains
StringEndsWith
StringStartsWith
MongoDB コレクションからデータのサブセットを読み取るには、 filter()
を使用します。
次のドキュメントを含むfruit
という名前のコレクションについて考えてみます。
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
まず、デフォルトの MongoDB データソースに接続するためのDataFrame
オブジェクトを設定します。
df = spark.read.format("mongodb").load()
次の例には、 qty
フィールドが10
以上であるレコードのみが含まれます。
df.filter(df['qty'] >= 10).show()
この操作では、次の出力が印刷されます。
+---+----+------+ |_id| qty| type| +---+----+------+ |2.0|10.0|orange| |3.0|15.0|banana| +---+----+------+
DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。
MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。
および
EqualNullセーフ
EqualTo
greaterThan
greaterThanOrEqual
In
IsNull
未満
LegsThanOrEqual
ではない
または
StringContains
StringEndsWith
StringStartsWith
次の例では、期限が100の下の文字をフィルタリングして出力します。
df.filter(df("age") < 100).show()
この操作では、以下の結果が出力されます。
+--------------------+---+-------------+ | _id|age| name| +--------------------+---+-------------+ |[5755d7b4566878c9...| 50|Bilbo Baggins| |[5755d7b4566878c9...| 82| Fíli| |[5755d7b4566878c9...| 77| Kíli| +--------------------+---+-------------+
SQL クエリ
データセットで SQL クエリを実行する前に、データセットの一時ビューを登録する必要があります。
次の操作では、 characters
テーブルを登録し、クエリを実行して 100 以上の文字をすべて検索します。
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show()
は次を出力します。
+-------+----+ | name| age| +-------+----+ |Gandalf|1000| | Thorin| 195| | Balin| 178| | Dwalin| 169| | Óin| 167| | Glóin| 158| +-------+----+
DataFrame に対して SQL クエリを実行する前に、一時テーブルを登録する必要があります。
次の例では、 temp
という一時テーブルを登録し、SQL を使用してtype
フィールドに文字e
が含まれるレコードをクエリします。
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
pyspark
shell では、この操作によって次の出力が印刷されます。
+------+----+ | type| qty| +------+----+ | apple| 5.0| |orange|10.0| +------+----+
データセットで SQL クエリを実行する前に、データセットの一時ビューを登録する必要があります。
次の操作では、 characters
テーブルを登録し、クエリを実行して 100 以上の文字をすべて検索します。
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
API ドキュメント
これらの例で使用されている型の詳細については、次の Apache Spark API ドキュメントを参照してください。