
Datasets and SQL

Note

Source Code

For the source code that contains the examples below, see SparkSQL.scala.

This tutorial works either as a self-contained Scala application or as individual commands in the Spark Shell.

Insert the following documents into the characters collection:

package com.mongodb

object SparkSQL {

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession

    /* For Self-Contained Scala Apps: Create the SparkSession
     * CREATED AUTOMATICALLY IN spark-shell */
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.characters")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.characters")
      .getOrCreate()

    import com.mongodb.spark._
    import com.mongodb.spark.config._
    import org.bson.Document

    // One JSON document per line; each line is parsed into a Document and saved to MongoDB
    val docs = """
      |{"name": "Bilbo Baggins", "age": 50}
      |{"name": "Gandalf", "age": 1000}
      |{"name": "Thorin", "age": 195}
      |{"name": "Balin", "age": 178}
      |{"name": "Kíli", "age": 77}
      |{"name": "Dwalin", "age": 169}
      |{"name": "Óin", "age": 167}
      |{"name": "Glóin", "age": 158}
      |{"name": "Fíli", "age": 82}
      |{"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq
    sparkSession.sparkContext.parallelize(docs.map(Document.parse)).saveToMongoDB()

    // Additional operations go here...
  }
}

New in Spark 2.0, a DataFrame is represented by a Dataset of Rows and is now an alias of Dataset[Row].

The Mongo Spark Connector provides the com.mongodb.spark.sql.DefaultSource class that creates DataFrames and Datasets from MongoDB. Use the connector's MongoSpark helper to facilitate the creation of a DataFrame:

val df = MongoSpark.load(sparkSession) // Uses the SparkSession
df.printSchema() // Prints DataFrame schema

The operation prints the following:

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

Note

By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the database. To explicitly declare a schema, see Explicitly Declare a Schema.

Alternatively, you can use the connector's SparkSession and DataFrameReader helper methods to create DataFrames:

import com.mongodb.spark.sql._ // provides the loadFromMongoDB() and read.mongo() helpers
import com.mongodb.spark.config._

val df2 = sparkSession.loadFromMongoDB() // SparkSession used for configuration
val df3 = sparkSession.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // ReadConfig used for configuration

val df4 = sparkSession.read.mongo() // SparkSession used for configuration
sparkSession.read.format("mongo").load()

// Set custom options
val customReadConfig = ReadConfig(Map("readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sparkSession)))
val df5 = sparkSession.read.mongo(customReadConfig)
val df6 = sparkSession.read.format("mongo").options(customReadConfig.asOptions).load()
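
If you prefer to declare the schema yourself rather than rely on sampling or a case class (the case class approach is described below), the standard DataFrameReader schema() method can also be used to supply it. The following is a minimal sketch based on the characters collection used throughout this page; charactersSchema and dfWithSchema are illustrative names:

import org.apache.spark.sql.types._

// Illustrative explicit schema for the characters collection
val charactersSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val dfWithSchema = sparkSession.read.format("mongo").schema(charactersSchema).load()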

Note

When using filters with DataFrames or Spark SQL, the underlying Mongo Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark.

The following example filters and outputs the characters with ages under 100:

df.filter(df("age") < 100).show()

The operation outputs the following:

+--------------------+---+-------------+
|                 _id|age|         name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82|         Fíli|
|[5755d7b4566878c9...| 77|         Kíli|
+--------------------+---+-------------+
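
To confirm that a filter is pushed down to MongoDB, you can inspect the query plan with Spark's standard explain() method. The exact plan output varies by Spark and connector version, but the pushed filters appear in the scan of the MongoDB relation; the line below is an illustrative sketch:

// Show the parsed, analyzed, optimized, and physical plans for the filtered DataFrame
df.filter(df("age") < 100).explain(true)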

By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the collection. You can also use a case class to define the schema explicitly, thus removing the extra queries needed for sampling.

Note

If you provide a case class for the schema, MongoDB returns only the declared fields. This helps minimize the data sent across the wire.

The following statement creates a Character case class and then uses it to define the schema for the DataFrame:

case class Character(name: String, age: Int)

Important

For self-contained Scala applications, define the Character case class outside of the method that uses it.

val explicitDF = MongoSpark.load[Character](sparkSession)
explicitDF.printSchema()

The operation prints the following output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

You can use the case class when converting the DataFrame to a Dataset as in the following example:

val dataset = explicitDF.as[Character]
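
In a self-contained application, converting a DataFrame to a Dataset of a case class requires an implicit Encoder, which you can bring into scope from the SparkSession's implicits (the Spark Shell imports these automatically). The typed operations below are an illustrative sketch, not part of the original example:

import sparkSession.implicits._ // Encoders for case classes; pre-imported as spark.implicits._ in the Spark Shell

// Typed operations work directly with the Character fields
dataset.filter(_.age < 100).map(_.name).show()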

The MongoRDD class provides helpers to convert an RDD to DataFrames and Datasets. The following example passes a SparkContext object to MongoSpark.load(), which returns an RDD, and then converts that RDD to a DataFrame and a Dataset:

// Passing the SparkContext to load returns an RDD, not a DataFrame or Dataset
val rdd = MongoSpark.load(sparkSession.sparkContext)

val dfInferredSchema = rdd.toDF()            // schema inferred by sampling the collection
val dfExplicitSchema = rdd.toDF[Character]() // schema taken from the Character case class
val ds = rdd.toDS[Character]()               // typed Dataset[Character]

Before running SQL queries on your dataset, you must register a temporary view for the dataset.

The following operation registers a characters table and then queries it to find all characters that are 100 or older:

val characters = MongoSpark.load[Character](sparkSession)
characters.createOrReplaceTempView("characters")
val centenarians = sparkSession.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.

The following example uses the MongoSpark.save(DataFrameWriter) method to save the centenarians DataFrame to the hundredClub collection in MongoDB and then, to verify the save, reads from the hundredClub collection:

MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))
println("Reading from the 'hundredClub' collection:")
MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection" -> "hundredClub"), Some(ReadConfig(sparkSession)))).show()

The DataFrameWriter includes .mode("overwrite"), which drops the hundredClub collection before writing the results if the collection already exists.

In the Spark Shell, the operation prints the following output:

+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+

MongoSpark.save(dataFrameWriter) is shorthand for configuring and saving via the DataFrameWriter. The following examples write DataFrames to MongoDB using the DataFrameWriter directly:

centenarians.write.option("collection", "hundredClub").mode("overwrite").mongo()
centenarians.write.option("collection", "hundredClub").mode("overwrite").format("mongo").save()

Spark supports a limited number of data types. To ensure that all BSON types can be round tripped in and out of Spark DataFrames/Datasets, the Spark Connector creates custom StructTypes for any unsupported BSON types.

The following table shows the mapping between the BSON Types and Spark Types:

BSON Type                Spark Type
Document                 StructType
Array                    ArrayType
32-bit integer           Integer
64-bit integer           Long
Binary data              Array[Byte] or StructType: { subType: Byte, data: Array[Byte] }
Boolean                  Boolean
Date                     java.sql.Timestamp
DBPointer                StructType: { ref: String, oid: String }
Double                   Double
JavaScript               StructType: { code: String }
JavaScript with scope    StructType: { code: String, scope: String }
Max key                  StructType: { maxKey: Integer }
Min key                  StructType: { minKey: Integer }
Null                     null
ObjectId                 StructType: { oid: String }
Regular Expression       StructType: { regex: String, options: String }
String                   String
Symbol                   StructType: { symbol: String }
Timestamp                StructType: { time: Integer, inc: Integer }
Undefined                StructType: { undefined: Boolean }

To better support Datasets, the connector provides the following Scala case classes (in the com.mongodb.spark.sql.fieldTypes package) and JavaBean classes (in the com.mongodb.spark.sql.fieldTypes.api.java package) to represent the unsupported BSON types:

BSON Type                Scala case class         JavaBean
Binary data              Binary                   Binary
DBPointer                DBPointer                DBPointer
JavaScript               JavaScript               JavaScript
JavaScript with scope    JavaScriptWithScope      JavaScriptWithScope
Max key                  MaxKey                   MaxKey
Min key                  MinKey                   MinKey
ObjectId                 ObjectId                 ObjectId
Regular Expression       RegularExpression        RegularExpression
Symbol                   Symbol                   Symbol
Timestamp                Timestamp                Timestamp
Undefined                Undefined                Undefined
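
For example, the ObjectId case class from com.mongodb.spark.sql.fieldTypes can be used in an explicit schema to keep the _id field as a typed value instead of dropping it. The following is a sketch based on the characters collection above; CharacterWithId is an illustrative name, not a class provided by the connector:

import com.mongodb.spark.sql.fieldTypes.ObjectId

// Illustrative case class that retains _id using the connector's ObjectId field type
case class CharacterWithId(_id: ObjectId, name: String, age: Int)

val charactersWithId = MongoSpark.load[CharacterWithId](sparkSession)
charactersWithId.printSchema()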

For convenience, all BSON types can be represented as a String value as well. However, these values lose all their original type information and, if saved back to MongoDB, are stored as Strings.
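
For instance, declaring _id as a String in the case class loads a String representation of the ObjectId; writing such values back stores them as Strings rather than ObjectIds. CharacterStringId below is an illustrative name:

// Illustrative: _id is loaded as a String, losing the original ObjectId type information
case class CharacterStringId(_id: String, name: String, age: Int)

val charactersStringId = MongoSpark.load[CharacterStringId](sparkSession)
charactersStringId.printSchema()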
