Docs Menu
Docs Home
/
Spark Connector

Spark Connector のスタートガイド

項目一覧

  • 前提条件
  • スタートガイド
  • Tutorials
  • MongoDB および Apache Spark に関する基礎知識 MongoDB のドキュメント Spark のドキュメント を参照してください 、詳細についてはこの MongoDB ホワイトペーパー を参照してください。

  • MongoDB バージョン4.0以降

  • Spark バージョン3.1から3.5

  • Java 8以降

重要

Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みには、 mongodbという形式を使用します。

df = spark.read.format("mongodb").load()

Spark Core、Spark SQL、MongoDB Spark Connector の依存関係を依存関係管理ツールに指定します。

バージョン3.2.0以降、 Apache Spark は Scala 2.12と2.13の両方をサポートしています。 Spark 3.1.3およびそれ以前のバージョンは、Scala 2のみをサポートしています。 12 。 両方のScalaバージョンをサポートするには、 Spark Connectorのバージョン 10.4.0 では 2 つのアーティファクトが生成されます。

  • org.mongodb.spark:mongo-spark-connector_2.12:10.4.0 は Scala 2.12に対してコンパイルされ、Spark 3.1 .x 以降をサポートしています。

  • org.mongodb.spark:mongo-spark-connector_2.13:10.4.0 は Scala 2.13に対してコンパイルされ、Spark 3.2 .x 以降をサポートしています。

重要

Scala および Spark のバージョンと互換性のある Spark Connector アーティファクトを使用します。

Maven pom.xmlファイルからの次の抽出は、Scala 2.12と互換性のある依存関係を含める方法を示しています。

<dependencies>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>10.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>

SparkSession経由で Connector 構成を指定する場合、 設定の前に適切に設定する必要があります。 詳細とその他の利用可能な MongoDB Spark Connector オプションについては、 Spark 構成ガイドをご覧ください。

package com.mongodb.spark_examples;
import org.apache.spark.sql.SparkSession;
public final class GettingStarted {
public static void main(final String[] args) throws InterruptedException {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Application logic
}
}
  • spark.mongodb.read.connection.uriでは、MongoDB サーバーのアドレス( 127.0.0.1 )、接続するデータベース( test )、データを読み取るコレクション( myCollection )と読み込み設定(read preference)を指定します。

  • spark.mongodb.write.connection.uriは、MongoDB サーバーアドレス( 127.0.0.1 )、接続するデータベース( test )、データを書き込むコレクション( myCollection )を指定します。

SparkSessionオブジェクトを使用して、MongoDB へのデータの書き込み、MongoDB からのデータの読み取り、データセットの作成、SQL 操作の実行を行えます。

重要

Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みに形式mongodbを使用します。

df = spark.read.format("mongodb").load()

このチュートリアルではpyspark shell を使用しますが、このコードは自己完結型の Python アプリケーションでも動作します。

pyspark shell を起動するときに、以下を指定できます。

  • MongoDB Spark Connector パッケージをダウンロードするには、 --packagesオプションを使用します。 次のパッケージが利用できます。

    • mongo-spark-connector

  • MongoDB Spark Connector を構成するには、 --confオプションを使用します。 これらの設定はSparkConfオブジェクトを構成します。

    注意

    SparkConf を使用してSpark Connectorを構成する場合は、 の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。

次の例では、コマンドラインからpyspark shell を起動します。

./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
  • spark.mongodb.read.connection.uriでは、MongoDB サーバーのアドレス( 127.0.0.1 )、接続するデータベース( test )、データを読み取るコレクション( myCollection )と読み込み設定(read preference)を指定します。

  • spark.mongodb.write.connection.uriは、MongoDB サーバーアドレス( 127.0.0.1 )、接続するデータベース( test )、データを書き込むコレクション( myCollection )を指定します。 デフォルトでポート27017に接続します。

  • packagesオプションは、Spark Connector の Maven 座標をgroupId:artifactId:version形式で指定します。

このチュートリアルの例では、このデータベースとコレクションを使用します。

注意

pysparkを起動すると、デフォルトでsparkと呼ばれるSparkSessionオブジェクトが生成されます。 スタンドアロンの Python アプリケーションでは、以下に示すようにSparkSessionオブジェクトを明示的に作成する必要があります。

pysparkの起動時にspark.mongodb.read.connection.urispark.mongodb.write.connection.uri構成オプションを指定した場合、デフォルトのSparkSessionオブジェクトはそれらを使用します。 pyspark内から独自のSparkSessionオブジェクトを作成する場合は、 SparkSession.builderを使用して別の構成オプションを指定できます。

from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \
.getOrCreate()

SparkSessionオブジェクトを使用して、MongoDB へのデータの書き込み、MongoDB からのデータの読み取り、DataFrames の作成、SQL 操作の実行を行えます。

重要

Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みに形式mongodbを使用します。

df = spark.read.format("mongodb").load()

Spark shell を起動するときに、以下を指定します。

  • MongoDB Spark Connector パッケージをダウンロードするには、 --packagesオプションを使用します。 次のパッケージが利用できます。

    • mongo-spark-connector

  • MongoDB Spark Connector を構成するには、 --confオプションを使用します。 これらの設定はSparkConfオブジェクトを構成します。

    注意

    SparkConf を使用してSpark Connectorを構成する場合は、 の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。

たとえば、

./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
  • spark.mongodb.read.connection.uriでは、MongoDB サーバーのアドレス( 127.0.0.1 )、接続するデータベース( test )、データを読み取るコレクション( myCollection )と読み込み設定(read preference)を指定します。

  • spark.mongodb.write.connection.uriは、MongoDB サーバーアドレス( 127.0.0.1 )、接続するデータベース( test )、データを書き込むコレクション( myCollection )を指定します。 デフォルトでポート27017に接続します。

  • packagesオプションは、Spark Connector の Maven 座標をgroupId:artifactId:version形式で指定します。

Spark shell に次のパッケージをインポートして、 オブジェクトと オブジェクトで MongoDB Connector 固有の関数と暗黙的な機能を有効にします。SparkSessionDataset

import com.mongodb.spark._

MongoDB への接続は、Dataset アクションで MongoDB からの読み取りまたは MongoDB への書き込みが必要な場合に、自動的に行われます。

Spark Core、Spark SQL、MongoDB Spark Connector の依存関係を依存関係管理ツールに指定します。

次の図は、これらの依存関係を SFTbuild.scala に含める方法を示しています。 ファイル:

scalaVersion := "2.12",
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0",
"org.apache.spark" %% "spark-core" % "3.3.1",
"org.apache.spark" %% "spark-sql" % "3.3.1"
)

SparkSession経由で Connector 構成を指定する場合、 設定の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。

package com.mongodb
object GettingStarted {
def main(args: Array[String]): Unit = {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
}
}

java.net.BindException: Can't assign requested addressを取得した場合、

  • 別の Spark shell がすでに実行されていないことを確認してください。

  • SPARK_LOCAL_IP環境変数を設定してみてください。例:

    export SPARK_LOCAL_IP=127.0.0.1
  • Spark shell を起動するときに、次のオプションを含めてみてください。

    --driver-java-options "-Djava.net.preferIPv4Stack=true"

このチュートリアルの例の実行中にエラーが発生した場合は、ローカル Ividy キャッシュ( ~/.ivy2/cache/org.mongodb.spark~/.ivy2/jars )をクリアする必要がある場合があります。

戻る

MongoDB Connector for Spark