Docs 菜单
Docs 主页
/
Spark Connector

开始使用 Spark Connector

在此页面上

  • 先决条件
  • 开始体验
  • Tutorials
  • 具备 MongoDB 和 Apache Spark 的基本使用知识。 请参阅 MongoDB 文档 Spark 文档 ,并参阅此 MongoDB 白皮书 以了解更多详细信息。

  • MongoDB 4.0 或更高版本

  • Spark 版本 3.1 至 3.5

  • Java 8 或更高版本

重要

在 Connector 版本10.0.0及更高版本中,使用 mongodb格式读取和写入MongoDB:

df = spark.read.format("mongodb").load()

向依赖项管理工具提供 Spark Core、Spark SQL 和 MongoDB Spark Connector 依赖项。

从版本3.2.0开始, Apache Spark 同时支持 Scala 2.12和2.13 。 Spark 3.1.3和以前的版本仅支持 Scala 2 。 12 。 要为两个 Scala 版本提供支持,请使用10版本。 4 。 Spark Connector 的0会生成两个工件:

  • org.mongodb.spark:mongo-spark-connector_2.12:10.4.0 针对 Scala 2.12进行编译,并支持 Spark 3.1 .x 及更高版本。

  • org.mongodb.spark:mongo-spark-connector_2.13:10.4.0 针对 Scala 2.13进行编译,并支持 Spark 3.2 .x 及更高版本。

重要

使用与您的 Scala 和 Spark 版本兼容的 Spark Connector 工件。

以下 Maven pom.xml文件摘录显示了如何包含与 Scala 2.12兼容的依赖项:

<dependencies>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>10.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>

通过 SparkSession 指定Connector配置时,必须为设置添加适当的前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《 配置Spark 》指南。

package com.mongodb.spark_examples;
import org.apache.spark.sql.SparkSession;
public final class GettingStarted {
public static void main(final String[] args) throws InterruptedException {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Application logic
}
}
  • spark.mongodb.read.connection.uri指定MongoDB服务器解决( 127.0.0.1 )、要连接的数据库( test )、要从中读取数据的集合( myCollection ) 以及读取偏好(read preference)。

  • spark.mongodb.write.connection.uri指定 MongoDB 服务器地址 ( 127.0.0.1 )、要连接的数据库 ( test ) 以及要写入数据的集合 ( myCollection )。

您可以使用SparkSession对象将数据写入 MongoDB、从 MongoDB 读取数据、创建数据集以及执行 SQL 操作。

重要

在版本 10.0.0 及更高版本的 Connector 中,请使用 mongodb 格式读取和写入 MongoDB:

df = spark.read.format("mongodb").load()

本教程使用 pyspark shell,但该代码也适用于独立的 Python 应用程序。

在启动 pyspark Shell 时,您可以指定:

  • --packages 选项以下载 MongoDB Spark Connector 包。提供了以下包:

    • mongo-spark-connector

  • --conf 选项以配置 MongoDB Spark Connnector。这些设置可配置 SparkConf 对象。

    注意

    如果使用SparkConf配置Spark Connector,则必须为设置相应地添加前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《配置Spark 》指南。

以下示例从命令行启动 pyspark Shell:

./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
  • spark.mongodb.read.connection.uri指定 MongoDB 服务器地址 ( 127.0.0.1 )、要连接的数据库 ( test )、要从中读取数据的集合 ( myCollection ) 以及读取偏好。

  • spark.mongodb.write.connection.uri指定MongoDB服务器解决( 127.0.0.1 )、要连接的数据库( test ) 以及要写入数据的集合( myCollection )。 默认连接到端口27017

  • packages 选项以 groupId:artifactId:version 格式指定 Spark Connector 的 Maven 坐标。

本教程中的示例将使用此数据库和集合。

注意

在启动 pyspark 时,您默认获得一个名为 sparkSparkSession spark对象。在独立的 Python 应用程序中,您需要显式地创建 SparkSession 对象,如下所示。

如果您在启动 pyspark 时指定 spark.mongodb.read.connection.urispark.mongodb.write.connection.uri 配置选项,则默认 SparkSession 对象将使用它们。如果您希望在 pyspark 中创建自己的 SparkSession 对象,则可以使用 SparkSession.builder 并指定不同的配置选项。

from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \
.getOrCreate()

您可以使用 SparkSession 对象在 MongoDB 中写入和读取数据、创建 DataFrame 以及执行 SQL 操作。

重要

在版本 10.0.0 及更高版本的 Connector 中,请使用 mongodb 格式读取和写入 MongoDB:

df = spark.read.format("mongodb").load()

启动 Spark Shell 时,请指定:

  • --packages 选项以下载 MongoDB Spark Connector 包。提供了以下包:

    • mongo-spark-connector

  • --conf 选项以配置 MongoDB Spark Connnector。这些设置可配置 SparkConf 对象。

    注意

    如果使用SparkConf配置Spark Connector,则必须为设置相应地添加前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《配置Spark 》指南。

例如,

./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
  • spark.mongodb.read.connection.uri指定 MongoDB 服务器地址 ( 127.0.0.1 )、要连接的数据库 ( test )、要从中读取数据的集合 ( myCollection ) 以及读取偏好。

  • spark.mongodb.write.connection.uri指定MongoDB服务器解决( 127.0.0.1 )、要连接的数据库( test ) 以及要写入数据的集合( myCollection )。 默认连接到端口27017

  • packages 选项以 groupId:artifactId:version 格式指定 Spark Connector 的 Maven 坐标。

通过在 中导入以下包,为 和 对象启用 Connector 特定的函数和隐式函数:MongoDBSparkSessionDatasetSparkshell

import com.mongodb.spark._

当数据集操作需要读取 MongoDB 或写入 MongoDB 时,会自动连接到 MongoDB。

向依赖项管理工具提供 Spark Core、Spark SQL 和 MongoDB Spark Connector 依赖项。

以下摘录演示了如何将这些依赖项包含在 SBTbuild.scala 文件:

scalaVersion := "2.12",
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0",
"org.apache.spark" %% "spark-core" % "3.3.1",
"org.apache.spark" %% "spark-sql" % "3.3.1"
)

在通过 SparkSession 指定 Connector 配置时,您必须相应地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark Connector 选项,请参阅配置 Spark 指南。

package com.mongodb
object GettingStarted {
def main(args: Array[String]): Unit = {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
}
}

如果得到 java.net.BindException: Can't assign requested address

  • 检查并确保没有其他 Spark Shell 正在运行。

  • 尝试设置 SPARK_LOCAL_IP 环境变量;例如

    export SPARK_LOCAL_IP=127.0.0.1
  • 尝试在启动 Spark Shell 时加入以下选项:

    --driver-java-options "-Djava.net.preferIPv4Stack=true"

如果运行本教程中的示例时出现错误,可能需要清除本地 Ivy 缓存(~/.ivy2/cache/org.mongodb.spark~/.ivy2/jars)。

后退

MongoDB Connector for Spark