Write to MongoDB in Streaming Mode

Java

To write data to MongoDB, call the writeStream() method on your Dataset<Row> object. This method returns a DataStreamWriter object, which you can use to specify the format and other configuration settings for your streaming write operation.

You must specify the following configuration settings to write to MongoDB:

Setting
    Description

writeStream.format()
    Specifies the format of the underlying output data source. Use mongodb to write to MongoDB.

writeStream.option()
    Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and checkpoint directory.

    For a list of write stream configuration options, see the Streaming Write Configuration Options guide.

writeStream.outputMode()
    Specifies how the Spark Connector writes a streaming DataFrame to a streaming sink. To view a list of all supported output modes, see the Java outputMode documentation.

writeStream.trigger()
    Specifies how often the Spark Connector writes results to the streaming sink. Call this method on the DataStreamWriter object you create from the DataStreamReader you configure.

    To use continuous processing, pass Trigger.Continuous(<time value>) as an argument, where <time value> is how often you want the Spark Connector to checkpoint asynchronously. If you pass any other static method of the Trigger class, or if you don't call writeStream.trigger(), the Spark Connector uses micro-batch processing instead.

    To view a list of all supported processing policies, see the Java trigger documentation.

The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:

<streaming DataFrame>.writeStream()
    .format("mongodb")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append");

For a complete list of methods, see the Java Structured Streaming reference.

Python

To write data to MongoDB, call the writeStream function on your DataFrame object. This function returns a DataStreamWriter object, which you can use to specify the format and other configuration settings for your streaming write operation.

You must specify the following configuration settings to write to MongoDB:

Setting
    Description

writeStream.format()
    Specifies the format of the underlying output data source. Use mongodb to write to MongoDB.

writeStream.option()
    Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and checkpoint directory.

    For a list of write stream configuration options, see the Streaming Write Configuration Options guide.

writeStream.outputMode()
    Specifies how the Spark Connector writes a streaming DataFrame to a streaming sink. To view a list of all supported output modes, see the PySpark outputMode documentation.

writeStream.trigger()
    Specifies how often the Spark Connector writes results to the streaming sink. Call this method on the DataStreamWriter object you create from the DataStreamReader you configure.

    To use continuous processing, pass the function a time value using the continuous parameter. If you pass any other named parameter, or if you don't call writeStream.trigger(), the Spark Connector uses micro-batch processing instead.

    To view a list of all supported processing policies, see the PySpark trigger documentation.
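
As a rough illustration of the trigger setting described above, the following sketch builds the keyword arguments for writeStream.trigger() and applies them only when a trigger is requested. The helper names trigger_kwargs and apply_trigger and the interval strings are hypothetical and not part of the Spark Connector; continuous and processingTime are the PySpark named parameters.

```python
from typing import Dict, Optional


def trigger_kwargs(continuous_interval: Optional[str] = None,
                   micro_batch_interval: Optional[str] = None) -> Dict[str, str]:
    """Build keyword arguments for DataStreamWriter.trigger() (hypothetical helper)."""
    if continuous_interval and micro_batch_interval:
        raise ValueError("choose either continuous or micro-batch processing, not both")
    if continuous_interval:
        # Continuous processing: the value is the asynchronous checkpoint interval.
        return {"continuous": continuous_interval}
    if micro_batch_interval:
        # Any other named parameter, such as processingTime, selects micro-batch processing.
        return {"processingTime": micro_batch_interval}
    # Empty dict: do not call trigger() at all and keep the default micro-batch behavior.
    return {}


def apply_trigger(writer, **kwargs):
    """Call writer.trigger(**kwargs) only when a trigger was requested."""
    return writer.trigger(**kwargs) if kwargs else writer


# Example use with a real streaming DataFrame (assumed to be named streaming_df):
# writer = apply_trigger(streaming_df.writeStream.format("mongodb"),
#                        **trigger_kwargs(continuous_interval="1 second"))
```

Skipping the trigger() call when no interval is given mirrors the default described above, where the Spark Connector falls back to micro-batch processing.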

The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:

<streaming DataFrame>.writeStream \
    .format("mongodb") \
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
    .option("spark.mongodb.database", <database-name>) \
    .option("spark.mongodb.collection", <collection-name>) \
    .outputMode("append")

For a complete list of functions, see the PySpark Structured Streaming reference.
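
One possible way to keep the connection settings from the snippet above in one place is to collect them in a dictionary and pass them with options(), which PySpark's DataStreamWriter provides alongside option(). This is a minimal sketch: the helper names mongo_write_options and configure_writer are hypothetical, and the values you pass in stand for your own connection string, database, collection, and checkpoint directory.

```python
from typing import Dict


def mongo_write_options(uri: str, database: str, collection: str,
                        checkpoint_dir: str) -> Dict[str, str]:
    """Collect the streaming write options into one dict (hypothetical helper)."""
    for name, value in (("uri", uri), ("database", database),
                        ("collection", collection), ("checkpoint_dir", checkpoint_dir)):
        if not value:
            raise ValueError(f"{name} must be a non-empty string")
    return {
        "spark.mongodb.connection.uri": uri,
        "spark.mongodb.database": database,
        "spark.mongodb.collection": collection,
        "checkpointLocation": checkpoint_dir,
    }


def configure_writer(streaming_df, options: Dict[str, str]):
    """Return a DataStreamWriter set up to write to MongoDB in append mode."""
    return (streaming_df.writeStream
            .format("mongodb")
            .options(**options)
            .outputMode("append"))
```

Validating the values before building the writer surfaces a missing setting early, instead of when the stream starts.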

Scala

To write data to MongoDB, call the writeStream method on your DataFrame object. This method returns a DataStreamWriter object, which you can use to specify the format and other configuration settings for your streaming write operation.

You must specify the following configuration settings to write to MongoDB:

Setting
    Description

writeStream.format()
    Specifies the format of the underlying output data source. Use mongodb to write to MongoDB.

writeStream.option()
    Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and checkpoint directory.

    For a list of write stream configuration options, see the Streaming Write Configuration Options guide.

writeStream.outputMode()
    Specifies how the Spark Connector writes a streaming DataFrame to a streaming sink. To view a list of all supported output modes, see the Scala outputMode documentation.

writeStream.trigger()
    Specifies how often the Spark Connector writes results to the streaming sink. Call this method on the DataStreamWriter object you create from the DataStreamReader you configure.

    To use continuous processing, pass Trigger.Continuous(<time value>) as an argument, where <time value> is how often you want the Spark Connector to checkpoint asynchronously. If you pass any other static method of the Trigger class, or if you don't call writeStream.trigger(), the Spark Connector uses micro-batch processing instead.

    To view a list of all supported processing policies, see the Scala trigger documentation.

The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:

<streaming DataFrame>.writeStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append")

For a complete list of methods, see the Scala Structured Streaming reference.

The following example shows how to stream data from a CSV file to MongoDB:

Java

  1. Create a DataStreamReader object that reads from the CSV file.

  2. To create a DataStreamWriter object, call the writeStream() method on the streaming Dataset<Row> that you created with the DataStreamReader. Use the format() method to specify mongodb as the underlying data format.

  3. Call the start() method on the DataStreamWriter object to begin the stream.

As the connector reads data from the CSV file, it adds that data to MongoDB according to the outputMode you specify.

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;

// create a local SparkSession
SparkSession spark = SparkSession.builder()
    .appName("writeExample")
    .master("spark://spark-master:<port>")
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
    .getOrCreate();

// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
    .format("csv")
    .option("header", "true")
    .schema("<csv-schema>")
    .load("<csv-file-name>")
    // manipulate your streaming data
    .writeStream()
    .format("mongodb")
    .option("checkpointLocation", "/tmp/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
    .option("spark.mongodb.database", "<database-name>")
    .option("spark.mongodb.collection", "<collection-name>")
    .outputMode("append");

// run the query
StreamingQuery query = dataStreamWriter.start();

Python

  1. Create a DataStreamReader object that reads from the CSV file.

  2. To create a DataStreamWriter object, call the writeStream function on the streaming DataFrame that you created with the DataStreamReader. Use the format() function to specify mongodb as the underlying data format.

  3. Call the start() function on the DataStreamWriter instance to begin the stream.

As the connector reads data from the CSV file, it adds that data to MongoDB according to the outputMode you specify.

from pyspark.sql import SparkSession

# create a local SparkSession
spark = SparkSession.builder \
    .appName("writeExample") \
    .master("spark://spark-master:<port>") \
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
    .getOrCreate()

# define a streaming query
dataStreamWriter = (spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(<csv-schema>)
    .load(<csv-file-name>)
    # manipulate your streaming data
    .writeStream
    .format("mongodb")
    .option("checkpointLocation", "/tmp/pyspark/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append")
)

# run the query
query = dataStreamWriter.start()
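
The start() call above returns a StreamingQuery and does not block, so a standalone script usually needs to wait on the query to keep the stream running. The following is a minimal sketch of one way to do that, assuming you want the stream to run for a bounded time. The helper name run_for is hypothetical; awaitTermination(), isActive, and stop() are members of PySpark's StreamingQuery.

```python
def run_for(query, seconds: float) -> bool:
    """Let a started StreamingQuery run for up to `seconds`, then stop it.

    Returns True if the query ended on its own before the timeout.
    """
    try:
        # With a timeout, awaitTermination() returns whether the query
        # terminated within that many seconds.
        finished = query.awaitTermination(seconds)
    finally:
        # Stop the query if it is still running, including after an error.
        if query.isActive:
            query.stop()
    return finished


# Example use with the query started above:
# run_for(query, 60)
```

To keep the stream running until it fails or is stopped elsewhere, calling query.awaitTermination() without a timeout is the simpler choice.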

Scala

  1. Create a DataStreamReader object that reads from the CSV file.

  2. To create a DataStreamWriter object, call the writeStream method on the streaming DataFrame that you created with the DataStreamReader. Use the format() method to specify mongodb as the underlying data format.

  3. Call the start() method on the DataStreamWriter instance to begin the stream.

As the connector reads data from the CSV file, it adds that data to MongoDB according to the outputMode you specify.

import org.apache.spark.sql.SparkSession

// create a local SparkSession
val spark = SparkSession.builder
    .appName("writeExample")
    .master("spark://spark-master:<port>")
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
    .getOrCreate()

// define a streaming query
val dataStreamWriter = spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(<csv-schema>)
    .load(<csv-file-name>)
    // manipulate your streaming data
    .writeStream
    .format("mongodb")
    .option("checkpointLocation", "/tmp/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append")

// run the query
val query = dataStreamWriter.start()

To learn more about the types used in these examples, see the following Apache Spark API documentation:
