Write to MongoDB in Streaming Mode
To write data to MongoDB, call the writeStream() method on your Dataset<Row> object. This method returns a DataStreamWriter object, which you can use to specify the format and other configuration settings for your streaming write operation.
You must specify the following configuration settings to write to MongoDB:
Setting | Description |
---|---|
writeStream.format() | Specifies the format of the underlying output data source. Use mongodb to write to MongoDB. |
writeStream.option() | Specifies stream settings, including the MongoDB deployment connection string, the MongoDB database and collection, and the checkpoint directory. For a list of write stream configuration options, see the Streaming Write Configuration Options guide. |
writeStream.outputMode() | Specifies how data of a streaming DataFrame is written to a streaming sink. To view a list of all supported output modes, see the Java outputMode documentation. |
writeStream.trigger() | Specifies how often the Spark Connector writes results to the streaming sink. Call this method on the DataStreamWriter object that writeStream() returns. To use continuous processing, pass Trigger.Continuous(<time value>) to the method. To view a list of all supported processing policies, see the Java trigger documentation. |
The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:
```java
<streaming DataFrame>.writeStream()
    .format("mongodb")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append");
```
For a complete list of methods, see the Java Structured Streaming reference.
To write data to MongoDB, call the writeStream function on your DataFrame object. This function returns a DataStreamWriter object, which you can use to specify the format and other configuration settings for your streaming write operation.
You must specify the following configuration settings to write to MongoDB:
Setting | Description |
---|---|
writeStream.format() | Specifies the format of the underlying output data source. Use mongodb to write to MongoDB. |
writeStream.option() | Specifies stream settings, including the MongoDB deployment connection string, the MongoDB database and collection, and the checkpoint directory. For a list of write stream configuration options, see the Streaming Write Configuration Options guide. |
writeStream.outputMode() | Specifies how the Spark Connector writes a streaming DataFrame to a streaming sink. To view a list of all supported output modes, see the pyspark outputMode documentation. |
writeStream.trigger() | Specifies how often the Spark Connector writes results to the streaming sink. Call this function on the DataStreamWriter object that writeStream returns. To use continuous processing, pass the function a time value using the continuous parameter; a brief sketch follows the code snippet below. To view a list of all supported processing policies, see the pyspark trigger documentation. |
The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:
```python
<streaming DataFrame>.writeStream \
    .format("mongodb") \
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
    .option("spark.mongodb.database", <database-name>) \
    .option("spark.mongodb.collection", <collection-name>) \
    .outputMode("append")
```
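For example, a minimal sketch of setting a trigger on the configured write stream might look like the following. The time values are illustrative placeholders, and writer stands in for the DataStreamWriter you configure as shown above:

```python
# Configure the write stream as shown above, without starting it yet
writer = (<streaming DataFrame>.writeStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append"))

# Micro-batch processing on a fixed interval (interval value is a placeholder)
writer.trigger(processingTime="1 second")

# Continuous processing, by passing a time value to the continuous parameter
writer.trigger(continuous="1 second")
```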
For a complete list of functions, see the pyspark Structured Streaming reference.
To write data to MongoDB, call the writeStream method on your DataFrame object. This method returns a DataStreamWriter object, which you can use to specify the format and other configuration settings for your streaming write operation.
You must specify the following configuration settings to write to MongoDB:
Setting | Description |
---|---|
writeStream.format() | Specifies the format of the underlying output data source. Use mongodb to write to MongoDB. |
writeStream.option() | Specifies stream settings, including the MongoDB deployment connection string, the MongoDB database and collection, and the checkpoint directory. For a list of write stream configuration options, see the Streaming Write Configuration Options guide. |
writeStream.outputMode() | Specifies how the Spark Connector writes a streaming DataFrame to a streaming sink. To view a list of all supported output modes, see the Scala outputMode documentation. |
writeStream.trigger() | Specifies how often the Spark Connector writes results to the streaming sink. Call this method on the DataStreamWriter object that writeStream returns. To use continuous processing, pass Trigger.Continuous(<time value>) to the method. To view a list of all supported processing policies, see the Scala trigger documentation. |
The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:
```scala
<streaming DataFrame>.writeStream
  .format("mongodb")
  .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
  .option("spark.mongodb.database", <database-name>)
  .option("spark.mongodb.collection", <collection-name>)
  .outputMode("append")
```
For a complete list of methods, see the Scala Structured Streaming reference.
Example
The following example shows how to stream data from a CSV file to MongoDB:
1. Create a DataStreamReader object that reads from the CSV file.
2. To create a DataStreamWriter object, call the writeStream() method on the streaming Dataset<Row> that you created with the DataStreamReader. Use the format() method to specify mongodb as the underlying data format.
3. Call the start() method on the DataStreamWriter object to begin the stream.

As the connector reads data from the CSV file, it adds that data to MongoDB according to the outputMode you specify.
```java
// create a local SparkSession
SparkSession spark = SparkSession.builder()
    .appName("writeExample")
    .master("spark://spark-master:<port>")
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
    .getOrCreate();

// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
    .format("csv")
    .option("header", "true")
    .schema("<csv-schema>")
    .load("<csv-file-name>")
    // manipulate your streaming data
    .writeStream()
    .format("mongodb")
    .option("checkpointLocation", "/tmp/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
    .option("spark.mongodb.database", "<database-name>")
    .option("spark.mongodb.collection", "<collection-name>")
    .outputMode("append");

// run the query
StreamingQuery query = dataStreamWriter.start();
```
1. Create a DataStreamReader object that reads from the CSV file.
2. To create a DataStreamWriter object, call the writeStream function on the streaming DataFrame that you created with the DataStreamReader. Use the format() function to specify mongodb as the underlying data format.
3. Call the start() function on the DataStreamWriter instance to begin the stream.

As the connector reads data from the CSV file, it adds that data to MongoDB according to the outputMode you specify.
```python
# create a local SparkSession
spark = SparkSession.builder \
    .appName("writeExample") \
    .master("spark://spark-master:<port>") \
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
    .getOrCreate()

# define a streaming query
dataStreamWriter = (spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(<csv-schema>)
    .load(<csv-file-name>)
    # manipulate your streaming data
    .writeStream
    .format("mongodb")
    .option("checkpointLocation", "/tmp/pyspark/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append")
)

# run the query
query = dataStreamWriter.start()
```
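The start() call returns a StreamingQuery object. As an illustrative aside that is not part of the example above, you can block the driver until the query finishes or stop the query explicitly:

```python
# Block the driver until the streaming query terminates
query.awaitTermination()

# Alternatively, stop the query from another code path when you no longer need it
query.stop()
```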
1. Create a DataStreamReader object that reads from the CSV file.
2. To create a DataStreamWriter object, call the writeStream method on the streaming DataFrame that you created with the DataStreamReader. Use the format() method to specify mongodb as the underlying data format.
3. Call the start() method on the DataStreamWriter instance to begin the stream.

As the connector reads data from the CSV file, it adds that data to MongoDB according to the outputMode you specify.
```scala
// create a local SparkSession
val spark = SparkSession.builder
  .appName("writeExample")
  .master("spark://spark-master:<port>")
  .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
  .getOrCreate()

// define a streaming query
val dataStreamWriter = spark.readStream
  .format("csv")
  .option("header", "true")
  .schema(<csv-schema>)
  .load(<csv-file-name>)
  // manipulate your streaming data
  .writeStream
  .format("mongodb")
  .option("checkpointLocation", "/tmp/")
  .option("forceDeleteTempCheckpointLocation", "true")
  .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
  .option("spark.mongodb.database", <database-name>)
  .option("spark.mongodb.collection", <collection-name>)
  .outputMode("append")

// run the query
val query = dataStreamWriter.start()
```
API Documentation
To learn more about the types used in these examples, see the following Apache Spark API documentation: