Getting Started with the Spark Connector

Prerequisites

  • Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation, Spark documentation, and this MongoDB white paper for more details.

  • MongoDB version 4.0 or later

  • Spark version 3.1 through 3.2.4

  • Java 8 or later

Important

In version 10.0.0 and later of the Connector, use the format mongodb to read from and write to MongoDB:

df = spark.read.format("mongodb").load()

Provide the Spark Core, Spark SQL, and MongoDB Spark Connector dependencies to your dependency management tool.

The following excerpt from a Maven pom.xml file shows how to include dependencies compatible with Scala 2.12:

<dependencies>
  <dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.12</artifactId>
    <version>10.0.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.4</version>
  </dependency>
</dependencies>

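When you specify the Connector configuration via SparkSession, you must prefix each setting with spark.mongodb.read. or spark.mongodb.write., as in the spark.mongodb.read.connection.uri and spark.mongodb.write.connection.uri settings in the following example. For details and other available MongoDB Spark Connector options, see the Configuring Spark guide.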

package com.mongodb.spark_examples;

import org.apache.spark.sql.SparkSession;

public final class GettingStarted {

    public static void main(final String[] args) throws InterruptedException {
        /* Create the SparkSession.
         * If config arguments are passed from the command line using --conf,
         * parse args for the values to set.
         */
        SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("MongoSparkConnectorIntro")
            .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
            .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
            .getOrCreate();

        // Application logic
    }
}
  • The spark.mongodb.read.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) from which to read data.

  • The spark.mongodb.write.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data.

You can use a SparkSession object to write data to MongoDB, read data from MongoDB, create Datasets, and perform SQL operations.

This tutorial uses the pyspark shell, but the code works with self-contained Python applications as well.

When starting the pyspark shell, you can specify:

  • the --packages option to download the MongoDB Spark Connector package. The following package is available:

    • mongo-spark-connector

  • the --conf option to configure the MongoDB Spark Connector. These settings configure the SparkConf object.

    Note

    If you use SparkConf to configure the Spark Connector, you must prefix each setting with spark.mongodb.read. or spark.mongodb.write., as in spark.mongodb.read.connection.uri. For details and other available MongoDB Spark Connector options, see the Configuring Spark guide.

The following example starts the pyspark shell from the command line:

./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.0.5
  • The spark.mongodb.read.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), the collection (myCollection) from which to read data, and the read preference (primaryPreferred).

  • The spark.mongodb.write.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data. The connection uses port 27017 by default.

  • The --packages option specifies the Spark Connector's Maven coordinates, in the format groupId:artifactId:version.

The examples in this tutorial will use this database and collection.
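
To make the parts of the connection URI concrete, the following sketch splits the read URI into server address, port, database, collection, and options using only the Python standard library. The helper name describe_connection_uri is hypothetical and is not part of the Connector. The Connector and the MongoDB driver do their own parsing, and this sketch assumes a single host with no credentials.

```python
from urllib.parse import parse_qs, urlsplit

# Port assumed when the URI does not name one explicitly.
DEFAULT_MONGODB_PORT = 27017


def describe_connection_uri(uri):
    """Split a simple single-host MongoDB URI into its parts (illustration only)."""
    parts = urlsplit(uri)
    # The path has the form /<database>.<collection>
    database, _, collection = parts.path.lstrip("/").partition(".")
    # Keep the last value given for each query option, e.g. readPreference
    options = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    return {
        "host": parts.hostname,
        "port": parts.port or DEFAULT_MONGODB_PORT,
        "database": database,
        "collection": collection,
        "options": options,
    }


info = describe_connection_uri(
    "mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred"
)
print(info)
```

The write URI from the same command has no query string, so its options dictionary is empty.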

Note

When you start pyspark you get a SparkSession object called spark by default. In a standalone Python application, you need to create your SparkSession object explicitly, as shown below.

If you specified the spark.mongodb.read.connection.uri and spark.mongodb.write.connection.uri configuration options when you started pyspark, the default SparkSession object uses them. If you'd rather create your own SparkSession object from within pyspark, you can use SparkSession.builder and specify different configuration options.

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \
    .getOrCreate()

You can use a SparkSession object to write data to MongoDB, read data from MongoDB, create DataFrames, and perform SQL operations.
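
As a minimal sketch of the read and write operations, the two helpers below wrap the mongodb format around a SparkSession and a DataFrame. The function names are hypothetical. The sketch assumes the session was configured with spark.mongodb.read.connection.uri and spark.mongodb.write.connection.uri, so that no further options are needed, and it uses the append save mode as an illustrative choice.

```python
def read_default_collection(spark):
    """Return a DataFrame for the collection named in the read connection URI."""
    return spark.read.format("mongodb").load()


def append_to_default_collection(df):
    """Append the rows of df to the collection named in the write connection URI."""
    df.write.format("mongodb").mode("append").save()


# Possible usage inside the pyspark shell, where `spark` already exists:
#
#   df = read_default_collection(spark)
#   df.printSchema()
#   append_to_default_collection(df)
```

Neither helper imports pyspark itself; each one only calls methods on the objects passed in, so the same code can be used from the shell or from a standalone application.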

When starting the Spark shell, specify:

  • the --packages option to download the MongoDB Spark Connector package. The following package is available:

    • mongo-spark-connector

  • the --conf option to configure the MongoDB Spark Connector. These settings configure the SparkConf object.

    Note

    If you use SparkConf to configure the Spark Connector, you must prefix each setting with spark.mongodb.read. or spark.mongodb.write., as in spark.mongodb.read.connection.uri. For details and other available MongoDB Spark Connector options, see the Configuring Spark guide.

The following example starts the Spark shell from the command line:

./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.0.5
  • The spark.mongodb.read.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), the collection (myCollection) from which to read data, and the read preference (primaryPreferred).

  • The spark.mongodb.write.connection.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data. The connection uses port 27017 by default.

  • The --packages option specifies the Spark Connector's Maven coordinates, in the format groupId:artifactId:version.
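
If you launch the shell from a script, the same options can be assembled programmatically. The sketch below is written in Python, and its helper names maven_coordinate and shell_args are hypothetical. It builds the groupId:artifactId:version coordinate and the argument list used in the command above.

```python
import shlex


def maven_coordinate(group_id, artifact_id, version):
    """Join the three parts into the groupId:artifactId:version format."""
    return f"{group_id}:{artifact_id}:{version}"


def shell_args(launcher, read_uri, write_uri, package):
    """Build the argument list for ./bin/spark-shell or ./bin/pyspark."""
    return [
        launcher,
        "--conf", f"spark.mongodb.read.connection.uri={read_uri}",
        "--conf", f"spark.mongodb.write.connection.uri={write_uri}",
        "--packages", package,
    ]


package = maven_coordinate("org.mongodb.spark", "mongo-spark-connector_2.12", "10.0.5")
args = shell_args(
    "./bin/spark-shell",
    "mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred",
    "mongodb://127.0.0.1/test.myCollection",
    package,
)
# shlex.join quotes any argument that the shell would otherwise interpret
print(shlex.join(args))
```

Passing the list to subprocess.run(args) would start the shell without any manual quoting of the URI's query string.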

Enable MongoDB Connector-specific functions and implicits for your SparkSession and Dataset objects by importing the following package in the Spark shell:

import com.mongodb.spark._

Connection to MongoDB happens automatically when a Dataset action requires a read from MongoDB or a write to MongoDB.

Provide the Spark Core, Spark SQL, and MongoDB Spark Connector dependencies to your dependency management tool.

The following excerpt demonstrates how to include these dependencies in an SBT build.scala file:

scalaVersion := "2.12.13",
libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" % "10.0.5",
  "org.apache.spark" %% "spark-core" % "3.2.4",
  "org.apache.spark" %% "spark-sql" % "3.2.4"
)

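When you specify the Connector configuration via SparkSession, you must prefix each setting with spark.mongodb.read. or spark.mongodb.write., as in the spark.mongodb.read.connection.uri and spark.mongodb.write.connection.uri settings in the following example. For details and other available MongoDB Spark Connector options, see the Configuring Spark guide.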

package com.mongodb

import org.apache.spark.sql.SparkSession

object GettingStarted {

  def main(args: Array[String]): Unit = {
    /* Create the SparkSession.
     * If config arguments are passed from the command line using --conf,
     * parse args for the values to set.
     */
    val spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate()
  }
}

If you get a java.net.BindException: Can't assign requested address error, try the following:

  • Make sure that another Spark shell is not already running.

  • Set the SPARK_LOCAL_IP environment variable, for example:

    export SPARK_LOCAL_IP=127.0.0.1
  • Try including the following option when starting the Spark shell:

    --driver-java-options "-Djava.net.preferIPv4Stack=true"

If you have errors running the examples in this tutorial, you may need to clear your local Ivy cache (~/.ivy2/cache/org.mongodb.spark and ~/.ivy2/jars).
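
One way to clear those two locations is sketched below in Python. The function name clear_connector_ivy_cache is hypothetical, and the default location assumes Ivy uses ~/.ivy2. The jars directory may also hold jars downloaded for other packages, which Spark would then need to download again.

```python
import shutil
from pathlib import Path


def clear_connector_ivy_cache(ivy_home=None):
    """Delete the Connector's cached Ivy metadata and the downloaded jars.

    ivy_home defaults to ~/.ivy2. Returns the directories that were removed.
    """
    ivy_home = Path(ivy_home) if ivy_home else Path.home() / ".ivy2"
    targets = [
        ivy_home / "cache" / "org.mongodb.spark",
        ivy_home / "jars",
    ]
    removed = []
    for target in targets:
        # Skip anything that does not exist so the function is safe to re-run
        if target.is_dir():
            shutil.rmtree(target)
            removed.append(target)
    return removed


# Possible usage (this deletes files, so it is left commented out here):
#
#   for path in clear_connector_ivy_cache():
#       print("removed", path)
```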
