Batch Read Configuration Options
Overview
You can configure the following properties when reading data from MongoDB in batch mode.
Note
If you use SparkConf to set the connector's read configurations, prefix spark.mongodb.read. to each property.
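For example, a minimal PySpark sketch (the connection URI, database, and collection values are placeholders, not taken from this page) sets the prefixed read properties when building the session:

```python
from pyspark.sql import SparkSession

# Each read property is prefixed with spark.mongodb.read. when set through SparkConf.
# The URI, database, and collection names below are placeholder values.
spark = (
    SparkSession.builder
    .appName("batch-read-config-example")
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/")
    .config("spark.mongodb.read.database", "people")
    .config("spark.mongodb.read.collection", "contacts")
    .getOrCreate()
)

# With the connection settings in SparkConf, a batch read needs no extra options.
df = spark.read.format("mongodb").load()
df.printSchema()
```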
| Property name | Description |
| --- | --- |
| connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ |
| database | Required. The database name configuration. |
| collection | Required. The collection name configuration. |
| comment | The comment to append to the read operation. Comments appear in the output of the Database Profiler. Default: None |
| mode | The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values: ReadConfig.ParseMode.FAILFAST, ReadConfig.ParseMode.PERMISSIVE, and ReadConfig.ParseMode.DROPMALFORMED. Default: ReadConfig.ParseMode.FAILFAST |
| columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema. Default: None |
| mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory |
| partitioner | The full class name of the partitioner. You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface. See the Partitioner Configurations section for more information about partitioners. Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner |
| partitioner.options. | Partitioner configuration prefix. See the Partitioner Configurations section for more information about partitioners. |
| sampleSize | The number of documents to sample from the collection when inferring the schema. Default: 1000 |
| sql.inferSchema.mapTypes.enabled | Whether to enable Map types when inferring the schema. When enabled, large compatible struct types are inferred as a MapType instead. Default: true |
| sql.inferSchema.mapTypes.minimum.key.size | The minimum size of a StructType before it is inferred as a MapType. Default: 250 |
| aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark. The value must be either an extended JSON single document or a list of documents; see the example following this table. Important: Custom aggregation pipelines must be compatible with the partitioner strategy. For example, aggregation stages such as $group do not work with any partitioner that creates more than one partition. Default: None |
| aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the aggregation. Default: true |
| outputExtendedJson | When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types. Default: false |
| schemaHint | Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section. Default: None |
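For illustration, the following PySpark sketch shows both accepted forms of the aggregation.pipeline value. The $match filter, projected fields, and namespace are placeholder values rather than examples taken from this page, and the sketch reuses the spark session from the earlier example:

```python
# A single pipeline document, expressed as extended JSON:
single_stage = '{"$match": {"closed": false}}'

# A list of pipeline documents:
multi_stage = '[{"$match": {"closed": false}}, {"$project": {"name": 1, "status": 1}}]'

df = (
    spark.read.format("mongodb")
    .option("database", "people")            # placeholder namespace
    .option("collection", "contacts")
    .option("aggregation.pipeline", multi_stage)
    .load()
)
```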
Partitioner Configurations
Partitioners change the read behavior of batch reads that use the Spark Connector. By dividing the data into partitions, you can run transformations in parallel.
This section contains configuration information for the following partitioners:
- SamplePartitioner
- ShardedPartitioner
- PaginateBySizePartitioner
- PaginateIntoPartitionsPartitioner
- SinglePartitionPartitioner
- AutoBucketPartitioner
Note
Batch Reads Only
Because the data-stream-processing engine produces a single data stream, partitioners do not affect streaming reads.
SamplePartitioner Configuration
SamplePartitioner is the default partitioner configuration. This configuration lets you specify a partition field, partition size, and number of samples per partition.
To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner.
| Property name | Description |
| --- | --- |
| partitioner.options.partition.field | The field to use for partitioning, which must be a unique field. Default: _id |
| partitioner.options.partition.size | The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents. Default: 64 |
| partitioner.options.samples.per.partition | The number of samples to take per partition. The total number of samples taken is: samples per partition * (count / number of documents per partition). Default: 10 |
Example
For a collection with 640 documents and an average document size of 0.5 MB, the default SamplePartitioner configuration creates 5 partitions with 128 documents per partition.
The Spark Connector samples 50 documents (the default 10 per intended partition) and defines 5 partitions by selecting partition field ranges from the sampled documents.
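A minimal PySpark sketch of that setup (the namespace is a placeholder, and the partitioner option values simply restate the defaults described above) might look like the following:

```python
df = (
    spark.read.format("mongodb")
    .option("database", "people")            # placeholder namespace
    .option("collection", "contacts")
    .option("partitioner",
            "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
    .option("partitioner.options.partition.field", "_id")
    .option("partitioner.options.partition.size", "64")             # MB per partition
    .option("partitioner.options.samples.per.partition", "10")
    .load()
)
```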
ShardedPartitioner Configuration
The ShardedPartitioner configuration automatically partitions the data based on your shard configuration.
To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner.
Important
ShardedPartitioner Restrictions
In MongoDB Server v6.0 and later, the sharding operation creates one large initial chunk to cover all shard key values, making the sharded partitioner inefficient. We do not recommend using the sharded partitioner when connected to MongoDB v6.0 and later.
The sharded partitioner is not compatible with hashed shard keys.
PaginateBySizePartitioner Configuration
The PaginateBySizePartitioner configuration paginates the data by using the average document size to split the collection into average-sized chunks.
To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner.
| Property name | Description |
| --- | --- |
| partitioner.options.partition.field | The field to use for partitioning, which must be a unique field. Default: _id |
| partitioner.options.partition.size | The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents. Default: 64 |
PaginateIntoPartitionsPartitioner Configuration
The PaginateIntoPartitionsPartitioner configuration paginates the data by dividing the count of documents in the collection by the maximum number of allowable partitions.
To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner.
| Property name | Description |
| --- | --- |
| partitioner.options.partition.field | The field to use for partitioning, which must be a unique field. Default: _id |
| partitioner.options.max.number.of.partitions | The number of partitions to create. Default: 64 |
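As a sketch (the namespace is a placeholder), a read capped at 32 partitions might be configured like this:

```python
df = (
    spark.read.format("mongodb")
    .option("database", "people")            # placeholder namespace
    .option("collection", "contacts")
    .option("partitioner",
            "com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner")
    .option("partitioner.options.max.number.of.partitions", "32")
    .load()
)
```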
SinglePartitionPartitioner Configuration
The SinglePartitionPartitioner configuration creates a single partition.
To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner.
AutoBucketPartitioner Configuration
The AutoBucketPartitioner configuration is similar to the SamplePartitioner configuration, but it uses the $bucketAuto aggregation stage to paginate the data. By using this configuration, you can partition the data across single or multiple fields, including nested fields.
To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner.
| Property name | Description |
| --- | --- |
| partitioner.options.partition.fieldList | The list of fields to use for partitioning. The value can be either a single field name or a list of comma-separated fields. Default: _id |
| partitioner.options.partition.chunkSize | The average size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents. Because this configuration uses the average document size to determine the number of documents per partition, partitions might not be the same size. Default: 64 |
| partitioner.options.partition.samplesPerPartition | The number of samples to take per partition. Default: 100 |
| partitioner.options.partition.partitionKeyProjectionField | The field name to use for a projected field that contains all the fields used to partition the collection. We recommend changing the value of this property only if each document already contains the __idx field. Default: __idx |
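As a sketch (the field names and namespace are hypothetical), partitioning across two fields might look like the following:

```python
df = (
    spark.read.format("mongodb")
    .option("database", "people")            # placeholder namespace
    .option("collection", "contacts")
    .option("partitioner",
            "com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner")
    # Comma-separated list of hypothetical partition fields.
    .option("partitioner.options.partition.fieldList", "region,createdAt")
    .option("partitioner.options.partition.chunkSize", "64")        # average MB per partition
    .load()
)
```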
Specifying Properties in connection.uri
If you use SparkConf to specify any of the previous settings, you can either include them in the connection.uri setting or list them individually.
The following code example shows how to specify the database, collection, and read preference as part of the connection.uri setting:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
To keep the connection.uri shorter and make the settings easier to read, you can specify them individually instead:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred
Important
If you specify a setting both in the connection.uri and on its own line, the connection.uri setting takes precedence.
For example, in the following configuration, the connection database is foobar, because it's the value in the connection.uri setting:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar