FAQ
How can I achieve data locality?
For any MongoDB deployment, the MongoDB Spark Connector sets the preferred location for an RDD to be where the data is:

- For a non-sharded system, it sets the preferred location to be the hostname(s) of the standalone or the replica set.
- For a sharded system, it sets the preferred location to be the hostname(s) of the shards.
To promote data locality:

- Ensure there is a Spark Worker on one of the hosts for a non-sharded system, or one Spark Worker per shard for sharded systems.
- Use a `nearest` read preference to read from the local `mongod`.
- For a sharded cluster, run a `mongos` on the same nodes and use the `localThreshold` configuration option to connect to the nearest `mongos`. To partition the data by shard, use the `MongoShardedPartitioner` configuration, as in the sketch below.
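As an illustration, the following sketch combines these settings in a `ReadConfig` using the connector's RDD API. It assumes an existing `SparkContext` named `sc` and a sharded cluster with a `mongos` on each Spark Worker node; the `localThreshold` value of 15 milliseconds is a placeholder.

```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Combine the read preference, mongos selection window, and partitioner
// in one ReadConfig; the values here are illustrative placeholders.
val readConfig = ReadConfig(
  Map(
    "readPreference.name" -> "nearest",                // read from the nearest member
    "localThreshold"      -> "15",                     // ms window for picking a nearby mongos
    "partitioner"         -> "MongoShardedPartitioner" // partition the data by shard
  ),
  Some(ReadConfig(sc))
)

val rdd = MongoSpark.load(sc, readConfig)
```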
How do I interact with Spark Streams?
Spark Streams can be considered a potentially infinite source of RDDs. Therefore, anything you can do with an RDD, you can do with the results of a Spark Stream.

For an example, see SparkStreams.scala.
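As a minimal sketch of this idea, the following assumes a `SparkContext` named `sc` configured with `spark.mongodb.output.uri`, and uses a socket text stream as a placeholder source; each micro-batch RDD is saved with `MongoSpark.save`, just as a plain RDD would be.

```scala
import com.mongodb.spark.MongoSpark
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.bson.Document

val ssc = new StreamingContext(sc, Seconds(5))

// Placeholder source: each line arriving on the socket becomes a record.
val lines = ssc.socketTextStream("localhost", 9999)

// A stream is a sequence of RDDs, so each micro-batch can be saved
// with the connector like any other RDD of Documents.
lines.map(line => new Document("line", line))
     .foreachRDD(rdd => MongoSpark.save(rdd))

ssc.start()
ssc.awaitTermination()
```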
How do I resolve an `Unrecognized pipeline stage name` error?
In MongoDB deployments with mixed versions of `mongod`, it is possible to get an `Unrecognized pipeline stage name: '$sample'` error. This happens because the default partitioner samples the collection with the `$sample` aggregation stage, which is only available in MongoDB 3.2 and later. To mitigate this situation, explicitly configure which partitioner to use and define the schema when using DataFrames.
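For illustration, the following sketch reads a DataFrame while explicitly choosing a partitioner that does not rely on `$sample` and supplying a schema up front. It assumes a `SparkSession` named `spark` configured with `spark.mongodb.input.uri`; the field names in the schema are placeholders for your own documents' shape.

```scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Supplying an explicit schema lets the connector skip schema inference,
// which would otherwise sample the collection. Field names are placeholders.
val schema = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)
))

// MongoPaginateBySizePartitioner paginates the collection rather than
// using the $sample aggregation stage of the default partitioner.
val df = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("partitioner", "MongoPaginateBySizePartitioner")
  .schema(schema)
  .load()
```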
How do I use MongoDB BSON types that are unsupported in Spark?
Some custom MongoDB BSON types, such as `ObjectId`, are unsupported in Spark.
The MongoDB Spark Connector converts custom MongoDB data types to and from extended JSON-like representations of those data types that are compatible with Spark. See DataTypes for a list of custom MongoDB types and their Spark counterparts.
Spark Datasets
To create a standard Dataset with custom MongoDB data types, use the `fieldTypes` helpers:
```scala
import org.bson.types.ObjectId
import com.mongodb.spark.sql.fieldTypes
import spark.implicits._

// fieldTypes.ObjectId wraps the BSON ObjectId in a type Spark can encode.
case class MyData(id: fieldTypes.ObjectId, a: Int)

val ds = spark.createDataset(Seq(MyData(fieldTypes.ObjectId(new ObjectId()), 99)))
ds.show()
```
The preceding example creates a Dataset containing the following fields and data types:
- The `id` field is a custom MongoDB BSON type, `ObjectId`, defined by `fieldTypes.ObjectId`.
- The `a` field is an `Int`, a data type available in Spark.
Spark DataFrames
To create a DataFrame with custom MongoDB data types, you must supply those types when you create the RDD and schema:
- Create RDDs using custom MongoDB BSON types (e.g. `ObjectId`). The Spark Connector handles converting those custom types into Spark-compatible data types.
- Declare schemas using the `StructFields` helpers for data types that are not natively supported by Spark (e.g. `StructFields.objectId`). Refer to DataTypes for the mapping between BSON and custom MongoDB Spark types.
```scala
import org.bson.types.ObjectId
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
import com.mongodb.spark.sql.helpers.StructFields

// The ObjectId is represented as a nested row holding its hex string;
// StructFields.objectId declares the matching struct type in the schema.
val data = Seq(Row(Row(new ObjectId().toHexString()), 99))
val rdd = spark.sparkContext.parallelize(data)

val schema = StructType(List(
  StructFields.objectId("id", true),
  StructField("a", IntegerType, true)
))

val df = spark.createDataFrame(rdd, schema)
df.show()
```
The preceding example creates a DataFrame containing the following fields and data types:
- The `id` field is a custom MongoDB BSON type, `ObjectId`, defined by `StructFields.objectId`.
- The `a` field is an `Int`, a data type available in Spark.