Docs 菜单
Docs 主页
/
MongoDB Spark Connector
/ /

流式读取配置选项

在此页面上

  • Overview
  • 变更流配置
  • 指定属性,在 connection.uri

以流式传输模式从 MongoDB 读取数据时,可以配置以下属性。

注意

如果您使用 SparkConf 设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.

属性名称
说明

connection.uri

Required.
The connection string configuration key.

Default: mongodb://localhost:27017/

database

Required.
The database name configuration.

collection

Required.
The collection name configuration.

comment

The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None

mongoClientFactory

MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

aggregation.pipeline

Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

自定义聚合管道必须与分区器策略兼容。例如,诸如 $group 之类的聚合阶段不适用于创建多个分区的任何分区器。

aggregation.allowDiskUse

Specifies whether to allow storage to disk when running the aggregation.

Default: true

change.stream.

Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.

outputExtendedJson

When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false

从 MongoDB 读取变更流时,您可以配置以下属性:

属性名称
说明

change.stream.lookup.full.document

确定变更流在更新操作中返回的值。

默认设置返回原始文档和更新文档之间的差异。

updateLookup设置还返回原始文档和更新文档之间的差异,但它也包括整个更新文档的副本。

有关此变更流选项如何工作的更多信息,请参阅 MongoDB 服务器手册指南“查找更新操作的完整文档”。

默认值: "default"

change.stream.micro.batch.max.partition.count

The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

警告:指定大于1的值可能会改变Spark Connector处理更改事件的顺序。 如果无序处理可能会导致下游数据不一致,请避免此设置。

change.stream.publish.full.document.only

Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • connector会过滤掉省略 fullDocument 字段的消息,并仅发布该字段的值。

  • 如果不指定模式,Connector将从变更流文档而不是根本的集合中推断模式。

此设置会覆盖change.stream.lookup.full.document设置。

默认false

change.stream.startup.mode

Specifies how the connector starts up when no offset is available.
This setting accepts the following values:

如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri 设置中,也可以单独列出。

以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri 设置的一部分:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

为了缩短 connection.uri 并使设置更易于阅读,您可以改为单独指定它们:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

如果您在 connection.uri 及其自己的行中指定设置,则 connection.uri 设置优先。例如,在以下配置中,连接数据库为 foobar,因为它是 connection.uri 设置中的值:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

后退

读取