Docs Menu
Docs Home
/
Spark Connector
/

Read from MongoDB in Batch Mode

On this page

  • Overview
  • Schema Inference
  • Filters
  • SQL Queries
  • API Documentation

To read data from MongoDB, call the read() method on your SparkSession object. This method returns a DataFrameReader object, which you can use to specify the format and other configuration settings for your batch read operation.

You must specify the following configuration settings to read from MongoDB:

Setting
Description
dataFrame.read.format()
Specifies the format of the underlying input data source. Use mongodb to read from MongoDB.
dataFrame.read.option()

Use the option method to configure batch read settings, including the MongoDB deployment connection string, MongoDB database and collection, and partitioner configuration.

For a list of batch read configuration options, see the Batch Read Configuration Options guide.

The following code example shows how to use the previous configuration settings to read data from people.contacts in MongoDB:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Tip

DataFrame Type

DataFrame doesn't exist as a class in the Java API. Use Dataset<Row> to reference a DataFrame.

To read data from MongoDB, call the read function on your SparkSession object. This function returns a DataFrameReader object, which you can use to specify the format and other configuration settings for your batch read operation.

You must specify the following configuration settings to read from MongoDB:

Setting
Description
dataFrame.read.format()
Specifies the format of the underlying input data source. Use mongodb to read from MongoDB.
dataFrame.read.option()

Use the option method to configure batch read settings, including the MongoDB deployment connection string, MongoDB database and collection, and partitioner configuration.

For a list of batch read configuration options, see the Batch Read Configuration Options guide.

The following code example shows how to use the previous configuration settings to read data from people.contacts in MongoDB:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

To read data from MongoDB, call the read method on your SparkSession object. This method returns a DataFrameReader object, which you can use to specify the format and other configuration settings for your batch read operation.

You must specify the following configuration settings to read from MongoDB:

Setting
Description
dataFrame.read.format()
Specifies the format of the underlying input data source. Use mongodb to read from MongoDB.
dataFrame.read.option()

Use the option method to configure batch read settings, including the MongoDB deployment connection string, MongoDB database and collection, and partitioner configuration.

For a list of batch read configuration options, see the Batch Read Configuration Options guide.

The following code example shows how to use the previous configuration settings to read data from people.contacts in MongoDB:

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Tip

DataFrame Type

A DataFrame is represented by a Dataset of Row objects. The DataFrame type is an alias for Dataset[Row].

When you load a Dataset or DataFrame without a schema, Spark samples the records to infer the schema of the collection.

Suppose that the MongoDB collection people.contacts contains the following documents:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

The following operation loads data from people.contacts and infers the schema of the DataFrame:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

To see the inferred schema, use the printSchema() method on your Dataset<Row> object, as shown in the following example:

dataFrame.printSchema();

To see the data in the DataFrame, use the show() method on your DataFrame object, as shown in the following example:

dataFrame.show();

When you load a Dataset or DataFrame without a schema, Spark samples the records to infer the schema of the collection.

Suppose that the MongoDB collection people.contacts contains the following documents:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

The following operation loads data from people.contacts and infers the schema of the DataFrame:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

To see the inferred schema, use the printSchema() function on your DataFrame object, as shown in the following example:

dataFrame.printSchema()

To see the data in the DataFrame, use the show() function on your DataFrame object, as shown in the following example:

dataFrame.show()

When you load a Dataset or DataFrame without a schema, Spark samples the records to infer the schema of the collection.

Suppose that the MongoDB collection people.contacts contains the following documents:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

The following operation loads data from people.contacts and infers the schema of the DataFrame:

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

To see the inferred schema, use the printSchema() method on your DataFrame object, as shown in the following example:

dataFrame.printSchema()

To see the data in the DataFrame, use the show() method on your DataFrame object, as shown in the following example:

dataFrame.show()

You can specify a schema containing known field values to use during schema inference by specifying the schemaHint configuration option. You can specify the schemaHint option in any of the following Spark formats:

Type
Format
DDL
<field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE>
SQL DDL
STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>
JSON
{ "type": "struct", "fields": [
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> },
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> }]}

The following example shows how to specify the schemaHint option in each format by using the Spark shell. The example specifies a string-valued field named "value" and an integer-valued field named "count".

import org.apache.spark.sql.types._
val mySchema = StructType(Seq(
StructField("value", StringType),
StructField("count", IntegerType))
// Generate DDL format
mySchema.toDDL
// Generate SQL DDL format
mySchema.sql
// Generate Simple String DDL format
mySchema.simpleString
// Generate JSON format
mySchema.json

You can also specify the schemaHint option in the Simple String DDL format, or in JSON format by using PySpark, as shown in the following example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
mySchema = StructType([
StructField('value', StringType(), True),
StructField('count', IntegerType(), True)])
# Generate Simple String DDL format
mySchema.simpleString()
# Generate JSON format
mySchema.json()

When using filters with DataFrames or Datasets, the underlying MongoDB Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark. This improves Spark performance by retrieving and processing only the data you need.

MongoDB Spark Connector turns the following filters into aggregation pipeline stages:

  • And

  • EqualNullSafe

  • EqualTo

  • GreaterThan

  • GreaterThanOrEqual

  • In

  • IsNull

  • LessThan

  • LessThanOrEqual

  • Not

  • Or

  • StringContains

  • StringEndsWith

  • StringStartsWith

Use filter() to read a subset of data from your MongoDB collection.

Consider a collection named fruit that contains the following documents:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

First, set up a DataFrame object to connect with your default MongoDB data source:

df = spark.read.format("mongodb").load()

The following example includes only records in which the qty field is greater than or equal to 10.

df.filter(df['qty'] >= 10).show()

The operation prints the following output:

+---+----+------+
|_id| qty| type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+

When using filters with DataFrames or Datasets, the underlying MongoDB Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark. This improves Spark performance by retrieving and processing only the data you need.

MongoDB Spark Connector turns the following filters into aggregation pipeline stages:

  • And

  • EqualNullSafe

  • EqualTo

  • GreaterThan

  • GreaterThanOrEqual

  • In

  • IsNull

  • LessThan

  • LessThanOrEqual

  • Not

  • Or

  • StringContains

  • StringEndsWith

  • StringStartsWith

The following example filters and output the characters with ages under 100:

df.filter(df("age") < 100).show()

The operation outputs the following:

+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82| Fíli|
|[5755d7b4566878c9...| 77| Kíli|
+--------------------+---+-------------+

Before running SQL queries on your Dataset, you must register a temporary view for the Dataset.

The following operation registers a characters table and then queries it to find all characters that are 100 or older:

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() outputs the following:

+-------+----+
| name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Dwalin| 169|
| Óin| 167|
| Glóin| 158|
+-------+----+

Before you can run SQL queries against your DataFrame, you need to register a temporary table.

The following example registers a temporary table called temp, then uses SQL to query for records in which the type field contains the letter e:

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

In the pyspark shell, the operation prints the following output:

+------+----+
| type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+

Before running SQL queries on your Dataset, you must register a temporary view for the Dataset.

The following operation registers a characters table and then queries it to find all characters that are 100 or older:

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

To learn more about the types used in these examples, see the following Apache Spark API documentation:

Back

Batch Mode