Batch Read Configuration Options
Overview
You can configure the following properties when reading data from MongoDB in batch mode.
Note
If you use SparkConf to set the connector's read configurations, prefix each property with `spark.mongodb.read.`.
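For example, the following is a minimal sketch of setting prefixed read configurations through the session builder (which populates SparkConf); the URI, database, and collection names are placeholders.

```python
from pyspark.sql import SparkSession

# Sketch only: each read configuration set this way carries the
# spark.mongodb.read. prefix. URI, database, and collection are placeholders.
spark = (
    SparkSession.builder
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/")
    .config("spark.mongodb.read.database", "myDB")
    .config("spark.mongodb.read.collection", "myCollection")
    .getOrCreate()
)

df = spark.read.format("mongodb").load()
```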
| Property name | Description |
| --- | --- |
| `connection.uri` | Required. The connection string configuration key.<br>Default: `mongodb://localhost:27017/` |
| `database` | Required. The database name configuration. |
| `collection` | Required. The collection name configuration. |
| `mongoClientFactory` | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the `com.mongodb.spark.sql.connector.connection.MongoClientFactory` interface.<br>Default: `com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory` |
| `partitioner` | The full class name of the partitioner. You can specify a custom implementation, which must implement the `com.mongodb.spark.sql.connector.read.partitioner.Partitioner` interface. See the Partitioner Configurations section for more information about partitioners.<br>Default: `com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner` |
| `partitioner.options.` | Partitioner configuration prefix. See the Partitioner Configurations section for more information about partitioners. |
| `sampleSize` | The number of documents to sample from the collection when inferring the schema.<br>Default: `1000` |
| `sql.inferSchema.mapTypes.enabled` | Whether to enable `MapType` inference when inferring the schema. When enabled, large compatible struct types are inferred as a `MapType` instead.<br>Default: `true` |
| `sql.inferSchema.mapTypes.minimum.key.size` | Minimum size of a `StructType` before it is inferred as a `MapType`.<br>Default: `250` |
| `aggregation.pipeline` | Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark. The value must be either an extended JSON single document or a list of documents; see the examples following this table.<br>Important: Custom aggregation pipelines must be compatible with the partitioner strategy. For example, aggregation stages such as `$group` do not work with any partitioner that creates more than one partition. |
| `aggregation.allowDiskUse` | Specifies whether to allow storage to disk when running the aggregation.<br>Default: `true` |
Partitioner Configurations
Partitioners change the read behavior of batch reads that use the Spark Connector. By dividing the data into partitions, you can run transformations in parallel.
This section contains configuration information for the following partitioners:

- SamplePartitioner
- ShardedPartitioner
- PaginateBySizePartitioner
- PaginateIntoPartitionsPartitioner
- SinglePartitionPartitioner
Note
Batch Reads Only
Because the data-stream-processing engine produces a single data stream, partitioners do not affect streaming reads.
SamplePartitioner Configuration
SamplePartitioner is the default partitioner configuration. This configuration lets you specify a partition field, partition size, and number of samples per partition.
To use this configuration, set the `partitioner` configuration option to `com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner`.
| Property name | Description |
| --- | --- |
| `partitioner.options.partition.field` | The field to use for partitioning, which must be a unique field.<br>Default: `_id` |
| `partitioner.options.partition.size` | The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents.<br>Default: `64` |
| `partitioner.options.samples.per.partition` | The number of samples to take per partition. The total number of samples taken is: `samples per partition * (count / number of documents per partition)`<br>Default: `10` |
Example
For a collection containing 640 documents with an average document size of 0.5 MB, the default SamplePartitioner configuration creates 5 partitions with 128 documents per partition: the collection holds about 640 × 0.5 MB = 320 MB of data, which the default 64 MB partition size divides into 5 partitions.
The Spark Connector samples 50 documents (the default 10 per intended partition) and defines 5 partitions by selecting partition field ranges from the sampled documents.
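The following is a minimal sketch of overriding the SamplePartitioner options on a batch read; the values are illustrative rather than recommendations, and the `spark` session is assumed to already exist.

```python
# Sketch only: SamplePartitioner with an explicit 128 MB partition size and
# 20 samples per partition; field and values are placeholders.
df = (
    spark.read.format("mongodb")
    .option("partitioner",
            "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
    .option("partitioner.options.partition.field", "_id")
    .option("partitioner.options.partition.size", "128")
    .option("partitioner.options.samples.per.partition", "20")
    .load()
)
```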
ShardedPartitioner Configuration
The ShardedPartitioner configuration automatically partitions the data based on your shard configuration.
To use this configuration, set the `partitioner` configuration option to `com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner`.
Important
ShardedPartitioner Restrictions
In MongoDB Server v6.0 and later, the sharding operation creates one large initial chunk to cover all shard key values, making the sharded partitioner inefficient. We do not recommend using the sharded partitioner when connected to MongoDB v6.0 and later.
The sharded partitioner is not compatible with hashed shard keys.
PaginateBySizePartitioner Configuration
The PaginateBySizePartitioner configuration paginates the data by using the average document size to split the collection into average-sized chunks.
To use this configuration, set the `partitioner` configuration option to `com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner`.
| Property name | Description |
| --- | --- |
| `partitioner.options.partition.field` | The field to use for partitioning, which must be a unique field.<br>Default: `_id` |
| `partitioner.options.partition.size` | The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents.<br>Default: `64` |
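A minimal sketch of enabling this partitioner on a batch read; the `orderId` partition field is a hypothetical unique field, the 32 MB size is illustrative, and the `spark` session is assumed to already exist.

```python
# Sketch only: paginate by size over a hypothetical unique "orderId" field,
# targeting roughly 32 MB per partition.
df = (
    spark.read.format("mongodb")
    .option("partitioner",
            "com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner")
    .option("partitioner.options.partition.field", "orderId")
    .option("partitioner.options.partition.size", "32")
    .load()
)
```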
PaginateIntoPartitionsPartitioner Configuration
The PaginateIntoPartitionsPartitioner configuration paginates the data by dividing the count of documents in the collection by the maximum number of allowable partitions.
To use this configuration, set the `partitioner` configuration option to `com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner`.
| Property name | Description |
| --- | --- |
| `partitioner.options.partition.field` | The field to use for partitioning, which must be a unique field.<br>Default: `_id` |
| `partitioner.options.max.number.of.partitions` | The number of partitions to create.<br>Default: `64` |
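As a sketch, the following caps the read at a hypothetical 16 partitions; the `spark` session is assumed to already exist.

```python
# Sketch only: divide the collection's document count across at most 16 partitions.
df = (
    spark.read.format("mongodb")
    .option("partitioner",
            "com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner")
    .option("partitioner.options.max.number.of.partitions", "16")
    .load()
)
```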
SinglePartitionPartitioner Configuration
The SinglePartitionPartitioner configuration creates a single partition.
To use this configuration, set the `partitioner` configuration option to `com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner`.
Specifying Properties in connection.uri
If you use SparkConf to specify any of the previous settings, you can either include them in the `connection.uri` setting or list them individually.
The following code example shows how to specify the database, collection, and read preference as part of the `connection.uri` setting:
```
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
```
To keep the `connection.uri` setting shorter and make the settings easier to read, you can specify them individually instead:
```
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred
```
Important
If you specify a setting both in the `connection.uri` and on its own line, the `connection.uri` setting takes precedence. For example, in the following configuration, the connection database is `foobar`, because it's the value in the `connection.uri` setting:
```
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar
```