Gravar no MongoDB no modo de transmissão
Para escrever dados no MongoDB, chame o método writeStream()
em seu objeto Dataset<Row>
. Este método retorna um objeto DataStreamWriter
, que você pode usar para especificar o formato e outras definições de configuração para sua operação de gravação de streaming.
Você deve especificar as seguintes definições de configuração para gravar no MongoDB:
Contexto | Descrição |
---|---|
writeStream.format() | Especifica o formato da fonte de dados de saída subjacente. Use mongodb para escrever no MongoDB. |
writeStream.option() | Especifica as configurações de stream, incluindo astring de conexão do MongoDB deployment, o banco de banco de dados e a collection do MongoDB e o diretório de checkpoint . Para obter uma lista de opções de configuração de fluxo de gravação, consulte o guia Opções de configuração de gravação de streaming . |
writeStream.outputMode() | Especifica como os dados de um DataFrame de streaming são gravados em um coletor de streaming. Para visualizar uma lista de todos os modos de saída suportados, consulte a documentação Java outputMode. |
writeStream.trigger() | Especifica com que frequência o Spark Connector grava resultados no coletor de streaming. Chame esse método no objeto Para usar o processamento contínuo, passe Para visualizar uma lista de todas as políticas de processamento suportadas, consulte a documentação do Java trigger. |
O seguinte trecho de código mostra como usar as definições de configuração anteriores para transmitir dados para o MongoDB:
<streaming DataFrame>.writeStream() .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append");
Para obter uma lista completa dos métodos, consulte a referência Streaming estruturado Java .
Para escrever dados no MongoDB, chame a função writeStream
no seu objeto DataFrame
. Essa função retorna um objeto DataStreamWriter
, que você pode usar para especificar o formato e outras definições de configuração para sua operação de gravação de streaming.
Você deve especificar as seguintes definições de configuração para gravar no MongoDB:
Contexto | Descrição |
---|---|
writeStream.format() | Especifica o formato da fonte de dados de saída subjacente. Use mongodb para escrever no MongoDB. |
writeStream.option() | Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e o diretório de checkpoint de verificação. Para obter uma lista de opções de configuração de fluxo de gravação, consulte o guia Opções de configuração de fluxo de gravação . |
writeStream.outputMode() | Especifica como o Spark Connector grava um DataFrame de streaming em um coletor de streaming. Para exibir uma lista de todos os modos de saída compatíveis, consulte a documentação do pyspark outputMode. |
writeStream.trigger() | Especifica com que frequência o Spark Connector grava resultados no coletor de streaming. Chame esse método no objeto Para utilizar o processamento contínuo, passe à função um valor de tempo utilizando o parâmetro Para ver uma lista de todas as políticas de processamento compatíveis, consulte a documentação do trigger pyspark. |
O seguinte trecho de código mostra como usar as definições de configuração anteriores para transmitir dados para o MongoDB:
<streaming DataFrame>.writeStream \ .format("mongodb") \ .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \ .option("spark.mongodb.database", <database-name>) \ .option("spark.mongodb.collection", <collection-name>) \ .outputMode("append")
Para obter uma lista completa de funções, consulte a referência de transmissão estruturada do pyspark.
Para escrever dados no MongoDB, chame o método write
em seu objeto DataFrame
. Este método retorna um objeto DataStreamWriter
, que você pode usar para especificar o formato e outras definições de configuração para sua operação de gravação de streaming.
Você deve especificar as seguintes definições de configuração para gravar no MongoDB:
Contexto | Descrição |
---|---|
writeStream.format() | Especifica o formato da fonte de dados de saída subjacente. Use mongodb para escrever no MongoDB. |
writeStream.option() | Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e o diretório de checkpoint de verificação. Para obter uma lista de opções de configuração de fluxo de gravação, consulte o guia Opções de configuração de fluxo de gravação . |
writeStream.outputMode() | Especifica como o Spark Connector grava um DataFrame de streaming em um coletor de streaming. Para visualizar uma lista de todos os modos de saída suportados, consulte a documentação do Scala outputMode. |
writeStream.trigger() | Especifica com que frequência o Spark Connector grava resultados no coletor de streaming. Chame esse método no objeto Para usar o processamento contínuo, passe Para visualizar uma lista de todas as políticas de processamento suportadas, consulte a documentação do trigger Scala. |
O seguinte trecho de código mostra como usar as definições de configuração anteriores para transmitir dados para o MongoDB:
<streaming DataFrame>.writeStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append")
Para obter uma lista completa dos métodos, consulte a referência de transmissão estruturada do Scala.
Exemplo
O exemplo a seguir mostra como transmitir dados de um arquivoCSV do para MongoDB:
Crie um objeto
DataStreamReader
que leia a partir do arquivo CSV.Para criar um objeto
DataStreamWriter
, chame o métodowriteStream()
na transmissãoDataset<Row>
que você criou com oDataStreamReader
. Utilize o métodoformat()
para especificarmongodb
como o formato de dados subjacente.Ligue para o método
start()
no objetoDataStreamWriter
para iniciar o stream.
Conforme o conector lê dados do arquivo CSV, ele adiciona esses dados ao MongoDB de acordo com o outputMode
que você especificar.
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("csv") .option("header", "true") .schema("<csv-schema>") .load("<csv-file-name>") // manipulate your streaming data .writeStream() .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
Crie um objeto
DataStreamReader
que leia a partir do arquivo CSV.Para criar um objeto
DataStreamWriter
, chame a funçãowriteStream
no streamingDataFrame
que você criou com oDataStreamReader
. Utilize a funçãoformat()
para especificarmongodb
como o formato de dados subjacente.Ligue para a função
start()
na instânciaDataStreamWriter
para iniciar o fluxo.
Conforme o conector lê dados do arquivo CSV, ele adiciona esses dados ao MongoDB de acordo com o outputMode
que você especificar.
# create a local SparkSession spark = SparkSession.builder \ .appName("writeExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define a streaming query dataStreamWriter = (spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) # manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/pyspark/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") ) # run the query query = dataStreamWriter.start()
Crie um objeto
DataStreamReader
que leia a partir do arquivo CSV.Para criar um objeto
DataStreamWriter
, chame o métodowriteStream
na transmissãoDataFrame
que você criou com oDataStreamReader
. Utilize o métodoformat()
para especificarmongodb
como o formato de dados subjacente.Chame o método
start()
na instânciaDataStreamWriter
para iniciar o stream.
Conforme o conector lê dados do arquivo CSV, ele adiciona esses dados ao MongoDB de acordo com o outputMode
que você especificar.
// create a local SparkSession val spark = SparkSession.builder .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define a streaming query val dataStreamWriter = spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) // manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") // run the query val query = dataStreamWriter.start()
Documentação da API
Para saber mais sobre os tipos usados nestes exemplos, consulte a seguinte documentação do Apache Spark API: