Docs 菜单
Docs 主页
/
Spark Connector
/ /

批量读取配置选项

在此页面上

  • Overview
  • 分区器配置
  • 指定属性,在 connection.uri

以批处理模式从 MongoDB 读取数据时,可以配置以下属性。

注意

如果您使用 SparkConf 设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.

属性名称
说明
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST:在解析与模式不匹配的文档时引发异常。

  • ReadConfig.ParseMode.PERMISSIVE:当数据类型与模式不匹配时,将字段设置为null 。 要将每个无效文档存储为扩展JSON string ,请将此值与 columnNameOfCorruptRecord 选项结合使用。

  • ReadConfig.ParseMode.DROPMALFORMED:忽略任何与模式不匹配的文档。


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
partitioner
The partitioner full class name.
You can specify a custom implementation that must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface.
See the Partitioner Configuration section for more information about partitioners.

Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
partitioner.options.
Partitioner configuration prefix.
See the Partitioner Configuration section for more information about partitioners.
sampleSize
The number of documents to sample from the collection when inferring
the schema.

Default: 1000
sql.inferSchema.mapTypes.enabled
Whether to enable Map types when inferring the schema.
When enabled, large compatible struct types are inferred to a MapType instead.

Default: true
sql.inferSchema.mapTypes.minimum.key.size
Minimum size of a StructType before inferring as a MapType.

Default: 250
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

重要

自定义聚合管道必须与分区器策略兼容。例如,诸如 $group 之类的聚合阶段不适用于创建多个分区的任何分区器。

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

分区器更改使用 Spark Connector 的批量读取的读取行为。通过将数据划分为多个分区,您可以并行运行转换。

本部分包含以下分区器的配置信息:

注意

仅批量读取

由于数据流处理引擎生成单个数据流,因此分区器不会影响流式读取。

SamplePartitioner 是默认的分区器配置。此配置允许您指定分区字段、分区大小和每个分区的样本数。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner

属性名称
说明
partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.partition.size

每个分区的大小(以 MB 为单位)。分区大小越小,创建的分区越多,包含的文档越少。

默认: 64

partitioner.options.samples.per.partition

每个分区要采集的样本数。采集的样本总数为:

samples per partition * ( count / number of documents per partition)

默认: 10

例子

对于包含 640 个文档且平均文档大小为 0.5 MB 的集合,默认 SamplePartitioner 配置会创建 5 个分区,每个分区包含 128 个文档。

Spark Connector 对 50 个文档进行采样(每个预期分区默认为 10 个),并通过从采样文档中选择分区字段范围来定义 5 个分区。

ShardedPartitioner 配置会根据您的分片配置自动对数据进行分区。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner

重要

ShardedPartitioner 限制

  1. 在 MongoDB Server v 6.0及更高版本中,分片操作会创建一个较大的初始数据段来覆盖所有分片键值,从而导致分片分区器效率低下。 我们不建议在连接到 MongoDB v 6.0及更高版本时使用分片分区器。

  2. 分片分区器与哈希分片键不兼容。

PaginateBySizePartitioner 配置通过使用平均文档大小将集合拆分为平均大小的数据块来对数据进行分页。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner

属性名称
说明
partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.partition.size

每个分区的大小(以 MB 为单位)。分区大小越小,

创建的分区越多,包含的文档越少。

默认: 64

PaginateIntoPartitionsPartitioner 配置通过将集合中的文档计数除以允许的最大分区数来对数据进行分页。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner

属性名称
说明
partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.max.number.of.partitions

要创建的分区数量。

默认: 64

SinglePartitionPartitioner 配置创建单个分区。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

AutoBucketPartitioner配置与SamplePartitioner配置类似,但使用$bucketAuto聚合阶段对数据进行分页。 通过使用此配置,您可以跨单个或多个字段(包括嵌套字段)对数据进行分区。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner

属性名称
说明
partitioner.options.partition.fieldList

用于分区的字段列表。 该值可以是单个字段名称,也可以是逗号分隔字段的列表。

默认: _id

partitioner.options.partition.chunkSize

每个分区的平均大小 (MB)。 较小的分区会创建更多包含较少文档的分区。 由于此配置使用平均文档大小来确定每个分区的文档数量,因此分区的大小可能不同。

默认: 64

partitioner.options.partition.samplesPerPartition

每个分区要采集的样本数。

默认: 100

partitioner.options.partition.partitionKeyProjectionField

用于投影字段的字段名称,该投影字段包含用于对集合进行分区的所有字段。 建议仅当每个文档已包含__idx字段时才更改此属性的值。

默认: __idx

如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri 设置中,也可以单独列出。

以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri 设置的一部分:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

为了缩短 connection.uri 并使设置更易于阅读,您可以改为单独指定它们:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

如果您在 connection.uri 及其自己的行中指定设置,则 connection.uri 设置优先。例如,在以下配置中,连接数据库为 foobar,因为它是 connection.uri 设置中的值:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

后退

以批处理模式从 MongoDB 读取