Opções de configuração de leitura de streaming
Nesta página
Visão geral
Você pode configurar as seguintes propriedades ao ler dados do MongoDB no modo de streaming.
Observação
Se você usa o SparkConf
para definir as configurações de leitura do conector, insira spark.mongodb.read.
como prefixo em cada propriedade.
Nome da propriedade | Descrição | ||
---|---|---|---|
connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
database | Required. The database name configuration. | ||
collection | Required. The collection name configuration. You can specify multiple collections by separating the collection names
with a comma. To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property. | ||
comment | The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
mode | The parsing strategy to use when handling documents that don't match the
expected schema. This option accepts the following values:
Default: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE ,
this option specifies the name of the new column that stores the invalid
document as extended JSON. If you're using an explicit schema, it must
include the name of the new column. If you're
using an inferred schema, the Spark Connector adds the new column to the
end of the schema.Default: None | ||
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following:
A list of documents resembles the following:
Pipelines de agregação personalizados devem ser compatíveis com a estratégia do particionador. Por exemplo, estágios de agregação como | ||
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
change.stream. | Change stream configuration prefix. See the
Change Stream Configuration section for more
information about change streams. | ||
outputExtendedJson | When true , the connector converts BSON types not supported by Spark into
extended JSON strings.
When false , the connector uses the original relaxed JSON format for
unsupported types.Default: false | ||
schemaHint | Specifies a partial schema of known field types to use when inferring
the schema for the collection. To learn more about the schemaHint
option, see the Specify Known Fields with Schema Hints section.Default: None |
Alterar configuração do stream
Você pode configurar as seguintes propriedades ao ler um change stream do MongoDB:
Nome da propriedade | Descrição |
---|---|
change.stream.lookup.full.document | Determina quais valores seu change stream retorna nas operações de atualização. A configuração padrão retorna as diferenças entre o documento original e o documento atualizado. A configuração Para obter mais informações sobre como essa opção de fluxo de alterações funciona, consulte o guia manual do servidor MongoDB Pesquisar documento completo para operação de atualização. Padrão: "default" |
change.stream.micro.batch.max.partition.count | The maximum number of partitions the Spark Connector divides each
micro-batch into. Spark workers can process these partitions in parallel. This setting applies only when using micro-batch streams. Default: 1 AVISO: especificar um valor superior a |
change.stream.publish.full.document.only | Specifies whether to publish the changed document or the full
change stream document. When this setting is false , you must specify a schema. The schema
must include all fields that you want to read from the change stream. You can
use optional fields to ensure that the schema is valid for all change-stream
events.When this setting is true , the connector exhibits the following behavior:
Esta configuração substitui a configuração Padrão: |
change.stream.startup.mode | Specifies how the connector starts up when no offset is available. This setting accepts the following values:
|
Especificando propriedades em connection.uri
Se você usa SparkConf para especificar qualquer uma das configurações anteriores, você poderá incluí-las na configuração do connection.uri
ou listá-las individualmente.
O exemplo de código abaixo mostra como especificar o banco de dados, coleção e preferência de leitura como parte da configuração do connection.uri
:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
Para manter o connection.uri
curto e facilitar a leitura das configurações, você pode especificá-las individualmente:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
Importante
Se você especificar uma configuração em connection.uri
e em sua própria linha, a configuração connection.uri
terá precedência. Por exemplo, na configuração abaixo, o banco de dados de conexão é foobar
, porque é o valor na configuração connection.uri
:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
Especificando múltiplas coleções na collection
propriedade
Você pode especificar várias collections na propriedade de configuração change stream collection
separando os nomes das collections com uma vírgula. Não adicione um espaço entre as coleções, a menos que o espaço faça parte do nome da coleção.
Especifique várias collections, como mostrado no exemplo a seguir:
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
Se o nome de uma coleção for "*", ou se o nome incluir uma vírgula ou uma barra invertida (\), você deverá trocar o caractere da seguinte maneira:
Se o nome de uma collection usada em sua opção de configuração
collection
contiver uma vírgula, o Spark Connector a tratará como duas collections diferentes. Para evitar isso, você deve escapar da vírgula precedendo-a com uma barra invertida (\). Escape de uma coleção chamada "minha coleção" da seguinte maneira:"my\,collection" Se o nome de uma collection usada em sua opção de configuração
collection
for "*", o Spark Connector o interpretará como uma especificação para verificar todas as collections. Para evitar isso, você deve escapar do asterisco precedendo-o com uma barra invertida (\). Escape de uma collection chamada "*" da seguinte maneira:"\*" Se o nome de uma coleção usada em sua opção de configuração do
collection
contiver uma barra invertida (\), o Spark Connector tratará a barra invertida como um caractere de escape, o que pode alterar a forma como ele interpreta o valor. Para evitar isso, você deve escapar da barra invertida precedendo-a com outra barra invertida. Escape de uma coleção chamada "\collection" da seguinte maneira:"\\collection" Observação
Ao especificar o nome da coleção como uma string literal em Java, você deve escapar ainda mais de cada barra invertida com outra. Por exemplo, escape de uma collection chamada "\collection" da seguinte maneira:
"\\\\collection"
Você pode transmitir de todas as coleções no banco de dados passando um asterisco() como uma string para o nome da coleção.
Especifique todas as collections como mostrado no exemplo a seguir:
... .option("spark.mongodb.collection", "*")
Se você criar uma collection durante a transmissão a partir de todas as collections, a nova collection será automaticamente incluída no stream.
Você pode descartar coleções a qualquer momento durante a transmissão de várias coleções.
Importante
Inferindo o esquema com múltiplas coleções
Se você definir a opção change.stream.publish.full.document.only
como true
, o Spark Connector inferirá o esquema de um DataFrame
usando o esquema dos documentos digitalizados.
A inferência de esquema acontece no início da transmissão e não leva em conta as coleções criadas durante a transmissão.
Ao transmitir de várias coleções e inferir o esquema, o connector amostra cada coleção sequencialmente. O streaming de um grande número de collections pode fazer com que a inferência de esquema tenha desempenho notoriamente mais lento. Esse impacto no desempenho ocorre apenas ao inferir o esquema.