Spark Connector のスタートガイド
前提条件
MongoDB および Apache Spark に関する基礎知識 MongoDB のドキュメント 、 Spark のドキュメント を参照してください 、詳細についてはこの MongoDB ホワイトペーパー を参照してください。
MongoDB バージョン4.0以降
Spark バージョン3.1から3.5
Java 8以降
スタートガイド
重要
Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みには、 mongodb
という形式を使用します。
df = spark.read.format("mongodb").load()
依存関係マネジメント
Spark Core、Spark SQL、MongoDB Spark Connector の依存関係を依存関係管理ツールに指定します。
バージョン3.2.0以降、 Apache Spark は Scala 2.12と2.13の両方をサポートしています。 Spark 3.1.3およびそれ以前のバージョンは、Scala 2のみをサポートしています。 12 。 両方のScalaバージョンをサポートするには、 Spark Connectorのバージョン 10.4.0 では 2 つのアーティファクトが生成されます。
org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
は Scala 2.12に対してコンパイルされ、Spark 3.1 .x 以降をサポートしています。org.mongodb.spark:mongo-spark-connector_2.13:10.4.0
は Scala 2.13に対してコンパイルされ、Spark 3.2 .x 以降をサポートしています。
重要
Scala および Spark のバージョンと互換性のある Spark Connector アーティファクトを使用します。
Maven pom.xml
ファイルからの次の抽出は、Scala 2.12と互換性のある依存関係を含める方法を示しています。
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.4.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> </dependency> </dependencies>
構成
SparkSession
経由で Connector 構成を指定する場合、 設定の前に適切に設定する必要があります。 詳細とその他の利用可能な MongoDB Spark Connector オプションについては、 のSpark 構成ガイドをご覧ください。
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
spark.mongodb.read.connection.uri
では、MongoDB サーバーのアドレス(127.0.0.1
)、接続するデータベース(test
)、データを読み取るコレクション(myCollection
)と読み込み設定(read preference)を指定します。spark.mongodb.write.connection.uri
は、MongoDB サーバーアドレス(127.0.0.1
)、接続するデータベース(test
)、データを書き込むコレクション(myCollection
)を指定します。
SparkSession
オブジェクトを使用して、MongoDB へのデータの書き込み、MongoDB からのデータの読み取り、データセットの作成、SQL 操作の実行を行えます。
重要
Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みに形式mongodb
を使用します。
df = spark.read.format("mongodb").load()
Python Spark shell
このチュートリアルではpyspark
shell を使用しますが、このコードは自己完結型の Python アプリケーションでも動作します。
pyspark
shell を起動するときに、以下を指定できます。
MongoDB Spark Connector パッケージをダウンロードするには、
--packages
オプションを使用します。 次のパッケージが利用できます。mongo-spark-connector
MongoDB Spark Connector を構成するには、
--conf
オプションを使用します。 これらの設定はSparkConf
オブジェクトを構成します。注意
SparkConf
を使用してSpark Connectorを構成する場合は、 の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。
次の例では、コマンドラインからpyspark
shell を起動します。
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
spark.mongodb.read.connection.uri
では、MongoDB サーバーのアドレス(127.0.0.1
)、接続するデータベース(test
)、データを読み取るコレクション(myCollection
)と読み込み設定(read preference)を指定します。spark.mongodb.write.connection.uri
は、MongoDB サーバーアドレス(127.0.0.1
)、接続するデータベース(test
)、データを書き込むコレクション(myCollection
)を指定します。 デフォルトでポート27017
に接続します。packages
オプションは、Spark Connector の Maven 座標をgroupId:artifactId:version
形式で指定します。
このチュートリアルの例では、このデータベースとコレクションを使用します。
SparkSession
オブジェクトの作成
注意
pyspark
を起動すると、デフォルトでspark
と呼ばれるSparkSession
オブジェクトが生成されます。 スタンドアロンの Python アプリケーションでは、以下に示すようにSparkSession
オブジェクトを明示的に作成する必要があります。
pyspark
の起動時にspark.mongodb.read.connection.uri
とspark.mongodb.write.connection.uri
構成オプションを指定した場合、デフォルトのSparkSession
オブジェクトはそれらを使用します。 pyspark
内から独自のSparkSession
オブジェクトを作成する場合は、 SparkSession.builder
を使用して別の構成オプションを指定できます。
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
SparkSession
オブジェクトを使用して、MongoDB へのデータの書き込み、MongoDB からのデータの読み取り、DataFrames の作成、SQL 操作の実行を行えます。
重要
Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みに形式mongodb
を使用します。
df = spark.read.format("mongodb").load()
Spark shell
Spark shell を起動するときに、以下を指定します。
MongoDB Spark Connector パッケージをダウンロードするには、
--packages
オプションを使用します。 次のパッケージが利用できます。mongo-spark-connector
MongoDB Spark Connector を構成するには、
--conf
オプションを使用します。 これらの設定はSparkConf
オブジェクトを構成します。注意
SparkConf
を使用してSpark Connectorを構成する場合は、 の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。
たとえば、
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
spark.mongodb.read.connection.uri
では、MongoDB サーバーのアドレス(127.0.0.1
)、接続するデータベース(test
)、データを読み取るコレクション(myCollection
)と読み込み設定(read preference)を指定します。spark.mongodb.write.connection.uri
は、MongoDB サーバーアドレス(127.0.0.1
)、接続するデータベース(test
)、データを書き込むコレクション(myCollection
)を指定します。 デフォルトでポート27017
に接続します。packages
オプションは、Spark Connector の Maven 座標をgroupId:artifactId:version
形式で指定します。
MongoDB Connector パッケージのインポート
Spark shell に次のパッケージをインポートして、 オブジェクトと オブジェクトで MongoDB Connector 固有の関数と暗黙的な機能を有効にします。SparkSession
Dataset
import com.mongodb.spark._
MongoDB に接続する
MongoDB への接続は、Dataset アクションで MongoDB からの読み取りまたは MongoDB への書き込みが必要な場合に、自動的に行われます。
自己完結型の Scala アプリケーション
依存関係マネジメント
Spark Core、Spark SQL、MongoDB Spark Connector の依存関係を依存関係管理ツールに指定します。
次の図は、これらの依存関係を SFTbuild.scala
に含める方法を示しています。 ファイル:
scalaVersion := "2.12", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0", "org.apache.spark" %% "spark-core" % "3.3.1", "org.apache.spark" %% "spark-sql" % "3.3.1" )
構成
SparkSession
経由で Connector 構成を指定する場合、 設定の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
トラブルシューティング
java.net.BindException: Can't assign requested address
を取得した場合、
別の Spark shell がすでに実行されていないことを確認してください。
SPARK_LOCAL_IP
環境変数を設定してみてください。例:export SPARK_LOCAL_IP=127.0.0.1 Spark shell を起動するときに、次のオプションを含めてみてください。
--driver-java-options "-Djava.net.preferIPv4Stack=true"
このチュートリアルの例の実行中にエラーが発生した場合は、ローカル Ividy キャッシュ( ~/.ivy2/cache/org.mongodb.spark
と~/.ivy2/jars
)をクリアする必要がある場合があります。