Getting Started with the Spark Connector
Prerequisites
Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation, Spark documentation, and this MongoDB white paper for more details.
MongoDB version 4.0 or later
Spark version 3.1 through 3.2.4
Java 8 or later
Getting Started
Important
In version 10.0.0 and later of the Connector, use the format mongodb to read from and write to MongoDB:
df = spark.read.format("mongodb").load()
Dependency Management
Provide the Spark Core, Spark SQL, and MongoDB Spark Connector dependencies to your dependency management tool.
Beginning in version 3.2.0, Apache Spark supports both Scala 2.12 and 2.13. Spark 3.1.3 and previous versions support only Scala 2.12. To provide support for both Scala versions, version 10.2.2 of the Spark Connector produces two artifacts:
org.mongodb.spark:mongo-spark-connector_2.12:10.2.2 is compiled against Scala 2.12, and supports Spark 3.1.x and above.
org.mongodb.spark:mongo-spark-connector_2.13:10.2.2 is compiled against Scala 2.13, and supports Spark 3.2.x and above.
Important
Use the Spark Connector artifact that's compatible with your versions of Scala and Spark.
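The compatibility rules above can be sketched as a small lookup. This is an illustrative helper, not part of the Connector: the function name connector_coordinates and the constant CONNECTOR_VERSION are hypothetical, and the version checks encode only the two rules stated in this section.

```python
CONNECTOR_VERSION = "10.2.2"  # the Connector release discussed in this guide

# Minimum Spark (major, minor) version for each Scala build of the Connector,
# taken from the artifact descriptions in this section.
MINIMUM_SPARK = {"2.12": (3, 1), "2.13": (3, 2)}


def connector_coordinates(scala_version: str, spark_version: str) -> str:
    """Return the Maven coordinates of the artifact matching the given versions.

    Hypothetical helper for illustration; raises ValueError for combinations
    that this section describes as unsupported.
    """
    if scala_version not in MINIMUM_SPARK:
        raise ValueError(f"no Connector artifact for Scala {scala_version}")
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    if (major, minor) < MINIMUM_SPARK[scala_version]:
        raise ValueError(
            f"Scala {scala_version} artifact does not support Spark {spark_version}"
        )
    return f"org.mongodb.spark:mongo-spark-connector_{scala_version}:{CONNECTOR_VERSION}"


print(connector_coordinates("2.12", "3.1.3"))
```

The returned string uses the groupId:artifactId:version layout, so it can be pasted into a build file or passed to a --packages option.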
The following excerpt from a Maven pom.xml file shows how to include dependencies compatible with Scala 2.12:
<dependencies>
  <dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.12</artifactId>
    <version>10.2.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.1</version>
  </dependency>
</dependencies>
Configuration
When specifying the Connector configuration via SparkSession, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuring Spark guide.
package com.mongodb.spark_examples;

import org.apache.spark.sql.SparkSession;

public final class GettingStarted {

  public static void main(final String[] args) throws InterruptedException {
    /* Create the SparkSession.
     * If config arguments are passed from the command line using --conf,
     * parse args for the values to set.
     */
    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    // Application logic
  }
}
The spark.mongodb.read.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) from which to read data.
The spark.mongodb.write.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data.
You can use a SparkSession object to write data to MongoDB, read data from MongoDB, create Datasets, and perform SQL operations.
Python Spark Shell
This tutorial uses the pyspark shell, but the code works with self-contained Python applications as well.
When starting the pyspark shell, you can specify:
the --packages option to download the MongoDB Spark Connector package. The following package is available: mongo-spark-connector
the --conf option to configure the MongoDB Spark Connector. These settings configure the SparkConf object.
Note
If you use SparkConf to configure the Spark Connector, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuring Spark guide.
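As a rough illustration of what prefixing the settings means, the sketch below builds fully qualified keys from short option names. The prefixes spark.mongodb.read. and spark.mongodb.write. are taken from the connection.uri settings used in this tutorial; the helper names prefixed_conf and as_cli_args are hypothetical and not part of the Connector or of PySpark.

```python
READ_PREFIX = "spark.mongodb.read."
WRITE_PREFIX = "spark.mongodb.write."


def prefixed_conf(read_options: dict, write_options: dict) -> dict:
    """Return SparkConf-style keys for short read and write option names."""
    conf = {READ_PREFIX + key: value for key, value in read_options.items()}
    conf.update({WRITE_PREFIX + key: value for key, value in write_options.items()})
    return conf


def as_cli_args(conf: dict) -> list:
    """Render a configuration mapping as repeated --conf key=value arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


conf = prefixed_conf(
    {"connection.uri": "mongodb://127.0.0.1/test.myCollection"},
    {"connection.uri": "mongodb://127.0.0.1/test.myCollection"},
)
for argument in as_cli_args(conf):
    print(argument)
```

Keeping the short names and the prefixes separate makes it harder to set a read option under the write prefix by accident.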
The following example starts the pyspark shell from the command line:
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
              --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
              --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.2
The spark.mongodb.read.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), the collection (myCollection) from which to read data, and the read preference.
The spark.mongodb.write.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data. The connection uses port 27017 by default.
The --packages option specifies the Spark Connector's Maven coordinates, in the format groupId:artifactId:version.
The examples in this tutorial will use this database and collection.
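To make the parts of the connection URI concrete, the following sketch splits the read URI from the command above using only the Python standard library. The helper name describe_uri is hypothetical, and the code assumes a single-host URI of the form shown in this tutorial; the Connector does its own parsing and accepts more forms than this.

```python
from urllib.parse import parse_qs, urlsplit

DEFAULT_PORT = 27017  # port used when the URI does not name one


def describe_uri(uri: str) -> dict:
    """Split a single-host mongodb:// URI into the parts described above."""
    parts = urlsplit(uri)
    # The path has the form /<database>.<collection>
    database, _, collection = parts.path.lstrip("/").partition(".")
    query = parse_qs(parts.query)
    return {
        "host": parts.hostname,
        "port": parts.port or DEFAULT_PORT,
        "database": database,
        "collection": collection,
        # None when the URI carries no readPreference parameter
        "readPreference": query.get("readPreference", [None])[0],
    }


read_uri = "mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred"
for name, value in describe_uri(read_uri).items():
    print(f"{name}: {value}")
```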
Create a SparkSession Object
Note
When you start pyspark, you get a SparkSession object called spark by default. In a standalone Python application, you need to create your SparkSession object explicitly, as shown below.
If you specified the spark.mongodb.read.connection.uri and spark.mongodb.write.connection.uri configuration options when you started pyspark, the default SparkSession object uses them.
If you'd rather create your own SparkSession object from within pyspark, you can use SparkSession.builder and specify different configuration options.
from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \
    .getOrCreate()
You can use a SparkSession object to write data to MongoDB, read data from MongoDB, create DataFrames, and perform SQL operations.
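As a minimal sketch of reading and writing with a SparkSession, the helpers below use the mongodb format together with the standard DataFrameReader and DataFrameWriter calls. The function names are hypothetical. The database and collection options are assumed to override the values in the connection URI; check the Configuring Spark guide before relying on them. Running the helpers requires a live session with the Connector on the classpath and a reachable MongoDB server.

```python
def read_collection(spark, database, collection):
    """Load a MongoDB collection as a DataFrame through the "mongodb" format.

    `spark` is an existing SparkSession, such as the one pyspark provides.
    """
    return (
        spark.read.format("mongodb")
        .option("database", database)      # assumed option name, see lead-in
        .option("collection", collection)  # assumed option name, see lead-in
        .load()
    )


def append_to_collection(df, database, collection):
    """Append the rows of a DataFrame to a MongoDB collection."""
    (
        df.write.format("mongodb")
        .mode("append")
        .option("database", database)
        .option("collection", collection)
        .save()
    )


# Example use inside the pyspark shell, where `spark` already exists:
#   people = read_collection(spark, "test", "myCollection")
#   people.printSchema()
#   append_to_collection(people, "test", "myCollectionCopy")
```

Both helpers take the session or DataFrame as an argument, so the same code works in the shell and in a standalone application that builds its own SparkSession.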
Spark Shell
When starting the Spark shell, specify:
the --packages option to download the MongoDB Spark Connector package. The following package is available: mongo-spark-connector
the --conf option to configure the MongoDB Spark Connector. These settings configure the SparkConf object.
Note
If you use SparkConf to configure the Spark Connector, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuring Spark guide.
For example:
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
                  --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
                  --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.2
The spark.mongodb.read.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), the collection (myCollection) from which to read data, and the read preference.
The spark.mongodb.write.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data. The connection uses port 27017 by default.
The --packages option specifies the Spark Connector's Maven coordinates, in the format groupId:artifactId:version.
Import the MongoDB Connector Package
Enable MongoDB Connector-specific functions and implicits for your SparkSession and Dataset objects by importing the following package in the Spark shell:
import com.mongodb.spark._
Connect to MongoDB
Connection to MongoDB happens automatically when a Dataset action requires a read from MongoDB or a write to MongoDB.
Self-Contained Scala Application
Dependency Management
Provide the Spark Core, Spark SQL, and MongoDB Spark Connector dependencies to your dependency management tool.
The following excerpt demonstrates how to include these dependencies in an SBT build.scala file:
scalaVersion := "2.12.15", // full Scala 2.12.x version; adjust the patch release to your project
libraryDependencies ++= Seq(
  // %% appends the Scala binary version (_2.12) to the artifact name
  "org.mongodb.spark" %% "mongo-spark-connector" % "10.2.2",
  "org.apache.spark" %% "spark-core" % "3.3.1",
  "org.apache.spark" %% "spark-sql" % "3.3.1"
)
Configuration
When specifying the Connector configuration via SparkSession, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuring Spark guide.
package com.mongodb

import org.apache.spark.sql.SparkSession

object GettingStarted {

  def main(args: Array[String]): Unit = {
    /* Create the SparkSession.
     * If config arguments are passed from the command line using --conf,
     * parse args for the values to set.
     */
    val spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate()
  }
}
Troubleshooting
If you get a java.net.BindException: Can't assign requested address error:
Check to ensure that you do not have another Spark shell already running.
Try setting the SPARK_LOCAL_IP environment variable; for example:
export SPARK_LOCAL_IP=127.0.0.1
Try including the following option when starting the Spark shell:
--driver-java-options "-Djava.net.preferIPv4Stack=true"
If you have errors running the examples in this tutorial, you may need to clear your local Ivy cache (~/.ivy2/cache/org.mongodb.spark and ~/.ivy2/jars).
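Clearing the Ivy cache can be scripted. The sketch below removes the two directories named above and reports what it deleted; the function name clear_connector_ivy_cache and its ivy_home parameter are hypothetical. Note that ~/.ivy2/jars holds jars for every package resolved through --packages, not only the Connector, so Spark downloads them again on the next start.

```python
import shutil
from pathlib import Path


def clear_connector_ivy_cache(ivy_home=None):
    """Delete the Ivy directories named in the troubleshooting note.

    ivy_home defaults to ~/.ivy2. Returns the list of directories removed.
    """
    ivy_home = Path(ivy_home) if ivy_home is not None else Path.home() / ".ivy2"
    targets = [
        ivy_home / "cache" / "org.mongodb.spark",  # cached Connector metadata
        ivy_home / "jars",                         # jars resolved by --packages
    ]
    removed = []
    for target in targets:
        if target.is_dir():
            shutil.rmtree(target)
            removed.append(target)
    return removed


# Example use (this deletes files, so run it deliberately):
#   for path in clear_connector_ivy_cache():
#       print("removed", path)
```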