Streaming Read Configuration Options
Overview
You can configure the following properties when reading data from MongoDB in streaming mode.
Note
If you use SparkConf to set the connector's read configurations, prefix spark.mongodb.read. to each property.
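As a minimal sketch of the prefix rule, the following Python snippet turns unprefixed property names into SparkConf-style keys. The helper name with_read_prefix and the sample values are illustrative assumptions, not part of the connector.

```python
READ_PREFIX = "spark.mongodb.read."


def with_read_prefix(properties):
    """Return a copy of `properties` whose keys carry the read prefix."""
    return {READ_PREFIX + name: value for name, value in properties.items()}


# Sample values only; replace them with your own deployment details.
read_conf = with_read_prefix({
    "connection.uri": "mongodb://localhost:27017/",
    "database": "myDB",
    "collection": "myCollection",
})

# Print the settings in key=value form.
for key, value in read_conf.items():
    print(f"{key}={value}")
```

The resulting keys, such as spark.mongodb.read.database, are the names you would pass to SparkConf.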
Property name | Description |
---|---|
connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ |
database | Required. The database name configuration. |
collection | Required. The collection name configuration. You can specify multiple collections by separating the collection names with a comma. To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property. |
comment | The comment to append to the read operation. Comments appear in the output of the Database Profiler. Default: None |
mode | The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values: Default: ReadConfig.ParseMode.FAILFAST |
columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema. Default: None |
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory |
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark. The value must be either an extended JSON single document or list of documents. A single document resembles the following: A list of documents resembles the following: Custom aggregation pipelines must be compatible with the partitioner strategy. For example, aggregation stages such as |
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the aggregation. Default: true |
change.stream. | Change stream configuration prefix. See the Change Stream Configuration section for more information about change streams. |
outputExtendedJson | When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types. Default: false |
schemaHint | Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section. Default: None |
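The aggregation.pipeline value is a string of extended JSON, either one document or a list of documents. A minimal Python sketch, assuming hypothetical field names (closed, status, name), shows one way to build both shapes with the standard json module. The stages are placeholders, not a recommended pipeline.

```python
import json

# Hypothetical stages; the field names are placeholders.
match_stage = {"$match": {"closed": False}}
project_stage = {"$project": {"status": 1, "name": 1}}

# Shape 1: a single document (one stage).
single_stage_pipeline = json.dumps(match_stage)

# Shape 2: a list of documents (several stages, applied in order).
multi_stage_pipeline = json.dumps([match_stage, project_stage])

# Option values are passed as strings.
read_options = {
    "aggregation.pipeline": multi_stage_pipeline,
    "aggregation.allowDiskUse": "true",
}

print(single_stage_pipeline)
print(multi_stage_pipeline)
```

Serializing with json.dumps avoids hand-written quoting mistakes in the pipeline string.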
Change Stream Configuration
You can configure the following properties when reading a change stream from MongoDB:
Property name | Description |
---|---|
change.stream.lookup.full.document | Determines what values your change stream returns on update operations. The default setting returns the differences between the original document and the updated document. For more information on how this change stream option works, see the MongoDB server manual guide Lookup Full Document for Update Operation. Default: "default" |
change.stream.micro.batch.max.partition.count | The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel. This setting applies only when using micro-batch streams. Default: 1 WARNING: Specifying a value larger than |
change.stream.publish.full.document.only | Specifies whether to publish the changed document or the full change stream document. When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events. When this setting is true, the connector exhibits the following behavior: This setting overrides the Default: |
change.stream.startup.mode | Specifies how the connector starts up when no offset is available. This setting accepts the following values: |
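The following Python sketch shows how change stream properties might be passed to a streaming read. It rests on assumptions not stated on this page: that the data source is registered under the format name "mongodb", and that option keys use the "spark.mongodb." form shown in the collection examples. The function is only defined here, not called, because running it requires PySpark and the connector on the classpath.

```python
# Option keys assume the "spark.mongodb." form used elsewhere on this page.
CHANGE_STREAM_OPTIONS = {
    "spark.mongodb.connection.uri": "mongodb://127.0.0.1/",  # sample value
    "spark.mongodb.database": "myDB",                        # sample value
    "spark.mongodb.collection": "myCollection",              # sample value
    # Publish only the changed document, so the schema can be inferred.
    "spark.mongodb.change.stream.publish.full.document.only": "true",
    # Keep the documented default of one partition per micro-batch.
    "spark.mongodb.change.stream.micro.batch.max.partition.count": "1",
}


def open_change_stream(spark):
    """Return a streaming DataFrame for the configured collection.

    `spark` is an existing SparkSession. The format name "mongodb"
    is an assumption; check it against your connector version.
    """
    return (
        spark.readStream
        .format("mongodb")
        .options(**CHANGE_STREAM_OPTIONS)
        .load()
    )
```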
Specifying Properties in connection.uri
If you use SparkConf to specify any of the previous settings, you can either include them in the connection.uri setting or list them individually.
The following code example shows how to specify the database, collection, and read preference as part of the connection.uri setting:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
To keep the connection.uri shorter and make the settings easier to read, you can specify them individually instead:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred
Important
If you specify a setting in both the connection.uri and on its own line, the connection.uri setting takes precedence.
For example, in the following configuration, the connection database is foobar, because it's the value in the connection.uri setting:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar
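The precedence rule can be modeled with a short Python sketch. This illustrates the documented behavior only; it is not the connector's own parsing logic, and the function names are hypothetical.

```python
from urllib.parse import urlsplit


def database_from_uri(uri):
    """Return the database named in the URI path, or None if absent."""
    path = urlsplit(uri).path.lstrip("/")
    # A path such as "myDB.myCollection" names both database and collection.
    database, _, _collection = path.partition(".")
    return database or None


def effective_database(uri, database_setting=None):
    """Model the documented rule: a value in connection.uri wins."""
    return database_from_uri(uri) or database_setting


print(effective_database("mongodb://127.0.0.1/foobar", "bar"))  # foobar
print(effective_database("mongodb://127.0.0.1/", "myDB"))       # myDB
```

In the first call the URI names a database, so the separate setting is ignored. In the second call the URI has no database, so the separate setting applies.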
Specifying Multiple Collections in the collection Property
You can specify multiple collections in the collection change stream configuration property by separating the collection names with a comma. Do not add a space between the collections unless the space is a part of the collection name.
Specify multiple collections as shown in the following example:
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
If a collection name is "*", or if the name includes a comma or a backslash (\), you must escape the character as follows:
If the name of a collection used in your collection configuration option contains a comma, the Spark Connector treats it as two different collections. To avoid this, you must escape the comma by preceding it with a backslash (\). Escape a collection named "my,collection" as follows:
"my\,collection"
If the name of a collection used in your collection configuration option is "*", the Spark Connector interprets it as a specification to scan all collections. To avoid this, you must escape the asterisk by preceding it with a backslash (\). Escape a collection named "*" as follows:
"\*"
If the name of a collection used in your collection configuration option contains a backslash (\), the Spark Connector treats the backslash as an escape character, which might change how it interprets the value. To avoid this, you must escape the backslash by preceding it with another backslash. Escape a collection named "\collection" as follows:
"\\collection"
Note
When specifying the collection name as a string literal in Java, you must further escape each backslash with another one. For example, escape a collection named "\collection" as follows:
"\\\\collection"
You can stream from all collections in the database by passing an asterisk (*) as a string for the collection name.
Specify all collections as shown in the following example:
... .option("spark.mongodb.collection", "*")
If you create a collection while streaming from all collections, the new collection is automatically included in the stream.
You can drop collections at any time while streaming from multiple collections.
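The escaping rules in this section can be sketched as a small Python helper that builds the collection value from a list of raw collection names. The function names are hypothetical and not part of the connector.

```python
def escape_collection_name(name):
    """Escape one raw collection name for the collection property."""
    if name == "*":
        # A literal "*" would otherwise mean "all collections".
        return "\\*"
    # Escape backslashes first so the backslashes added for commas
    # are not escaped a second time.
    return name.replace("\\", "\\\\").replace(",", "\\,")


def join_collections(names):
    """Join escaped names with commas and no surrounding spaces."""
    return ",".join(escape_collection_name(name) for name in names)


print(join_collections(["collectionOne", "collectionTwo"]))
print(join_collections(["my,collection", "\\collection"]))
```

As in Java, each backslash in a regular Python string literal is written twice in source code, so "\\collection" above is the name \collection. To stream from all collections, pass "*" directly instead of calling the helper.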
Important
Inferring the Schema with Multiple Collections
If you set the change.stream.publish.full.document.only option to true, the Spark Connector infers the schema of a DataFrame by using the schema of the scanned documents.
Schema inference happens at the beginning of streaming, and does not take into account collections that are created during streaming.
When streaming from multiple collections and inferring the schema, the connector samples each collection sequentially. Streaming from a large number of collections can make schema inference noticeably slower. This performance impact occurs only while inferring the schema.