Docs 菜单
Docs 主页
/
Spark Connector
/ /

流式读取配置选项

在此页面上

  • Overview
  • 变更流配置
  • 指定属性,在 connection.uri
  • collection属性中指定多个集合

以流式传输模式从 MongoDB 读取数据时,可以配置以下属性。

注意

如果您使用 SparkConf 设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.

属性名称
说明
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
You can specify multiple collections by separating the collection names with a comma.

To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST:在解析与模式不匹配的文档时引发异常。

  • ReadConfig.ParseMode.PERMISSIVE:当数据类型与模式不匹配时,将字段设置为null 。 要将每个无效文档存储为扩展JSON string ,请将此值与 columnNameOfCorruptRecord 选项结合使用。

  • ReadConfig.ParseMode.DROPMALFORMED:忽略任何与模式不匹配的文档。


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

自定义聚合管道必须与分区器策略兼容。例如,诸如 $group 之类的聚合阶段不适用于创建多个分区的任何分区器。

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
change.stream.
Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

从 MongoDB 读取变更流时,您可以配置以下属性:

属性名称
说明
change.stream.lookup.full.document

确定变更流在更新操作中返回的值。

默认设置返回原始文档和更新文档之间的差异。

updateLookup设置还返回原始文档和更新文档之间的差异,但它也包括整个更新文档的副本。

有关此变更流选项如何工作的更多信息,请参阅 MongoDB 服务器手册指南“查找更新操作的完整文档”。

默认值: "default"

change.stream.micro.batch.max.partition.count
The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

警告:指定大于1的值可能会改变Spark Connector处理更改事件的顺序。 如果无序处理可能会导致下游数据不一致,请避免此设置。

change.stream.publish.full.document.only
Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • connector会过滤掉省略 fullDocument 字段的消息,并仅发布该字段的值。

  • 如果不指定模式, connector将从变更流文档推断模式。

此设置会覆盖change.stream.lookup.full.document设置。

默认false

change.stream.startup.mode
Specifies how the connector starts up when no offset is available.
This setting accepts the following values:

如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri 设置中,也可以单独列出。

以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri 设置的一部分:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

为了缩短 connection.uri 并使设置更易于阅读,您可以改为单独指定它们:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

如果您在 connection.uri 及其自己的行中指定设置,则 connection.uri 设置优先。例如,在以下配置中,连接数据库为 foobar,因为它是 connection.uri 设置中的值:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

您可以在collection变更流 配置属性中指定多个集合,用逗号分隔集合名称。 不要在集合之间添加空格,除非空格是集合名称的一部分。

指定多个集合,如下例所示:

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

如果集合名称为 "*",或者名称包含逗号或反斜杠 (\),则必须按如下方式对字符进行转义:

  • 如果collection配置选项中使用的集合名称包含逗号,则 Spark Connector 会将其视为两个不同的集合。 为避免这种情况,必须在逗号前面加上反斜杠 (\) 来对逗号进行转义。 对名为 "my,collection" 的集合进行转义,如下所示:

    "my\,collection"
  • 如果collection配置选项中使用的集合名称为“*”,Spark Connector 会将其解释为扫描所有集合的规范。 为避免这种情况,必须在星号前加上反斜杠 (\) 对星号进行转义。 对名为 "*" 的集合进行转义,如下所示:

    "\*"
  • 如果collection配置选项中使用的集合名称包含反斜杠 (\),则 Spark Connector 会将反斜杠视为转义字符,这可能会改变它对该值的解释方式。 为避免这种情况,必须在反斜杠前面加上另一个反斜杠来对其进行转义。 对名为 "\collection" 的集合进行转义,如下所示:

    "\\collection"

    注意

    string在 中将集合名称指定为Java 文字时,必须使用另一个反斜杠进一步转义每个反斜杠。例如,对名为 "\collection" 的集合进行转义,如下所示:

    "\\\\collection"

您可以通过将星号 (*) 作为集合名称的string传递,从数据库中的所有集合进行流式传输。

指定所有集合,如下例所示:

...
.option("spark.mongodb.collection", "*")

如果您在从所有集合进行流式传输时创建集合,则新集合会自动包含在流中。

在从多个集合进行流式传输时,您可以随时删除集合。

重要

推断具有多个集合的模式

如果将change.stream.publish.full.document.only选项设置为true ,Spark Connector 将使用扫描文档的模式推断DataFrame的模式。

模式推断发生在流式传输开始时,并且不考虑流式传输期间创建的集合。

从多个集合进行流式传输并推断模式时, connector会按顺序对每个集合进行采样。 来自大量集合的流式传输可能会导致模式推断的性能明显降低。 这种性能影响仅在推断模式时才会发生。

后退

读取