批量读取配置选项
Overview
以批处理模式从 MongoDB 读取数据时,可以配置以下属性。
注意
如果您使用 SparkConf
设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.
。
属性名称 | 说明 | ||
---|---|---|---|
connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
database | Required. The database name configuration. | ||
collection | Required. The collection name configuration. | ||
comment | The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
mode | The parsing strategy to use when handling documents that don't match the
expected schema. This option accepts the following values:
Default: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE ,
this option specifies the name of the new column that stores the invalid
document as extended JSON. If you're using an explicit schema, it must
include the name of the new column. If you're
using an inferred schema, the Spark Connector adds the new column to the
end of the schema.Default: None | ||
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
partitioner | The partitioner full class name. You can specify a custom implementation that must implement the
com.mongodb.spark.sql.connector.read.partitioner.Partitioner
interface.See the
Partitioner Configuration section for more
information about partitioners. Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner | ||
partitioner.options. | Partitioner configuration prefix. See the
Partitioner Configuration section for more
information about partitioners. | ||
sampleSize | The number of documents to sample from the collection when inferring the schema. Default: 1000 | ||
sql.inferSchema.mapTypes.enabled | Whether to enable Map types when inferring the schema. When enabled, large compatible struct types are inferred to a
MapType instead.Default: true | ||
sql.inferSchema.mapTypes.minimum.key.size | Minimum size of a StructType before inferring as a MapType .Default: 250 | ||
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following:
A list of documents resembles the following:
重要自定义聚合管道必须与分区器策略兼容。例如,诸如 | ||
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
outputExtendedJson | When true , the connector converts BSON types not supported by Spark into
extended JSON strings.
When false , the connector uses the original relaxed JSON format for
unsupported types.Default: false | ||
schemaHint | Specifies a partial schema of known field types to use when inferring
the schema for the collection. To learn more about the schemaHint
option, see the Specify Known Fields with Schema Hints section.Default: None |
分区器配置
分区器更改使用 Spark Connector 的批量读取的读取行为。通过将数据划分为多个分区,您可以并行运行转换。
本部分包含以下分区器的配置信息:
注意
仅批量读取
由于数据流处理引擎生成单个数据流,因此分区器不会影响流式读取。
SamplePartitioner
配置
SamplePartitioner
是默认的分区器配置。此配置允许您指定分区字段、分区大小和每个分区的样本数。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
。
属性名称 | 说明 | |
---|---|---|
partitioner.options.partition.field | 用于分区的字段,必须是唯一的字段。 默认: | |
partitioner.options.partition.size | 每个分区的大小(以 MB 为单位)。分区大小越小,创建的分区越多,包含的文档越少。 默认: | |
partitioner.options.samples.per.partition | 每个分区要采集的样本数。采集的样本总数为:
默认: |
例子
对于包含 640 个文档且平均文档大小为 0.5 MB 的集合,默认 SamplePartitioner
配置会创建 5 个分区,每个分区包含 128 个文档。
Spark Connector 对 50 个文档进行采样(每个预期分区默认为 10 个),并通过从采样文档中选择分区字段范围来定义 5 个分区。
ShardedPartitioner
配置
ShardedPartitioner
配置会根据您的分片配置自动对数据进行分区。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner
。
重要
ShardedPartitioner 限制
在 MongoDB Server v 6.0及更高版本中,分片操作会创建一个较大的初始数据段来覆盖所有分片键值,从而导致分片分区器效率低下。 我们不建议在连接到 MongoDB v 6.0及更高版本时使用分片分区器。
分片分区器与哈希分片键不兼容。
PaginateBySizePartitioner
配置
PaginateBySizePartitioner
配置通过使用平均文档大小将集合拆分为平均大小的数据块来对数据进行分页。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner
。
属性名称 | 说明 |
---|---|
partitioner.options.partition.field | 用于分区的字段,必须是唯一的字段。 默认: |
partitioner.options.partition.size | 每个分区的大小(以 MB 为单位)。分区大小越小, 创建的分区越多,包含的文档越少。 默认: |
PaginateIntoPartitionsPartitioner
配置
PaginateIntoPartitionsPartitioner
配置通过将集合中的文档计数除以允许的最大分区数来对数据进行分页。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner
。
属性名称 | 说明 |
---|---|
partitioner.options.partition.field | 用于分区的字段,必须是唯一的字段。 默认: |
partitioner.options.max.number.of.partitions | 要创建的分区数量。 默认: |
SinglePartitionPartitioner
配置
SinglePartitionPartitioner
配置创建单个分区。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner
。
AutoBucketPartitioner
配置
AutoBucketPartitioner
配置与SamplePartitioner配置类似,但使用$bucketAuto聚合阶段对数据进行分页。 通过使用此配置,您可以跨单个或多个字段(包括嵌套字段)对数据进行分区。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner
。
属性名称 | 说明 |
---|---|
partitioner.options.partition.fieldList | 用于分区的字段列表。 该值可以是单个字段名称,也可以是逗号分隔字段的列表。 默认: |
partitioner.options.partition.chunkSize | 每个分区的平均大小 (MB)。 较小的分区会创建更多包含较少文档的分区。 由于此配置使用平均文档大小来确定每个分区的文档数量,因此分区的大小可能不同。 默认: |
partitioner.options.partition.samplesPerPartition | 每个分区要采集的样本数。 默认: |
partitioner.options.partition.partitionKeyProjectionField | 用于投影字段的字段名称,该投影字段包含用于对集合进行分区的所有字段。 建议仅当每个文档已包含 默认: |
指定属性,在 connection.uri
如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri
设置中,也可以单独列出。
以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri
设置的一部分:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
为了缩短 connection.uri
并使设置更易于阅读,您可以改为单独指定它们:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
重要
如果您在 connection.uri
及其自己的行中指定设置,则 connection.uri
设置优先。例如,在以下配置中,连接数据库为 foobar
,因为它是 connection.uri
设置中的值:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar