
Configuration Options

On this page

  • Specify Configuration
  • Input Configuration
  • Output Configuration
  • Cache Configuration

Various configuration options are available for the MongoDB Spark Connector.

Specify Configuration

You can specify these options via SparkConf using the --conf setting or the $SPARK_HOME/conf/spark-defaults.conf file, and the MongoDB Spark Connector uses the settings in SparkConf as the defaults.

Important

When setting configurations via SparkConf, you must prefix the configuration options. Refer to the configuration sections for the specific prefix.
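For example, a minimal sketch of setting prefixed connector options on a SparkConf and building a SparkSession from it (the URIs, database, and collection names are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Input and output options carry the spark.mongodb.input. / spark.mongodb.output. prefixes.
val conf = new SparkConf()
  .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
  .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")

val spark = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnectorExample")
  .config(conf)
  .getOrCreate()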

Various methods in the MongoDB Connector API accept an optional ReadConfig or a WriteConfig object. ReadConfig and WriteConfig settings override any corresponding settings in SparkConf.

For examples, see Using a ReadConfig and Using a WriteConfig. For more details, refer to the source for these methods.
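As a sketch of a ReadConfig override (assuming the SparkSession built above; the collection name and read preference are illustrative), the following reads from a different collection with a secondaryPreferred read preference while keeping the remaining SparkConf defaults:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Start from the session's defaults, then override individual settings.
val readConfig = ReadConfig(
  Map("collection" -> "otherCollection", "readPreference.name" -> "secondaryPreferred"),
  Some(ReadConfig(spark.sparkContext)))

val customRdd = MongoSpark.load(spark.sparkContext, readConfig)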

In the Spark API, the DataFrameReader and DataFrameWriter accept options in the form of a Map[String, String].

You can convert custom ReadConfig or WriteConfig settings into a Map via the asOptions() method.
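For example, a sketch of passing ReadConfig settings to a DataFrameReader as options (the uri is a placeholder, and the example assumes the SparkSession above):

import com.mongodb.spark.config.ReadConfig

// Convert the ReadConfig into a Map[String, String] for the DataFrameReader.
val options = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/test.myCollection")).asOptions

val optionsDf = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .options(options)
  .load()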

The connector provides a cache for MongoClients which can only be configured via a system property. See Cache Configuration.

Input Configuration

You can configure the following properties to read from MongoDB:

Note

If you use SparkConf to set the connector's input configurations, prefix spark.mongodb.input. to each property.

Property name
Description
uri

Required. The connection string in the form mongodb://host:port/. The host can be a hostname, IP address, or UNIX domain socket. If the connection string doesn't specify a port, it uses the default MongoDB port, 27017.

You can append the remaining input options to the uri setting. See uri Configuration Setting.

database
Required. The database name to read data from.
collection
Required. The collection name to read data from.
localThreshold

The threshold (in milliseconds) for choosing a server from multiple MongoDB servers.

Default: 15

readPreference.name

The name of the Read Preference mode to use.

Default: Primary

readPreference.tagSets
The ReadPreference TagSets to use.
readConcern.level
The Read Concern level to use.
sampleSize

The sample size to use when analyzing the schema.

Default: 1000

samplePoolSize

The size of the pool to sample from when analyzing the schema.

Default: 10000

partitioner

The name of the partitioner to use to split collection data into partitions. Partitions are based on a range of values of a field (e.g. _ids 1 to 100).

The connector provides the following partitioners:

  • MongoDefaultPartitioner
    Default. Wraps the MongoSamplePartitioner and provides help for users of older versions of MongoDB.
  • MongoSamplePartitioner
    Requires MongoDB 3.2. A general purpose partitioner for all deployments. Uses the average document size and random sampling of the collection to determine suitable partitions for the collection. For configuration settings for the MongoSamplePartitioner, see MongoSamplePartitioner Configuration.
  • MongoShardedPartitioner
    A partitioner for sharded clusters. Partitions the collection based on the data chunks. Requires read access to the config database. For configuration settings for the MongoShardedPartitioner, see MongoShardedPartitioner Configuration.
  • MongoSplitVectorPartitioner
    A partitioner for standalone or replica sets. Uses the splitVector command on the standalone or the primary to determine the partitions of the database. Requires privileges to run splitVector command. For configuration settings for the MongoSplitVectorPartitioner, see MongoSplitVectorPartitioner Configuration.
  • MongoPaginateByCountPartitioner
    A slow, general purpose partitioner for all deployments. Creates a specific number of partitions. Requires a query for every partition. For configuration settings for the MongoPaginateByCountPartitioner, see MongoPaginateByCountPartitioner Configuration.
  • MongoPaginateBySizePartitioner
    A slow, general purpose partitioner for all deployments. Creates partitions based on data size. Requires a query for every partition. For configuration settings for the MongoPaginateBySizePartitioner, see MongoPaginateBySizePartitioner Configuration.

You can also specify a custom partitioner implementation. For custom implementations of the MongoPartitioner trait, provide the full class name. If you don't provide package names, then this property uses the default package, com.mongodb.spark.rdd.partitioner.

To configure options for the various partitioners, see Partitioner Configuration.

Default: MongoDefaultPartitioner

partitionerOptions
The custom options to configure the partitioner.
registerSQLHelperFunctions

Registers SQL helper functions to allow easy querying of BSON types inside SQL queries.

Default: false

sql.inferschema.mapTypes.enabled

Enables you to analyze MapType and StructType in the data's schema.

Default: true

sql.inferschema.mapTypes.minimumKeys

The minimum number of keys a StructType needs for the connector to detect it as a MapType.

Default: 250

sql.pipeline.includeNullFilters
Includes null filters in the aggregation pipeline.
sql.pipeline.includeFiltersAndProjections
Includes any filters and projections in the aggregation pipeline.
pipeline
Enables you to apply custom aggregation pipelines to the collection before sending it to Spark.
hint
The JSON representation of the hint to use in the aggregation.
collation
The JSON representation of a collation to use in the aggregation. The connector creates this with Collation.asDocument.toJson.
allowDiskUse
Enables writing to temporary files during aggregation.
batchSize
The size of the internal batches within the cursor.
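As a sketch of how several of these read options fit together (the uri, pipeline, and batch size are illustrative; the option keys are the property names from the table above), a ReadConfig can apply a custom aggregation pipeline before the data reaches Spark:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Filter documents server-side with an aggregation pipeline and tune the cursor batch size.
val pipelineReadConfig = ReadConfig(Map(
  "uri" -> "mongodb://127.0.0.1/test.myCollection",
  "pipeline" -> """[ { "$match": { "status": "active" } } ]""",
  "batchSize" -> "200"))

val filteredRdd = MongoSpark.load(spark.sparkContext, pipelineReadConfig)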

Partitioner Configuration

MongoSamplePartitioner Configuration

Note

If you use SparkConf to set the connector's input configurations, prefix spark.mongodb.input.partitionerOptions. to each property.

Property name
Description
partitionKey

The field by which to split the collection data. The field should be indexed and contain unique values.

Default: _id

partitionSizeMB

The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents.

Default: 64

samplesPerPartition

The number of sample documents to take for each partition in order to establish a partitionKey range for each partition.

A greater number of samplesPerPartition helps to find partitionKey ranges that more closely match the partitionSizeMB you specify.

Note

For sampling to improve performance, samplesPerPartition must be fewer than the number of documents within each of your partitions.

You can estimate the number of documents within each of your partitions by dividing your partitionSizeMB by the average document size (in MB) in your collection.

Default: 10

Example

For a collection of 640 documents with an average document size of 0.5 MB, the default MongoSamplePartitioner configuration values create 5 partitions with 128 documents per partition.

The MongoDB Spark Connector samples 50 documents (the default 10 per intended partition) and defines 5 partitions by selecting partitionKey ranges from the sampled documents.
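A sketch of selecting and tuning this partitioner through a ReadConfig (the uri and option values are illustrative; the keys follow the partitionerOptions property names above):

import com.mongodb.spark.config.ReadConfig

// Use the sample partitioner with larger partitions and a bigger sample per partition.
val samplingReadConfig = ReadConfig(Map(
  "uri" -> "mongodb://127.0.0.1/test.myCollection",
  "partitioner" -> "MongoSamplePartitioner",
  "partitionerOptions.partitionSizeMB" -> "128",
  "partitionerOptions.samplesPerPartition" -> "100"))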

MongoShardedPartitioner Configuration

Note

If you use SparkConf to set the connector's input configurations, prefix spark.mongodb.input.partitionerOptions. to each property.

Property name
Description
shardkey

The field by which to split the collection data. The field should be indexed.

Default: _id

Important

This property is not compatible with hashed shard keys.

MongoSplitVectorPartitioner Configuration

Note

If you use SparkConf to set the connector's input configurations, prefix spark.mongodb.input.partitionerOptions. to each property.

Property name
Description
partitionKey

The field by which to split the collection data. The field should be indexed and contain unique values.

Default: _id

partitionSizeMB

The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents.

Default: 64

MongoPaginateByCountPartitioner Configuration

Note

If you use SparkConf to set the connector's input configurations, prefix spark.mongodb.input.partitionerOptions. to each property.

Property name
Description
partitionKey

The field by which to split the collection data. The field should be indexed and contain unique values.

Default: _id

numberOfPartitions

The number of partitions to create. A greater number of partitions means fewer documents per partition.

Default: 64

MongoPaginateBySizePartitioner Configuration

Note

If you use SparkConf to set the connector's input configurations, prefix spark.mongodb.input.partitionerOptions. to each property.

Property name
Description
partitionKey

The field by which to split the collection data. The field should be indexed and contain unique values.

Default: _id

partitionSizeMB

The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents.

Default: 64

uri Configuration Setting

You can set all of the input configuration options via the input uri setting.

For example, the following sets the input uri setting via SparkConf:

Note

If you use SparkConf to set the connector's input configurations, prefix spark.mongodb.input. to the setting.

spark.mongodb.input.uri=mongodb://127.0.0.1/databaseName.collectionName?readPreference=primaryPreferred

The configuration corresponds to the following separate configuration settings:

spark.mongodb.input.uri=mongodb://127.0.0.1/
spark.mongodb.input.database=databaseName
spark.mongodb.input.collection=collectionName
spark.mongodb.input.readPreference.name=primaryPreferred

If you specify a setting both in the uri and in a separate configuration, the uri setting overrides the separate setting. For example, given the following configuration, the input database for the connection is foobar:

spark.mongodb.input.uri=mongodb://127.0.0.1/foobar
spark.mongodb.input.database=bar
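As a minimal sketch (assuming the session-level spark.mongodb.input.uri shown earlier in this section), the connector reads its input settings from the SparkSession when you load a DataFrame without explicit options:

// The reader picks up spark.mongodb.input.* settings from the SparkSession.
val inputDf = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .load()

inputDf.printSchema()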

Output Configuration

The following options for writing to MongoDB are available:

Note

If you use SparkConf to set the connector's output configurations, prefix spark.mongodb.output. to each property.

Property name
Description
uri

Required. The connection string in the form mongodb://host:port/. The host can be a hostname, IP address, or UNIX domain socket. If the connection string doesn't specify a port, it uses the default MongoDB port, 27017.

Note

You can append the remaining output options to the uri setting. See uri Configuration Setting.

database
Required. The database name to write data to.
collection
Required. The collection name to write data to.
extendedBsonTypes

Enables extended BSON types when writing data to MongoDB.

Default: true

localThreshold

The threshold (milliseconds) for choosing a server from multiple MongoDB servers.

Default: 15

replaceDocument

Whether to replace the whole document when saving Datasets that contain an _id field. If false, the connector updates only the fields in the document that match the fields in the Dataset.

Default: true

maxBatchSize

The maximum batch size for bulk operations when saving data.

Default: 512

writeConcern.w

The write concern w value.

Default: w: 1

writeConcern.journal
The write concern journal value.
writeConcern.wTimeoutMS
The write concern wTimeout value.
shardKey

The field by which to split the collection data. The field should be indexed and contain unique values.

Default: _id

forceInsert

Forces saves to use inserts, even if a Dataset contains _id.

Default: false

ordered

Sets the bulk operations ordered property.

Default: true
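As a sketch of a WriteConfig override (assuming the SparkSession above; the collection name and write concern are illustrative), the following saves an RDD of documents to a different collection with a majority write concern while keeping the remaining output defaults:

import org.bson.Document
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig

// Start from the session's output defaults, then override individual settings.
val writeConfig = WriteConfig(
  Map("collection" -> "archive", "writeConcern.w" -> "majority"),
  Some(WriteConfig(spark.sparkContext)))

val documents = spark.sparkContext.parallelize(
  (1 to 10).map(i => Document.parse(s"{ counter: $i }")))

MongoSpark.save(documents, writeConfig)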

uri Configuration Setting

You can set all of the output configuration options via the output uri setting.

For example, the following sets the output uri setting via SparkConf:

Note

If you use SparkConf to set the connector's output configurations, prefix spark.mongodb.output. to the setting.

spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection

The configuration corresponds to the following separate configuration settings:

spark.mongodb.output.uri=mongodb://127.0.0.1/
spark.mongodb.output.database=test
spark.mongodb.output.collection=myCollection

If you specify a setting both in the uri and in a separate configuration, the uri setting overrides the separate setting. For example, given the following configuration, the output database for the connection is foobar:

spark.mongodb.output.uri=mongodb://127.0.0.1/foobar
spark.mongodb.output.database=bar
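As a minimal sketch (assuming the session-level spark.mongodb.output.uri shown above; inputDf stands for any DataFrame you want to save), the connector writes to the configured collection when you save without explicit options:

// The writer picks up spark.mongodb.output.* settings from the SparkSession.
inputDf.write
  .format("com.mongodb.spark.sql.DefaultSource")
  .mode("append")
  .save()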

Cache Configuration

The MongoConnector includes a cache for MongoClients, so workers can share the MongoClient across threads.

Important

As the cache is set up before the Spark configuration is available, the cache can only be configured via a system property.

System Property name
Description
mongodb.keep_alive_ms

The length of time to keep a MongoClient available for sharing.

Default: 5000
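For example, a sketch of setting the property as a JVM option when submitting an application (the 10000 ms value is only an illustration, and JVM options are just one way to set a system property):

--conf "spark.driver.extraJavaOptions=-Dmongodb.keep_alive_ms=10000"
--conf "spark.executor.extraJavaOptions=-Dmongodb.keep_alive_ms=10000"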
