Menu Docs
Página inicial do Docs
/
Conector do Spark
/

Ler do MongoDB no modo de transmissão

Nesta página

  • Visão geral
  • Exemplo
  • Documentação da API

Ao ler um fluxo de um MongoDB database, o Spark Connector oferece suporte ao processamento em microlote e ao processamento contínuo. O processamento em micro-lote, o mecanismo de processamento padrão, atinge latências de ponta a ponta de apenas 100 milissegundos com garantias de tolerância a falhas exatamente uma vez. O processamento contínuo é um recurso experimental introduzido no Spark versão 2.3 que atinge latências de ponta a ponta tão baixas quanto 1 milissegundo com garantias de pelo menos uma vez.

Para saber mais sobre o processamento contínuo, consulte a documentação do Spark .

Observação

O conector lê o fluxo de mudança do sistema do MongoDB. Para gerar eventos de alteração no fluxo de alteração, realize operações de atualização em seu banco de dados.

Para saber mais sobre fluxos de alterações, consulte Fluxos de alterações no manual do MongoDB.

Para ler dados do MongoDB, chame o método readStream() em seu objeto SparkSession . Este método gera um objeto DataStreamReader , que você pode usar para especificar o formato e outras definições de configuração para sua operação de leitura e streaming.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição
readStream.format()
Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.
readStream.option()

Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e os estágios do pipeline de agregação .

Para obter uma lista de opções de configuração de fluxo de leitura, consulte o guia Opções de configuração de leitura de fluxo de leitura.

readStream.schema()
Especifica o esquema de entrada.

O seguinte trecho de código mostra como usar as definições de configuração anteriores para processar continuamente dados transmitidos do MongoDB. O connector acrescenta todos os novos dados aos dados existentes e grava de forma assíncrona os checkpoint no /tmp/checkpointDir uma vez por segundo. Passar o parâmetro Trigger.Continuous para o método trigger() habilita o processamento contínuo.

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

Observação

O Spark não inicia a transmissão até que você chame o método start() em uma query de transmissão.

Para obter uma lista completa dos métodos, consulte a referência Streaming estruturado Java .

Para ler dados do MongoDB, ligue para a função readStream em seu objeto SparkSession . Esta função retorna um objeto DataStreamReader , que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura de streaming.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição
readStream.format()
Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.
readStream.option()

Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e os estágios do pipeline de agregação .

Para obter uma lista de opções de configuração de fluxo de leitura, consulte o guia Opções de configuração de leitura de streaming .

readStream.schema()
Especifica o esquema de entrada.

O seguinte trecho de código mostra como usar as definições de configuração anteriores para processar continuamente dados transmitidos do MongoDB. O connector acrescenta todos os novos dados aos dados existentes e grava de forma assíncrona os checkpoint no /tmp/checkpointDir uma vez por segundo. Passar o parâmetro continuous para o método trigger() habilita o processamento contínuo.

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

Observação

O Spark não inicia a transmissão até que você chame o método start() em uma query de transmissão.

Para obter uma lista completa de métodos, consulte a referência de transmissão estruturada do pyspark.

Para ler dados do MongoDB, chame o método readStream em seu objeto SparkSession . Este método retorna um objeto DataStreamReader , que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura de streaming.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição
readStream.format()
Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.
readStream.option()

Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e os estágios do pipeline de agregação .

Para obter uma lista de opções de configuração de fluxo de leitura, consulte o guia Opções de configuração de leitura de streaming .

readStream.schema()
Especifica o esquema de entrada.

O seguinte trecho de código mostra como usar as definições de configuração anteriores para processar continuamente dados transmitidos do MongoDB. O connector acrescenta todos os novos dados aos dados existentes e grava de forma assíncrona os checkpoint no /tmp/checkpointDir uma vez por segundo. Passar o parâmetro Trigger.Continuous para o método trigger() habilita o processamento contínuo.

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

Observação

O Spark não inicia a transmissão até que você chame o método start() em uma query de transmissão.

Para obter uma lista completa dos métodos, consulte a referência de transmissão estruturada do Scala.

O exemplo a seguir mostra como transmitir dados do MongoDB para o seu console.

  1. Crie um objeto DataStreamReader que leia do MongoDB.

  2. Crie um objeto DataStreamWriter chamando o método writeStream() no objeto de streaming Dataset que você criou com um DataStreamReader. Especifique o formato console utilizando o método format() .

  3. Chame o método start() na instância DataStreamWriter para iniciar o stream.

À medida que novos dados são inseridos no MongoDB, o MongoDB transmite esses dados para o seu console de acordo com o outputMode que você especifica.

Importante

Evite a transmissão de grandes conjuntos de dados para o seu console. A transmissão para o console consome muita memória e destina-se apenas a fins de teste.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Crie um objeto DataStreamReader que leia do MongoDB.

  2. Crie um objeto DataStreamWriter chamando o método writeStream() no DataFrame streaming que você criou com um DataStreamReader. Especifique o formato console usando o método format() .

  3. Chame o método start() na instância DataStreamWriter para iniciar o stream.

À medida que novos dados são inseridos no MongoDB, o MongoDB transmite esses dados para o seu console de acordo com o outputMode que você especifica.

Importante

Evite a transmissão de grandes conjuntos de dados para o seu console. A transmissão para o console consome muita memória e destina-se apenas a fins de teste.

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Crie um objeto DataStreamReader que leia do MongoDB.

  2. Crie um objeto DataStreamWriter chamando o método writeStream() no objeto DataFrame streaming que você criou usando o DataStreamReader. Especifique o formato console usando o método format() .

  3. Chame o método start() na instância DataStreamWriter para iniciar o stream.

À medida que novos dados são inseridos no MongoDB, o MongoDB transmite esses dados para o seu console de acordo com o outputMode que você especifica.

Importante

Evite a transmissão de grandes conjuntos de dados para o seu console. A transmissão para o console consome muita memória e destina-se apenas a fins de teste.

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Importante

Inferindo o esquema de um fluxo de mudança

Se você definir a opção change.stream.publish.full.document.only como true, o Spark Connector inferirá o esquema de um DataFrame usando o esquema dos documentos digitalizados. Se você configurar a opção para false, deverá especificar um esquema.

Para obter mais informações sobre essa configuração e para ver uma lista completa de opções de configuração de change stream, consulte o guia Opções de configuração de leitura .

Para saber mais sobre os tipos usados nestes exemplos, consulte a seguinte documentação do Apache Spark API:

Voltar

Modo de transmissão