Menu Docs
Página inicial do Docs
/
Conector do Spark
/ /

Opções de configuração de leitura em lote

Nesta página

  • Visão geral
  • Configurações do particionador
  • Especificando propriedades em connection.uri

Você pode configurar as seguintes propriedades ao ler dados do MongoDB no modo de lote.

Observação

Se você usa o SparkConf para definir as configurações de leitura do conector, insira spark.mongodb.read. como prefixo em cada propriedade.

Nome da propriedade
Descrição
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: Lança uma exceção ao analisar um documento que não corresponde ao esquema.

  • ReadConfig.ParseMode.PERMISSIVE: Define campos para null quando os tipos de dados não correspondem ao esquema. Para armazenar cada documento inválido como uma string JSON estendida, combine este valor com a opção columnNameOfCorruptRecord .

  • ReadConfig.ParseMode.DROPMALFORMED: ignora qualquer documento que não corresponda ao esquema.


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
partitioner
The partitioner full class name.
You can specify a custom implementation that must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface.
See the Partitioner Configuration section for more information about partitioners.

Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
partitioner.options.
Partitioner configuration prefix.
See the Partitioner Configuration section for more information about partitioners.
sampleSize
The number of documents to sample from the collection when inferring
the schema.

Default: 1000
sql.inferSchema.mapTypes.enabled
Whether to enable Map types when inferring the schema.
When enabled, large compatible struct types are inferred to a MapType instead.

Default: true
sql.inferSchema.mapTypes.minimum.key.size
Minimum size of a StructType before inferring as a MapType.

Default: 250
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

Importante

Pipelines de agregação personalizados devem ser compatíveis com a estratégia do particionador. Por exemplo, estágios de agregação como $group não funcionam com nenhum particionador que cria mais de uma partição.

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

Particionadores alteram o comportamento de leitura das leituras em lote que usam o Conector Spark. Ao dividir os dados em partições, você pode executar transformações em paralelo.

Esta seção contém informações de configuração para o seguinte particionador:

Observação

Somente leituras em lote

Como o mecanismo de processamento de fluxo de dados produz um único fluxo de dados, os particionadores não afetam as leituras de streaming.

SamplePartitioner é a configuração padrão do particionador. Esta configuração permite especificar um campo de partição, tamanho da partição e número de amostras por partição.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner.

Nome da propriedade
Descrição
partitioner.options.partition.field

O campo a ser usado para o particionamento, que deve ser um campo único.

Padrão: _id

partitioner.options.partition.size

O tamanho (em MB) de cada partição. Tamanhos menores de partição criam mais partições contendo menos documentos.

Padrão: 64

partitioner.options.samples.per.partition

O número de amostras a serem coletadas por partição. O número total de amostras coletadas é:

samples per partition * ( count / number of documents per partition)

Padrão: 10

Exemplo

Para uma coleção com 640 documentos e com um tamanho médio de 0,5 MB por documento, a configuração padrão SamplePartitioner cria 5 partições com 128 documentos por partição.

O Conector Spark faz amostras de 50 documentos (padrão de 10 por cada partição pretendida) e define 5 partições ao selecionar intervalos de campos de partição dos documentos de amostra.

A configuração do ShardedPartitioner automaticamente particiona os dados com base na sua configuração de fragmento.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner.

Importante

Restrições do ShardedPartitioner

  1. No MongoDB Server v6.0 e posterior, a operação de fragmentação cria um grande chunk inicial para cobrir todos os valores de chave de shard, tornando o particionador fragmentado ineficiente. Não recomendamos usar o particionador fragmentado quando conectado ao MongoDB v6.0 e posterior.

  2. O particionador fragmentado não é compatível com chaves de shard com hash.

A configuração do PaginateBySizePartitioner pagina os dados utilizando o tamanho médio do documento para dividir a coleção em partes de tamanho médio.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner.

Nome da propriedade
Descrição
partitioner.options.partition.field

O campo a ser usado para o particionamento, que deve ser um campo único.

Padrão: _id

partitioner.options.partition.size

O tamanho (em MB) de cada partição. Tamanhos de partição menores

criam mais partições contendo menos documentos.

Padrão: 64

A configuração do PaginateIntoPartitionsPartitioner pagina os dados dividindo a contagem de documentos na coleção pelo número máximo de partições permitidas.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner.

Nome da propriedade
Descrição
partitioner.options.partition.field

O campo a ser usado para o particionamento, que deve ser um campo único.

Padrão: _id

partitioner.options.max.number.of.partitions

O número de partições a serem criadas.

Padrão: 64

A configuração do SinglePartitionPartitioner cria uma única partição.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner.

A configuração AutoBucketPartitioner é semelhante à configuração SamplePartitioner , mas utiliza o estágio de agregação $bucketAuto para paginar os dados. Ao usar essa configuração, você pode particionar os dados em um ou vários campos, incluindo campos aninhados.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner.

Nome da propriedade
Descrição
partitioner.options.partition.fieldList

A lista de campos a serem usados para o particionamento. O valor pode ser um único nome de campo ou uma lista de campos separados por vírgula.

Padrão: _id

partitioner.options.partition.chunkSize

O tamanho médio (MB) para cada partição. Tamanhos de partição menores criam mais partições contendo menos documentos. Como essa configuração usa o tamanho médio do documento para determinar o número de documentos por partição, as partições podem não ter o mesmo tamanho.

Padrão: 64

partitioner.options.partition.samplesPerPartition

O número de amostras a serem coletadas por partição.

Padrão: 100

partitioner.options.partition.partitionKeyProjectionField

O nome do campo a ser usado para um campo projetado que contém todos os campos usados para dividir a coleção. Recomendamos alterar o valor dessa propriedade somente se cada documento já contiver o campo __idx .

Padrão: __idx

Se você usa SparkConf para especificar qualquer uma das configurações anteriores, você poderá incluí-las na configuração do connection.uri ou listá-las individualmente.

O exemplo de código abaixo mostra como especificar o banco de dados, coleção e preferência de leitura como parte da configuração do connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

Para manter o connection.uri curto e facilitar a leitura das configurações, você pode especificá-las individualmente:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

Importante

Se você especificar uma configuração em connection.uri e em sua própria linha, a configuração connection.uri terá precedência. Por exemplo, na configuração abaixo, o banco de dados de conexão é foobar, porque é o valor na configuração connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

Voltar

Ler do MongoDB no modo de lote