Menu Docs
Página inicial do Docs
/
Conector do Spark
/ /

Opções de configuração de leitura de streaming

Nesta página

  • Visão geral
  • Alterar configuração do stream
  • Especificando propriedades em connection.uri
  • Especificando múltiplas coleções na propriedade collection

Você pode configurar as seguintes propriedades ao ler dados do MongoDB no modo de streaming.

Observação

Se você usa o SparkConf para definir as configurações de leitura do conector, insira spark.mongodb.read. como prefixo em cada propriedade.

Nome da propriedade
Descrição
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
You can specify multiple collections by separating the collection names with a comma.

To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: Lança uma exceção ao analisar um documento que não corresponde ao esquema.

  • ReadConfig.ParseMode.PERMISSIVE: Define campos para null quando os tipos de dados não correspondem ao esquema. Para armazenar cada documento inválido como uma string JSON estendida, combine este valor com a opção columnNameOfCorruptRecord .

  • ReadConfig.ParseMode.DROPMALFORMED: ignora qualquer documento que não corresponda ao esquema.


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

Pipelines de agregação personalizados devem ser compatíveis com a estratégia do particionador. Por exemplo, estágios de agregação como $group não funcionam com nenhum particionador que cria mais de uma partição.

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
change.stream.
Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

Você pode configurar as seguintes propriedades ao ler um change stream do MongoDB:

Nome da propriedade
Descrição
change.stream.lookup.full.document

Determina quais valores seu change stream retorna nas operações de atualização.

A configuração padrão retorna as diferenças entre o documento original e o documento atualizado.

A configuração updateLookup também retorna as diferenças entre o documento original e o documento atualizado, mas também inclui uma cópia de todo o documento atualizado.

Para obter mais informações sobre como essa opção de fluxo de alterações funciona, consulte o guia manual do servidor MongoDB Pesquisar documento completo para operação de atualização.

Padrão: "default"

change.stream.micro.batch.max.partition.count
The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

AVISO: especificar um valor superior a 1 pode alterar a ordem em que o Spark Connector processa eventos de mudança. Evite essa configuração se o processamento fora de ordem puder criar inconsistências de dados downstream.

change.stream.publish.full.document.only
Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • O connector filtra mensagens que omitem o campo fullDocument e publica somente o valor do campo.

  • Se você não especificar um esquema, o connector inferirá o esquema a partir do documento do fluxo de alterações.

Esta configuração substitui a configuração change.stream.lookup.full.document .

Padrão: false

change.stream.startup.mode
Specifies how the connector starts up when no offset is available.
This setting accepts the following values:
  • latest: o connector começa a processar eventos de mudança começando com o evento mais recente. Ele não processará nenhum evento anterior não processado.

  • timestamp: o connector começa a processar eventos de mudança em um horário especificado.

    Para usar a opção timestamp , você deve especificar um horário usando a configuração change.stream.startup.mode.timestamp.start.at.operation.time . Esta configuração aceita carimbos de data/hora nos seguintes formatos:

    Padrão: latest

Se você usa SparkConf para especificar qualquer uma das configurações anteriores, você poderá incluí-las na configuração do connection.uri ou listá-las individualmente.

O exemplo de código abaixo mostra como especificar o banco de dados, coleção e preferência de leitura como parte da configuração do connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

Para manter o connection.uri curto e facilitar a leitura das configurações, você pode especificá-las individualmente:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

Importante

Se você especificar uma configuração em connection.uri e em sua própria linha, a configuração connection.uri terá precedência. Por exemplo, na configuração abaixo, o banco de dados de conexão é foobar, porque é o valor na configuração connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

Você pode especificar várias collections na propriedade de configuração change stream collection separando os nomes das collections com uma vírgula. Não adicione um espaço entre as coleções, a menos que o espaço faça parte do nome da coleção.

Especifique várias collections, como mostrado no exemplo a seguir:

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

Se o nome de uma coleção for "*", ou se o nome incluir uma vírgula ou uma barra invertida (\), você deverá trocar o caractere da seguinte maneira:

  • Se o nome de uma collection usada em sua opção de configuração collection contiver uma vírgula, o Spark Connector a tratará como duas collections diferentes. Para evitar isso, você deve escapar da vírgula precedendo-a com uma barra invertida (\). Escape de uma coleção chamada "minha coleção" da seguinte maneira:

    "my\,collection"
  • Se o nome de uma collection usada em sua opção de configuração collection for "*", o Spark Connector o interpretará como uma especificação para verificar todas as collections. Para evitar isso, você deve escapar do asterisco precedendo-o com uma barra invertida (\). Escape de uma collection chamada "*" da seguinte maneira:

    "\*"
  • Se o nome de uma coleção usada em sua opção de configuração do collection contiver uma barra invertida (\), o Spark Connector tratará a barra invertida como um caractere de escape, o que pode alterar a forma como ele interpreta o valor. Para evitar isso, você deve escapar da barra invertida precedendo-a com outra barra invertida. Escape de uma coleção chamada "\collection" da seguinte maneira:

    "\\collection"

    Observação

    Ao especificar o nome da coleção como uma string literal em Java, você deve escapar ainda mais de cada barra invertida com outra. Por exemplo, escape de uma collection chamada "\collection" da seguinte maneira:

    "\\\\collection"

Você pode transmitir de todas as coleções no banco de dados passando um asterisco() como uma string para o nome da coleção.

Especifique todas as collections como mostrado no exemplo a seguir:

...
.option("spark.mongodb.collection", "*")

Se você criar uma collection durante a transmissão a partir de todas as collections, a nova collection será automaticamente incluída no stream.

Você pode descartar coleções a qualquer momento durante a transmissão de várias coleções.

Importante

Inferindo o esquema com múltiplas coleções

Se você definir a opção change.stream.publish.full.document.only como true, o Spark Connector inferirá o esquema de um DataFrame usando o esquema dos documentos digitalizados.

A inferência de esquema acontece no início da transmissão e não leva em conta as coleções criadas durante a transmissão.

Ao transmitir de várias coleções e inferir o esquema, o connector amostra cada coleção sequencialmente. O streaming de um grande número de collections pode fazer com que a inferência de esquema tenha desempenho notoriamente mais lento. Esse impacto no desempenho ocorre apenas ao inferir o esquema.

Voltar

Leia