Docs Menu
Docs Home
/
Spark Connector
/

バッチ モードでの MongoDB からの読み取り

項目一覧

  • Overview
  • スキーマ推論
  • フィルター
  • SQL クエリ
  • API ドキュメント

MongoDB からデータを読み取るには、 SparkSessionオブジェクトで read()メソッドを呼び出します。 このメソッドはDataFrameReaderオブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。

MongoDB から読み取るには、次の構成設定を指定する必要があります。

設定
説明
dataFrame.read.format()
基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodbを使用します。
dataFrame.read.option()

option メソッドを使用して、 MongoDB配置 接続string 、 MongoDBデータベースとコレクション、パーティショニング構成などのバッチ読み取り設定を構成します。

バッチ読み取り構成オプションのリストについては、 バッチ読み取り構成オプションのガイドを参照してください。

次のコード例は、前の構成設定を使用して MongoDB のpeople.contactsからデータを読み取る方法を示しています。

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Tip

DataFrame 型

DataFrame は、Java API のクラスとして存在しません。 DataFrame を参照するには、 Dataset<Row>を使用します。

MongoDB からデータを読み取るには、 SparkSessionオブジェクトでread関数を呼び出します。 この関数はDataFrameReaderオブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。

MongoDB から読み取るには、次の構成設定を指定する必要があります。

設定
説明
dataFrame.read.format()
基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodbを使用します。
dataFrame.read.option()

option メソッドを使用して、 MongoDB配置接続string 、 MongoDBデータベースとコレクション、パーティショニング構成などのバッチ読み取り設定を構成します。

バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。

次のコード例は、前の構成設定を使用して MongoDB のpeople.contactsからデータを読み取る方法を示しています。

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

MongoDB からデータを読み取るには、 SparkSessionオブジェクトでreadメソッドを呼び出します。 このメソッドはDataFrameReaderオブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。

MongoDB から読み取るには、次の構成設定を指定する必要があります。

設定
説明
dataFrame.read.format()
基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 mongodbを使用します。
dataFrame.read.option()

option メソッドを使用して、 MongoDB配置接続string 、 MongoDBデータベースとコレクション、パーティショニング構成などのバッチ読み取り設定を構成します。

バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。

次のコード例は、前の構成設定を使用して MongoDB のpeople.contactsからデータを読み取る方法を示しています。

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Tip

DataFrame 型

Data Frame は、 RowオブジェクトのDatasetによって表されます。 DataFrame型はDataset[Row]のエイリアスです。

スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。

MongoDB コレクションpeople.contactsに次のドキュメントが含まれているとします。

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

次の操作は、 people.contactsからデータを読み込み、DataFrame のスキーマを推論します。

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

推論されたスキーマを確認するには、次の例に示すように、 Dataset<Row>オブジェクトでprintSchema()メソッドを使用します。

dataFrame.printSchema();
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

DataFrame のデータを表示するには、次の例に示すように、 DataFrameオブジェクトでshow()メソッドを使用します。

dataFrame.show();
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。

MongoDB コレクションpeople.contactsに次のドキュメントが含まれているとします。

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

次の操作は、 people.contactsからデータを読み込み、DataFrame のスキーマを推論します。

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

推論されたスキーマを確認するには、次の例に示すように、 DataFrameオブジェクトでprintSchema()関数を使用します。

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

DataFrame のデータを表示するには、次の例に示すように、 DataFrameオブジェクトでshow()関数を使用します。

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。

MongoDB コレクションpeople.contactsに次のドキュメントが含まれているとします。

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

次の操作は、 people.contactsからデータを読み込み、DataFrame のスキーマを推論します。

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

推論されたスキーマを確認するには、次の例に示すように、 DataFrameオブジェクトでprintSchema()メソッドを使用します。

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

DataFrame のデータを表示するには、次の例に示すように、 DataFrameオブジェクトでshow()メソッドを使用します。

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

schemaHint構成オプションを指定することで、スキーマ推論中に使用する既知のフィールド値を含むスキーマを指定できます。 次の Spark 形式のいずれかでschemaHintオプションを指定できます。

タイプ
形式
DDL
<field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE>
SQL DDL
STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>
JSON
{ "type": "struct", "fields": [
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> },
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> }]}

次の例は、Spark shell を使用して、各形式でschemaHintオプションを指定する方法を示しています。 この例では、 "value"という名前の string 値のフィールドと、 "count"という名前の整数値のフィールドを指定しています。

import org.apache.spark.sql.types._
val mySchema = StructType(Seq(
StructField("value", StringType),
StructField("count", IntegerType))
// Generate DDL format
mySchema.toDDL
// Generate SQL DDL format
mySchema.sql
// Generate Simple String DDL format
mySchema.simpleString
// Generate JSON format
mySchema.json

次の例に示すように、PySpark を使用して Simple string DDL 形式またはJSON形式で schemaHint オプションを指定することもできます。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
mySchema = StructType([
StructField('value', StringType(), True),
StructField('count', IntegerType(), True)])
# Generate Simple String DDL format
mySchema.simpleString()
# Generate JSON format
mySchema.json()

DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。

MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。

  • および

  • EqualNullセーフ

  • EqualTo

  • greaterThan

  • greaterThanOrEqual

  • IsNull

  • 未満

  • LegsThanOrEqual

  • ではない

  • または

  • StringContains

  • StringEndsWith

  • StringStartsWith

MongoDB コレクションからデータのサブセットを読み取るには、 filter()を使用します。

次のドキュメントを含むfruitという名前のコレクションについて考えてみます。

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

まず、デフォルトの MongoDB データソースに接続するためのDataFrameオブジェクトを設定します。

df = spark.read.format("mongodb").load()

次の例には、 qtyフィールドが10以上であるレコードのみが含まれます。

df.filter(df['qty'] >= 10).show()

この操作では、次の出力が印刷されます。

+---+----+------+
|_id| qty| type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+

DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。

MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。

  • および

  • EqualNullセーフ

  • EqualTo

  • greaterThan

  • greaterThanOrEqual

  • IsNull

  • 未満

  • LegsThanOrEqual

  • ではない

  • または

  • StringContains

  • StringEndsWith

  • StringStartsWith

次の例では、期限が100の下の文字をフィルタリングして出力します。

df.filter(df("age") < 100).show()

この操作では、以下の結果が出力されます。

+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82| Fíli|
|[5755d7b4566878c9...| 77| Kíli|
+--------------------+---+-------------+

データセットで SQL クエリを実行する前に、データセットの一時ビューを登録する必要があります。

次の操作では、 charactersテーブルを登録し、クエリを実行して 100 以上の文字をすべて検索します。

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() は、以下を出力します。

+-------+----+
| name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Dwalin| 169|
| Óin| 167|
| Glóin| 158|
+-------+----+

DataFrame に対して SQL クエリを実行する前に、一時テーブルを登録する必要があります。

次の例では、 tempという一時テーブルを登録し、SQL を使用してtypeフィールドに文字eが含まれるレコードをクエリします。

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

pyspark shell では、この操作によって次の出力が印刷されます。

+------+----+
| type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+

データセットで SQL クエリを実行する前に、データセットの一時ビューを登録する必要があります。

次の操作では、 charactersテーブルを登録し、クエリを実行して 100 以上の文字をすべて検索します。

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

これらの例で使用されている型の詳細については、次の Apache Spark API ドキュメントを参照してください。

戻る

バッチ モード