Read from MongoDB in Batch Mode
Overview
Java
To read data from MongoDB, call the read() method on your SparkSession object. This method returns a DataFrameReader object, which you can use to specify the format and other configuration settings for your batch read operation.
You must specify the following configuration settings to read from MongoDB:
Setting | Description |
---|---|
dataFrame.read.format() | Specifies the format of the underlying input data source. Use mongodb to read from MongoDB. |
dataFrame.read.option() | Configures batch read settings, such as the connection URI and the database and collection to read from. For a list of batch read configuration options, see the Batch Read Configuration Options guide. |
The following code example shows how to use the previous configuration settings to read data from people.contacts in MongoDB:
Dataset<Row> dataFrame = spark.read()
    .format("mongodb")
    .option("database", "people")
    .option("collection", "contacts")
    .load();
Tip
DataFrame Type
DataFrame doesn't exist as a class in the Java API. Use Dataset<Row> to reference a DataFrame.
Python
To read data from MongoDB, call the read function on your SparkSession object. This function returns a DataFrameReader object, which you can use to specify the format and other configuration settings for your batch read operation.
You must specify the following configuration settings to read from MongoDB:
Setting | Description |
---|---|
dataFrame.read.format() | Specifies the format of the underlying input data source. Use mongodb to read from MongoDB. |
dataFrame.read.option() | Configures batch read settings, such as the connection URI and the database and collection to read from. For a list of batch read configuration options, see the Batch Read Configuration Options guide. |
The following code example shows how to use the previous configuration settings to read data from people.contacts in MongoDB:
dataFrame = (
    spark.read.format("mongodb")
    .option("database", "people")
    .option("collection", "contacts")
    .load()
)
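These examples assume that the MongoDB connection URI is already configured on the SparkSession (for example, through the spark.mongodb.read.connection.uri property). As a minimal sketch, assuming the connector's connection.uri read option, you can also pass the URI per read; the URI below is a placeholder, not a real deployment:

# Sketch: supply the connection string explicitly for a single batch read.
# "mongodb://127.0.0.1:27017" is a placeholder; replace it with your own URI.
dataFrame = (
    spark.read.format("mongodb")
    .option("connection.uri", "mongodb://127.0.0.1:27017")
    .option("database", "people")
    .option("collection", "contacts")
    .load()
)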
Scala
To read data from MongoDB, call the read method on your SparkSession object. This method returns a DataFrameReader object, which you can use to specify the format and other configuration settings for your batch read operation.
You must specify the following configuration settings to read from MongoDB:
Setting | Description |
---|---|
dataFrame.read.format() | Specifies the format of the underlying input data source. Use mongodb to read from MongoDB. |
dataFrame.read.option() | Configures batch read settings, such as the connection URI and the database and collection to read from. For a list of batch read configuration options, see the Batch Read Configuration Options guide. |
The following code example shows how to use the previous configuration settings to read data from people.contacts in MongoDB:
val dataFrame = spark.read
  .format("mongodb")
  .option("database", "people")
  .option("collection", "contacts")
  .load()
Tip
DataFrame Type
A DataFrame is represented by a Dataset of Row objects. The DataFrame type is an alias for Dataset[Row].
Schema Inference
Java
When you load a Dataset or DataFrame without a schema, Spark samples the records to infer the schema of the collection.
Suppose that the MongoDB collection people.contacts contains the following documents:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
The following operation loads data from people.contacts and infers the schema of the DataFrame:
Dataset<Row> dataFrame = spark.read()
    .format("mongodb")
    .option("database", "people")
    .option("collection", "contacts")
    .load();
To see the inferred schema, use the printSchema() method on your Dataset<Row> object, as shown in the following example:
dataFrame.printSchema();
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
To see the data in the DataFrame, use the show() method on your DataFrame object, as shown in the following example:
dataFrame.show();
+--------------------+----+-------------+
|                 _id| age|         name|
+--------------------+----+-------------+
|[585024d558bef808...|  50|Bilbo Baggins|
|[585024d558bef808...|1000|      Gandalf|
|[585024d558bef808...| 195|       Thorin|
|[585024d558bef808...| 178|        Balin|
|[585024d558bef808...|  77|         Kíli|
|[585024d558bef808...| 169|       Dwalin|
|[585024d558bef808...| 167|          Óin|
|[585024d558bef808...| 158|        Glóin|
|[585024d558bef808...|  82|         Fíli|
|[585024d558bef808...|null|       Bombur|
+--------------------+----+-------------+
Python
When you load a Dataset or DataFrame without a schema, Spark samples the records to infer the schema of the collection.
Suppose that the MongoDB collection people.contacts contains the following documents:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
The following operation loads data from people.contacts and infers the schema of the DataFrame:
dataFrame = (
    spark.read.format("mongodb")
    .option("database", "people")
    .option("collection", "contacts")
    .load()
)
To see the inferred schema, use the printSchema() function on your DataFrame object, as shown in the following example:
dataFrame.printSchema()
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
To see the data in the DataFrame, use the show() function on your DataFrame object, as shown in the following example:
dataFrame.show()
+--------------------+----+-------------+
|                 _id| age|         name|
+--------------------+----+-------------+
|[585024d558bef808...|  50|Bilbo Baggins|
|[585024d558bef808...|1000|      Gandalf|
|[585024d558bef808...| 195|       Thorin|
|[585024d558bef808...| 178|        Balin|
|[585024d558bef808...|  77|         Kíli|
|[585024d558bef808...| 169|       Dwalin|
|[585024d558bef808...| 167|          Óin|
|[585024d558bef808...| 158|        Glóin|
|[585024d558bef808...|  82|         Fíli|
|[585024d558bef808...|null|       Bombur|
+--------------------+----+-------------+
Scala
When you load a Dataset or DataFrame without a schema, Spark samples the records to infer the schema of the collection.
Suppose that the MongoDB collection people.contacts contains the following documents:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
The following operation loads data from people.contacts and infers the schema of the DataFrame:
val dataFrame = spark.read
  .format("mongodb")
  .option("database", "people")
  .option("collection", "contacts")
  .load()
To see the inferred schema, use the printSchema() method on your DataFrame object, as shown in the following example:
dataFrame.printSchema()
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
To see the data in the DataFrame, use the show() method on your DataFrame object, as shown in the following example:
dataFrame.show()
+--------------------+----+-------------+
|                 _id| age|         name|
+--------------------+----+-------------+
|[585024d558bef808...|  50|Bilbo Baggins|
|[585024d558bef808...|1000|      Gandalf|
|[585024d558bef808...| 195|       Thorin|
|[585024d558bef808...| 178|        Balin|
|[585024d558bef808...|  77|         Kíli|
|[585024d558bef808...| 169|       Dwalin|
|[585024d558bef808...| 167|          Óin|
|[585024d558bef808...| 158|        Glóin|
|[585024d558bef808...|  82|         Fíli|
|[585024d558bef808...|null|       Bombur|
+--------------------+----+-------------+
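If you already know the collection's structure, you can skip sampling by passing an explicit schema to the standard DataFrameReader.schema() method. The following PySpark sketch assumes the same people.contacts collection and mirrors the inferred schema shown above; contactsSchema is just an illustrative name:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Sketch: build the schema by hand so Spark does not need to sample documents.
contactsSchema = StructType([
    StructField("_id", StructType([StructField("oid", StringType())])),
    StructField("age", IntegerType()),
    StructField("name", StringType()),
])

dataFrame = (
    spark.read.format("mongodb")
    .schema(contactsSchema)
    .option("database", "people")
    .option("collection", "contacts")
    .load()
)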
Specify Known Fields with Schema Hints
You can specify a schema containing known field values to use during schema inference by specifying the schemaHint configuration option. You can specify the schemaHint option in any of the following Spark formats:
Type | Format |
---|---|
DDL | <field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE> |
SQL DDL | STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>> |
JSON | { "type": "struct", "fields": [ { "name": "<field name>", "type": "<field type>", "nullable": <true/false> }, ... ] } |
The following example shows how to specify the schemaHint option in each format by using the Spark shell. The example specifies a string-valued field named "value" and an integer-valued field named "count".
import org.apache.spark.sql.types._

val mySchema = StructType(Seq(
  StructField("value", StringType),
  StructField("count", IntegerType)))

// Generate DDL format
mySchema.toDDL

// Generate SQL DDL format
mySchema.sql

// Generate Simple String DDL format
mySchema.simpleString

// Generate JSON format
mySchema.json
You can also specify the schemaHint option in the Simple String DDL format, or in JSON format by using PySpark, as shown in the following example:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

mySchema = StructType([
    StructField('value', StringType(), True),
    StructField('count', IntegerType(), True)])

# Generate Simple String DDL format
mySchema.simpleString()

# Generate JSON format
mySchema.json()
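The snippets above only generate schema strings; to apply one, pass it as the value of the schemaHint read option. A minimal PySpark sketch, assuming a DDL-style hint for the two fields above (the literal string shown here is an assumed example of that output, not generated text):

# Sketch: pass the schema hint as a read option during a batch read.
# "value STRING, count INT" is an assumed DDL-style hint for the fields above.
hintedFrame = (
    spark.read.format("mongodb")
    .option("schemaHint", "value STRING, count INT")
    .load()
)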
Filters
Python
When using filters with DataFrames or Datasets, the underlying MongoDB Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark. This improves Spark performance by retrieving and processing only the data you need.
The MongoDB Spark Connector turns the following filters into aggregation pipeline stages:
And
EqualNullSafe
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
LessThan
LessThanOrEqual
Not
Or
StringContains
StringEndsWith
StringStartsWith
Use filter() to read a subset of data from your MongoDB collection.
Consider a collection named fruit that contains the following documents:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
First, set up a DataFrame object to connect with your default MongoDB data source:
df = spark.read.format("mongodb").load()
The following example includes only records in which the qty field is greater than or equal to 10.
df.filter(df['qty'] >= 10).show()
The operation prints the following output:
+---+----+------+
|_id| qty|  type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+
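To check whether a filter was pushed down to MongoDB rather than applied inside Spark, you can inspect the physical plan with the standard DataFrame explain() method; this is a general Spark technique, not something specific to the connector:

# Print the physical plan; predicates pushed down to the data source
# typically appear in the scan node rather than as a separate Spark filter step.
df.filter(df['qty'] >= 10).explain()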
Scala
When using filters with DataFrames or Datasets, the underlying MongoDB Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark. This improves Spark performance by retrieving and processing only the data you need.
The MongoDB Spark Connector turns the following filters into aggregation pipeline stages:
And
EqualNullSafe
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
LessThan
LessThanOrEqual
Not
Or
StringContains
StringEndsWith
StringStartsWith
The following example filters and outputs the characters whose age is under 100:
df.filter(df("age") < 100).show()
The operation outputs the following:
+--------------------+---+-------------+
|                 _id|age|         name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82|         Fíli|
|[5755d7b4566878c9...| 77|         Kíli|
+--------------------+---+-------------+
SQL Queries
Java
Before running SQL queries on your Dataset, you must register a temporary view for the Dataset.
The following operation registers a characters table and then queries it to find all characters that are 100 or older:
// implicitDS is assumed to be a Dataset of the character documents shown earlier.
implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();
centenarians.show() outputs the following:
+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+
Python
Before you can run SQL queries against your DataFrame, you must register a temporary view.
The following example registers a temporary view called temp, then uses SQL to query for records in which the type field contains the letter e:
df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()
In the pyspark shell, the operation prints the following output:
+------+----+
|  type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+
Scala
Before running SQL queries on your Dataset, you must register a temporary view for the Dataset.
The following operation registers a characters table and then queries it to find all characters that are 100 or older:
// Assumes a matching case class, for example case class Character(name: String, age: Int),
// and import spark.implicits._ so that .as[Character] has an encoder available.
val characters = spark.read.format("mongodb").load().as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()
API Documentation
To learn more about the types used in these examples, see the Apache Spark API documentation for SparkSession, DataFrameReader, and Dataset.