Opções de configuração de leitura em lote
Visão geral
Você pode configurar as seguintes propriedades ao ler dados do MongoDB no modo de lote.
Observação
Se você usa o SparkConf
para definir as configurações de leitura do conector, insira spark.mongodb.read.
como prefixo em cada propriedade.
Nome da propriedade | Descrição | ||
---|---|---|---|
connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
database | Required. The database name configuration. | ||
collection | Required. The collection name configuration. | ||
comment | The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
mode | The parsing strategy to use when handling documents that don't match the
expected schema. This option accepts the following values:
Default: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE ,
this option specifies the name of the new column that stores the invalid
document as extended JSON. If you're using an explicit schema, it must
include the name of the new column. If you're
using an inferred schema, the Spark Connector adds the new column to the
end of the schema.Default: None | ||
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
partitioner | The partitioner full class name. You can specify a custom implementation that must implement the
com.mongodb.spark.sql.connector.read.partitioner.Partitioner
interface.See the
Partitioner Configuration section for more
information about partitioners. Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner | ||
partitioner.options. | Partitioner configuration prefix. See the
Partitioner Configuration section for more
information about partitioners. | ||
sampleSize | The number of documents to sample from the collection when inferring the schema. Default: 1000 | ||
sql.inferSchema.mapTypes.enabled | Whether to enable Map types when inferring the schema. When enabled, large compatible struct types are inferred to a
MapType instead.Default: true | ||
sql.inferSchema.mapTypes.minimum.key.size | Minimum size of a StructType before inferring as a MapType .Default: 250 | ||
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following:
A list of documents resembles the following:
ImportantePipelines de agregação personalizados devem ser compatíveis com a estratégia do particionador. Por exemplo, estágios de agregação como | ||
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
outputExtendedJson | When true , the connector converts BSON types not supported by Spark into
extended JSON strings.
When false , the connector uses the original relaxed JSON format for
unsupported types.Default: false | ||
schemaHint | Specifies a partial schema of known field types to use when inferring
the schema for the collection. To learn more about the schemaHint
option, see the Specify Known Fields with Schema Hints section.Default: None |
Configurações do particionador
Particionadores alteram o comportamento de leitura das leituras em lote que usam o Conector Spark. Ao dividir os dados em partições, você pode executar transformações em paralelo.
Esta seção contém informações de configuração para o seguinte particionador:
Observação
Somente leituras em lote
Como o mecanismo de processamento de fluxo de dados produz um único fluxo de dados, os particionadores não afetam as leituras de streaming.
SamplePartitioner
Configuração
SamplePartitioner
é a configuração padrão do particionador. Esta configuração permite especificar um campo de partição, tamanho da partição e número de amostras por partição.
Para usar esta configuração, defina a opção de configuração partitioner
como com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
.
Nome da propriedade | Descrição | |
---|---|---|
partitioner.options.partition.field | O campo a ser usado para o particionamento, que deve ser um campo único. Padrão: | |
partitioner.options.partition.size | O tamanho (em MB) de cada partição. Tamanhos menores de partição criam mais partições contendo menos documentos. Padrão: | |
partitioner.options.samples.per.partition | O número de amostras a serem coletadas por partição. O número total de amostras coletadas é:
Padrão: |
Exemplo
Para uma coleção com 640 documentos e com um tamanho médio de 0,5 MB por documento, a configuração padrão SamplePartitioner
cria 5 partições com 128 documentos por partição.
O Conector Spark faz amostras de 50 documentos (padrão de 10 por cada partição pretendida) e define 5 partições ao selecionar intervalos de campos de partição dos documentos de amostra.
ShardedPartitioner
Configuração
A configuração do ShardedPartitioner
automaticamente particiona os dados com base na sua configuração de fragmento.
Para usar esta configuração, defina a opção de configuração partitioner
como com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner
.
Importante
Restrições do ShardedPartitioner
No MongoDB Server v6.0 e posterior, a operação de fragmentação cria um grande chunk inicial para cobrir todos os valores de chave de shard, tornando o particionador fragmentado ineficiente. Não recomendamos usar o particionador fragmentado quando conectado ao MongoDB v6.0 e posterior.
O particionador fragmentado não é compatível com chaves de shard com hash.
PaginateBySizePartitioner
Configuração
A configuração do PaginateBySizePartitioner
pagina os dados utilizando o tamanho médio do documento para dividir a coleção em partes de tamanho médio.
Para usar esta configuração, defina a opção de configuração partitioner
como com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner
.
Nome da propriedade | Descrição |
---|---|
partitioner.options.partition.field | O campo a ser usado para o particionamento, que deve ser um campo único. Padrão: |
partitioner.options.partition.size | O tamanho (em MB) de cada partição. Tamanhos de partição menores criam mais partições contendo menos documentos. Padrão: |
PaginateIntoPartitionsPartitioner
Configuração
A configuração do PaginateIntoPartitionsPartitioner
pagina os dados dividindo a contagem de documentos na coleção pelo número máximo de partições permitidas.
Para usar esta configuração, defina a opção de configuração partitioner
como com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner
.
Nome da propriedade | Descrição |
---|---|
partitioner.options.partition.field | O campo a ser usado para o particionamento, que deve ser um campo único. Padrão: |
partitioner.options.max.number.of.partitions | O número de partições a serem criadas. Padrão: |
SinglePartitionPartitioner
Configuração
A configuração do SinglePartitionPartitioner
cria uma única partição.
Para usar esta configuração, defina a opção de configuração partitioner
como com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner
.
AutoBucketPartitioner
Configuração
A configuração AutoBucketPartitioner
é semelhante à configuração SamplePartitioner , mas utiliza o estágio de agregação $bucketAuto para paginar os dados. Ao usar essa configuração, você pode particionar os dados em um ou vários campos, incluindo campos aninhados.
Para usar esta configuração, defina a opção de configuração partitioner
como com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner
.
Nome da propriedade | Descrição |
---|---|
partitioner.options.partition.fieldList | A lista de campos a serem usados para o particionamento. O valor pode ser um único nome de campo ou uma lista de campos separados por vírgula. Padrão: |
partitioner.options.partition.chunkSize | O tamanho médio (MB) para cada partição. Tamanhos de partição menores criam mais partições contendo menos documentos. Como essa configuração usa o tamanho médio do documento para determinar o número de documentos por partição, as partições podem não ter o mesmo tamanho. Padrão: |
partitioner.options.partition.samplesPerPartition | O número de amostras a serem coletadas por partição. Padrão: |
partitioner.options.partition.partitionKeyProjectionField | O nome do campo a ser usado para um campo projetado que contém todos os campos usados para dividir a coleção. Recomendamos alterar o valor dessa propriedade somente se cada documento já contiver o campo Padrão: |
Especificando propriedades em connection.uri
Se você usa SparkConf para especificar qualquer uma das configurações anteriores, você poderá incluí-las na configuração do connection.uri
ou listá-las individualmente.
O exemplo de código abaixo mostra como especificar o banco de dados, coleção e preferência de leitura como parte da configuração do connection.uri
:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
Para manter o connection.uri
curto e facilitar a leitura das configurações, você pode especificá-las individualmente:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
Importante
Se você especificar uma configuração em connection.uri
e em sua própria linha, a configuração connection.uri
terá precedência. Por exemplo, na configuração abaixo, o banco de dados de conexão é foobar
, porque é o valor na configuração connection.uri
:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar