Ler do MongoDB no modo de transmissão
Nesta página
Visão geral
Ao ler um fluxo de um MongoDB database, o Spark Connector oferece suporte ao processamento em microlote e ao processamento contínuo. O processamento em micro-lote, o mecanismo de processamento padrão, atinge latências de ponta a ponta de apenas 100 milissegundos com garantias de tolerância a falhas exatamente uma vez. O processamento contínuo é um recurso experimental introduzido no Spark versão 2.3 que atinge latências de ponta a ponta tão baixas quanto 1 milissegundo com garantias de pelo menos uma vez.
Para saber mais sobre o processamento contínuo, consulte a documentação do Spark .
Observação
O conector lê o fluxo de mudança do sistema do MongoDB. Para gerar eventos de alteração no fluxo de alteração, realize operações de atualização em seu banco de dados.
Para saber mais sobre fluxos de alterações, consulte Fluxos de alterações no manual do MongoDB.
Para ler dados do MongoDB, chame o método readStream()
em seu objeto SparkSession
. Este método gera um objeto DataStreamReader
, que você pode usar para especificar o formato e outras definições de configuração para sua operação de leitura e streaming.
Você deve especificar as seguintes definições de configuração para ler do MongoDB:
Contexto | Descrição |
---|---|
readStream.format() | Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB. |
readStream.option() | Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e os estágios do pipeline de agregação . Para obter uma lista de opções de configuração de fluxo de leitura, consulte o guia Opções de configuração de leitura de fluxo de leitura. |
readStream.schema() | Especifica o esquema de entrada. |
O seguinte trecho de código mostra como usar as definições de configuração anteriores para processar continuamente dados transmitidos do MongoDB. O connector acrescenta todos os novos dados aos dados existentes e grava de forma assíncrona os checkpoint no /tmp/checkpointDir
uma vez por segundo. Passar o parâmetro Trigger.Continuous
para o método trigger()
habilita o processamento contínuo.
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream() .format("mongodb") .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream() .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append"); StreamingQuery query = dataStreamWriter.start();
Observação
O Spark não inicia a transmissão até que você chame o método start()
em uma query de transmissão.
Para obter uma lista completa dos métodos, consulte a referência Streaming estruturado Java .
Para ler dados do MongoDB, ligue para a função readStream
em seu objeto SparkSession
. Esta função retorna um objeto DataStreamReader
, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura de streaming.
Você deve especificar as seguintes definições de configuração para ler do MongoDB:
Contexto | Descrição |
---|---|
readStream.format() | Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB. |
readStream.option() | Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e os estágios do pipeline de agregação . Para obter uma lista de opções de configuração de fluxo de leitura, consulte o guia Opções de configuração de leitura de streaming . |
readStream.schema() | Especifica o esquema de entrada. |
O seguinte trecho de código mostra como usar as definições de configuração anteriores para processar continuamente dados transmitidos do MongoDB. O connector acrescenta todos os novos dados aos dados existentes e grava de forma assíncrona os checkpoint no /tmp/checkpointDir
uma vez por segundo. Passar o parâmetro continuous
para o método trigger()
habilita o processamento contínuo.
streamingDataFrame = (<local SparkSession>.readStream .format("mongodb") .load() ) dataStreamWriter = (streamingDataFrame.writeStream .trigger(continuous="1 second") .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") ) query = dataStreamWriter.start()
Observação
O Spark não inicia a transmissão até que você chame o método start()
em uma query de transmissão.
Para obter uma lista completa de métodos, consulte a referência de transmissão estruturada do pyspark.
Para ler dados do MongoDB, chame o método readStream
em seu objeto SparkSession
. Este método retorna um objeto DataStreamReader
, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura de streaming.
Você deve especificar as seguintes definições de configuração para ler do MongoDB:
Contexto | Descrição |
---|---|
readStream.format() | Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB. |
readStream.option() | Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e os estágios do pipeline de agregação . Para obter uma lista de opções de configuração de fluxo de leitura, consulte o guia Opções de configuração de leitura de streaming . |
readStream.schema() | Especifica o esquema de entrada. |
O seguinte trecho de código mostra como usar as definições de configuração anteriores para processar continuamente dados transmitidos do MongoDB. O connector acrescenta todos os novos dados aos dados existentes e grava de forma assíncrona os checkpoint no /tmp/checkpointDir
uma vez por segundo. Passar o parâmetro Trigger.Continuous
para o método trigger()
habilita o processamento contínuo.
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream .format("mongodb") .load() val dataStreamWriter = streamingDataFrame.writeStream .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") val query = dataStreamWriter.start()
Observação
O Spark não inicia a transmissão até que você chame o método start()
em uma query de transmissão.
Para obter uma lista completa dos métodos, consulte a referência de transmissão estruturada do Scala.
Exemplo
O exemplo a seguir mostra como transmitir dados do MongoDB para o seu console.
Crie um objeto
DataStreamReader
que leia do MongoDB.Crie um objeto
DataStreamWriter
chamando o métodowriteStream()
no objeto de streamingDataset
que você criou com umDataStreamReader
. Especifique o formatoconsole
utilizando o métodoformat()
.Chame o método
start()
na instânciaDataStreamWriter
para iniciar o stream.
À medida que novos dados são inseridos no MongoDB, o MongoDB transmite esses dados para o seu console de acordo com o outputMode
que você especifica.
Importante
Evite a transmissão de grandes conjuntos de dados para o seu console. A transmissão para o console consome muita memória e destina-se apenas a fins de teste.
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType() .add("company_symbol", DataTypes.StringType) .add("company_name", DataTypes.StringType) .add("price", DataTypes.DoubleType) .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("mongodb") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .schema(readSchema) .load() // manipulate your streaming data .writeStream() .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
Crie um objeto
DataStreamReader
que leia do MongoDB.Crie um objeto
DataStreamWriter
chamando o métodowriteStream()
noDataFrame
streaming que você criou com umDataStreamReader
. Especifique o formatoconsole
usando o métodoformat()
.Chame o método
start()
na instânciaDataStreamWriter
para iniciar o stream.
À medida que novos dados são inseridos no MongoDB, o MongoDB transmite esses dados para o seu console de acordo com o outputMode
que você especifica.
Importante
Evite a transmissão de grandes conjuntos de dados para o seu console. A transmissão para o console consome muita memória e destina-se apenas a fins de teste.
# create a local SparkSession spark = SparkSession.builder \ .appName("readExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define the schema of the source collection readSchema = (StructType() .add('company_symbol', StringType()) .add('company_name', StringType()) .add('price', DoubleType()) .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option('spark.mongodb.database', <database-name>) .option('spark.mongodb.collection', <collection-name>) .schema(readSchema) .load() # manipulate your streaming data .writeStream .format("console") .trigger(continuous="1 second") .outputMode("append") ) # run the query query = dataStreamWriter.start()
Crie um objeto
DataStreamReader
que leia do MongoDB.Crie um objeto
DataStreamWriter
chamando o métodowriteStream()
no objetoDataFrame
streaming que você criou usando oDataStreamReader
. Especifique o formatoconsole
usando o métodoformat()
.Chame o método
start()
na instânciaDataStreamWriter
para iniciar o stream.
À medida que novos dados são inseridos no MongoDB, o MongoDB transmite esses dados para o seu console de acordo com o outputMode
que você especifica.
Importante
Evite a transmissão de grandes conjuntos de dados para o seu console. A transmissão para o console consome muita memória e destina-se apenas a fins de teste.
// create a local SparkSession val spark = SparkSession.builder .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define the schema of the source collection val readSchema = StructType() .add("company_symbol", StringType()) .add("company_name", StringType()) .add("price", DoubleType()) .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .schema(readSchema) .load() // manipulate your streaming data .writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append") // run the query val query = dataStreamWriter.start()
Importante
Inferindo o esquema de um fluxo de mudança
Se você definir a opção change.stream.publish.full.document.only
como true
, o Spark Connector inferirá o esquema de um DataFrame
usando o esquema dos documentos digitalizados. Se você configurar a opção para false
, deverá especificar um esquema.
Para obter mais informações sobre essa configuração e para ver uma lista completa de opções de configuração de change stream, consulte o guia Opções de configuração de leitura .
Documentação da API
Para saber mais sobre os tipos usados nestes exemplos, consulte a seguinte documentação do Apache Spark API: