开始使用 Spark Connector
先决条件
具备 MongoDB 和 Apache Spark 的基本使用知识。 请参阅 MongoDB 文档 、 Spark 文档 ,并参阅此 MongoDB 白皮书 以了解更多详细信息。
MongoDB 4.0 或更高版本
Spark 版本 3.1 至 3.5
Java 8 或更高版本
开始体验
重要
在 Connector 版本10.0.0及更高版本中,使用 mongodb
格式读取和写入MongoDB:
df = spark.read.format("mongodb").load()
依赖项管理
向依赖项管理工具提供 Spark Core、Spark SQL 和 MongoDB Spark Connector 依赖项。
从版本3.2.0开始, Apache Spark 同时支持 Scala 2.12和2.13 。 Spark 3.1.3和以前的版本仅支持 Scala 2 。 12 。 要为两个 Scala 版本提供支持,请使用10版本。 4 。 Spark Connector 的0会生成两个工件:
org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
针对 Scala 2.12进行编译,并支持 Spark 3.1 .x 及更高版本。org.mongodb.spark:mongo-spark-connector_2.13:10.4.0
针对 Scala 2.13进行编译,并支持 Spark 3.2 .x 及更高版本。
重要
使用与您的 Scala 和 Spark 版本兼容的 Spark Connector 工件。
以下 Maven pom.xml
文件摘录显示了如何包含与 Scala 2.12兼容的依赖项:
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.4.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> </dependency> </dependencies>
配置
通过 SparkSession
指定Connector配置时,必须为设置添加适当的前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《 配置Spark 》指南。
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
spark.mongodb.read.connection.uri
指定MongoDB服务器解决(127.0.0.1
)、要连接的数据库(test
)、要从中读取数据的集合(myCollection
) 以及读取偏好(read preference)。spark.mongodb.write.connection.uri
指定 MongoDB 服务器地址 (127.0.0.1
)、要连接的数据库 (test
) 以及要写入数据的集合 (myCollection
)。
您可以使用SparkSession
对象将数据写入 MongoDB、从 MongoDB 读取数据、创建数据集以及执行 SQL 操作。
重要
在版本 10.0.0 及更高版本的 Connector 中,请使用 mongodb
格式读取和写入 MongoDB:
df = spark.read.format("mongodb").load()
Python Spark Shell
本教程使用 pyspark
shell,但该代码也适用于独立的 Python 应用程序。
在启动 pyspark
Shell 时,您可以指定:
--packages
选项以下载 MongoDB Spark Connector 包。提供了以下包:mongo-spark-connector
--conf
选项以配置 MongoDB Spark Connnector。这些设置可配置SparkConf
对象。注意
如果使用
SparkConf
配置Spark Connector,则必须为设置相应地添加前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《配置Spark 》指南。
以下示例从命令行启动 pyspark
Shell:
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
spark.mongodb.read.connection.uri
指定 MongoDB 服务器地址 (127.0.0.1
)、要连接的数据库 (test
)、要从中读取数据的集合 (myCollection
) 以及读取偏好。spark.mongodb.write.connection.uri
指定MongoDB服务器解决(127.0.0.1
)、要连接的数据库(test
) 以及要写入数据的集合(myCollection
)。 默认连接到端口27017
。packages
选项以groupId:artifactId:version
格式指定 Spark Connector 的 Maven 坐标。
本教程中的示例将使用此数据库和集合。
创建SparkSession
对象
注意
在启动 pyspark
时,您默认获得一个名为 spark
的 SparkSession
spark
对象。在独立的 Python 应用程序中,您需要显式地创建 SparkSession
对象,如下所示。
如果您在启动 pyspark
时指定 spark.mongodb.read.connection.uri
和 spark.mongodb.write.connection.uri
配置选项,则默认 SparkSession
对象将使用它们。如果您希望在 pyspark
中创建自己的 SparkSession
对象,则可以使用 SparkSession.builder
并指定不同的配置选项。
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
您可以使用 SparkSession
对象在 MongoDB 中写入和读取数据、创建 DataFrame 以及执行 SQL 操作。
重要
在版本 10.0.0 及更高版本的 Connector 中,请使用 mongodb
格式读取和写入 MongoDB:
df = spark.read.format("mongodb").load()
Spark Shell
启动 Spark Shell 时,请指定:
--packages
选项以下载 MongoDB Spark Connector 包。提供了以下包:mongo-spark-connector
--conf
选项以配置 MongoDB Spark Connnector。这些设置可配置SparkConf
对象。注意
如果使用
SparkConf
配置Spark Connector,则必须为设置相应地添加前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《配置Spark 》指南。
例如,
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
spark.mongodb.read.connection.uri
指定 MongoDB 服务器地址 (127.0.0.1
)、要连接的数据库 (test
)、要从中读取数据的集合 (myCollection
) 以及读取偏好。spark.mongodb.write.connection.uri
指定MongoDB服务器解决(127.0.0.1
)、要连接的数据库(test
) 以及要写入数据的集合(myCollection
)。 默认连接到端口27017
。packages
选项以groupId:artifactId:version
格式指定 Spark Connector 的 Maven 坐标。
导入 MongoDB Connector 程序包
通过在 中导入以下包,为 和 对象启用 Connector 特定的函数和隐式函数:MongoDBSparkSession
Dataset
Sparkshell
import com.mongodb.spark._
连接至 MongoDB
当数据集操作需要读取 MongoDB 或写入 MongoDB 时,会自动连接到 MongoDB。
自包含 Scala 应用程序
依赖项管理
向依赖项管理工具提供 Spark Core、Spark SQL 和 MongoDB Spark Connector 依赖项。
以下摘录演示了如何将这些依赖项包含在 SBTbuild.scala
文件:
scalaVersion := "2.12", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0", "org.apache.spark" %% "spark-core" % "3.3.1", "org.apache.spark" %% "spark-sql" % "3.3.1" )
配置
在通过 SparkSession
指定 Connector 配置时,您必须相应地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark Connector 选项,请参阅配置 Spark 指南。
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
故障排除
如果得到 java.net.BindException: Can't assign requested address
,
检查并确保没有其他 Spark Shell 正在运行。
尝试设置
SPARK_LOCAL_IP
环境变量;例如export SPARK_LOCAL_IP=127.0.0.1 尝试在启动 Spark Shell 时加入以下选项:
--driver-java-options "-Djava.net.preferIPv4Stack=true"
如果运行本教程中的示例时出现错误,可能需要清除本地 Ivy 缓存(~/.ivy2/cache/org.mongodb.spark
和 ~/.ivy2/jars
)。