流式读取配置选项
Overview
以流式传输模式从 MongoDB 读取数据时,可以配置以下属性。
注意
如果您使用 SparkConf
设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.
。
属性名称 | 说明 | ||
---|---|---|---|
connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
database | Required. The database name configuration. | ||
collection | Required. The collection name configuration. You can specify multiple collections by separating the collection names
with a comma. To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property. | ||
comment | The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
mode | The parsing strategy to use when handling documents that don't match the
expected schema. This option accepts the following values:
Default: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE ,
this option specifies the name of the new column that stores the invalid
document as extended JSON. If you're using an explicit schema, it must
include the name of the new column. If you're
using an inferred schema, the Spark Connector adds the new column to the
end of the schema.Default: None | ||
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following:
A list of documents resembles the following:
自定义聚合管道必须与分区器策略兼容。例如,诸如 | ||
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
change.stream. | Change stream configuration prefix. See the
Change Stream Configuration section for more
information about change streams. | ||
outputExtendedJson | When true , the connector converts BSON types not supported by Spark into
extended JSON strings.
When false , the connector uses the original relaxed JSON format for
unsupported types.Default: false | ||
schemaHint | Specifies a partial schema of known field types to use when inferring
the schema for the collection. To learn more about the schemaHint
option, see the Specify Known Fields with Schema Hints section.Default: None |
变更流配置
从 MongoDB 读取变更流时,您可以配置以下属性:
属性名称 | 说明 |
---|---|
change.stream.lookup.full.document | 确定变更流在更新操作中返回的值。 默认设置返回原始文档和更新文档之间的差异。
有关此变更流选项如何工作的更多信息,请参阅 MongoDB 服务器手册指南“查找更新操作的完整文档”。 默认值: "default" |
change.stream.micro.batch.max.partition.count | The maximum number of partitions the Spark Connector divides each
micro-batch into. Spark workers can process these partitions in parallel. This setting applies only when using micro-batch streams. Default: 1 警告:指定大于 |
change.stream.publish.full.document.only | Specifies whether to publish the changed document or the full
change stream document. When this setting is false , you must specify a schema. The schema
must include all fields that you want to read from the change stream. You can
use optional fields to ensure that the schema is valid for all change-stream
events.When this setting is true , the connector exhibits the following behavior:
此设置会覆盖 默认: |
change.stream.startup.mode | Specifies how the connector starts up when no offset is available. This setting accepts the following values:
|
指定属性,在 connection.uri
如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri
设置中,也可以单独列出。
以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri
设置的一部分:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
为了缩短 connection.uri
并使设置更易于阅读,您可以改为单独指定它们:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
重要
如果您在 connection.uri
及其自己的行中指定设置,则 connection.uri
设置优先。例如,在以下配置中,连接数据库为 foobar
,因为它是 connection.uri
设置中的值:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
在 属性中指定多个集合collection
您可以在collection
变更流 配置属性中指定多个集合,用逗号分隔集合名称。 不要在集合之间添加空格,除非空格是集合名称的一部分。
指定多个集合,如下例所示:
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
如果集合名称为 "*",或者名称包含逗号或反斜杠 (\),则必须按如下方式对字符进行转义:
如果
collection
配置选项中使用的集合名称包含逗号,则 Spark Connector 会将其视为两个不同的集合。 为避免这种情况,必须在逗号前面加上反斜杠 (\) 来对逗号进行转义。 对名为 "my,collection" 的集合进行转义,如下所示:"my\,collection" 如果
collection
配置选项中使用的集合名称为“*”,Spark Connector 会将其解释为扫描所有集合的规范。 为避免这种情况,必须在星号前加上反斜杠 (\) 对星号进行转义。 对名为 "*" 的集合进行转义,如下所示:"\*" 如果
collection
配置选项中使用的集合名称包含反斜杠 (\),则 Spark Connector 会将反斜杠视为转义字符,这可能会改变它对该值的解释方式。 为避免这种情况,必须在反斜杠前面加上另一个反斜杠来对其进行转义。 对名为 "\collection" 的集合进行转义,如下所示:"\\collection" 注意
string在 中将集合名称指定为Java 文字时,必须使用另一个反斜杠进一步转义每个反斜杠。例如,对名为 "\collection" 的集合进行转义,如下所示:
"\\\\collection"
您可以通过将星号 (*) 作为集合名称的string传递,从数据库中的所有集合进行流式传输。
指定所有集合,如下例所示:
... .option("spark.mongodb.collection", "*")
如果您在从所有集合进行流式传输时创建集合,则新集合会自动包含在流中。
在从多个集合进行流式传输时,您可以随时删除集合。
重要
推断具有多个集合的模式
如果将change.stream.publish.full.document.only
选项设置为true
,Spark Connector 将使用扫描文档的模式推断DataFrame
的模式。
模式推断发生在流式传输开始时,并且不考虑流式传输期间创建的集合。
从多个集合进行流式传输并推断模式时, connector会按顺序对每个集合进行采样。 来自大量集合的流式传输可能会导致模式推断的性能明显降低。 这种性能影响仅在推断模式时才会发生。