Write to MongoDB

When saving RDD data into MongoDB, the data must be convertible to a BSON document. You may need to include a map transformation to convert the data into a Document (or BsonDocument or a DBObject).
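
For instance, a minimal sketch of such a map transformation (the items RDD and its field names are hypothetical) that converts an RDD of tuples into Documents before saving:

import org.bson.Document
// Hypothetical sample data: an RDD of (name, quantity) tuples
val items = sc.parallelize(Seq(("apple", 5), ("banana", 10)))
// Map each tuple to a BSON Document so the RDD can be written to MongoDB
val itemDocuments = items.map { case (name, qty) => new Document("name", name).append("qty", qty) }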

The following example creates an RDD of 10 documents and saves it to the MongoDB collection specified in the SparkConf:

import com.mongodb.spark.MongoSpark
import org.bson.Document

val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents) // Uses the SparkConf for configuration
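
The SparkConf supplies the target URI for the write. The following is a minimal sketch, assuming the connector's spark.mongodb.output.uri setting and a locally running MongoDB instance (the URI, database, and collection names are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder URI; spark.mongodb.output.uri tells the connector where MongoSpark.save() writes
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("WriteExample")
  .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
val sc = new SparkContext(conf)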

MongoSpark.save() can accept a WriteConfig object which specifies various write configuration settings, such as the collection or the write concern.

For example, the following code saves data to the spark collection with a majority write concern:

import com.mongodb.spark.config._

// Override the collection and write concern; fall back to the SparkConf for the remaining settings
val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)

RDDs have an implicit helper method, saveToMongoDB(), to write data to MongoDB.

For example, the following calls saveToMongoDB() without any arguments on the documents RDD defined above to save the documents to the collection specified in the SparkConf:

documents.saveToMongoDB() // Uses the SparkConf for configuration
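
If the helper is not already in scope, the following is a minimal sketch, assuming the connector's package object import (com.mongodb.spark._) provides the implicit conversion:

import com.mongodb.spark._ // brings the implicit saveToMongoDB() helper into scope
import org.bson.Document

val moreDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
moreDocuments.saveToMongoDB() // Uses the SparkConf for configuration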

Call saveToMongoDB() with a WriteConfig object to specify a different MongoDB server address, database, and collection. See the write configuration settings for the available options:

documents.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://example.com/database.collection")))
// Uses the WriteConfig

Some Scala types (e.g., Lists) are unsupported and should be converted to their Java equivalents. To do so, include the scala.collection.JavaConverters._ import and call the .asJava method.

The following operation imports the converters that provide the .asJava method, converts a Scala List to its Java equivalent, and saves it to MongoDB:

import scala.collection.JavaConverters._
import org.bson.Document
val documents = sc.parallelize(
  Seq(new Document("fruits", List("apples", "oranges", "pears").asJava))
)
MongoSpark.save(documents)