Docs Menu
Docs Home
/
MongoDB Spark Connector
/

Read from MongoDB in Streaming Mode

On this page

  • Overview
  • Example
  • API Documentation

When reading a stream from a MongoDB database, the MongoDB Spark Connector supports both micro-batch processing and continuous processing. Micro-batch processing, the default processing engine, achieves end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance guarantees. Continuous processing is an experimental feature introduced in Spark version 2.3 that achieves end-to-end latencies as low as 1 millisecond with at-least-once guarantees.

To learn more about continuous processing, see the Spark documentation.

Note

The connector reads from your MongoDB deployment's change stream. To generate change events on the change stream, perform update operations on your database.

To learn more about change streams, see Change Streams in the MongoDB manual.

To read data from MongoDB, call the readStream() method on your SparkSession object. This method returns a DataStreamReader object, which you can use to specify the format and other configuration settings for your streaming read operation.

You must specify the following configuration settings to read from MongoDB:

Setting
Description
readStream.format()
Specifies the format of the underlying input data source. Use mongodb to read from MongoDB.
readStream.option()

Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and aggregation pipeline stages.

For a list of read stream configuration options, see the Streaming Read Configuration Options guide.

readStream.schema()
Specifies the input schema.

The following code snippet shows how to use the preceding configuration settings to continuously process data streamed from MongoDB. The connector appends all new data to the existing data and asynchronously writes checkpoints to /tmp/checkpointDir once per second. Passing the Trigger.Continuous parameter to the trigger() method enables continuous processing.

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

Note

Spark does not begin streaming until you call the start() method on a streaming query.

For a complete list of methods, see the Java Structured Streaming reference.

To read data from MongoDB, call the readStream function on your SparkSession object. This function returns a DataStreamReader object, which you can use to specify the format and other configuration settings for your streaming read operation.

You must specify the following configuration settings to read from MongoDB:

Setting
Description
readStream.format()
Specifies the format of the underlying input data source. Use mongodb to read from MongoDB.
readStream.option()

Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and aggregation pipeline stages.

For a list of read stream configuration options, see the Streaming Read Configuration Options guide.

readStream.schema()
Specifies the input schema.

The following code snippet shows how to use the preceding configuration settings to continuously process data streamed from MongoDB. The connector appends all new data to the existing data and asynchronously writes checkpoints to /tmp/checkpointDir once per second. Passing the continuous parameter to the trigger() method enables continuous processing.

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

Note

Spark does not begin streaming until you call the start() method on a streaming query.

For a complete list of methods, see the pyspark Structured Streaming reference.

To read data from MongoDB, call the readStream method on your SparkSession object. This method returns a DataStreamReader object, which you can use to specify the format and other configuration settings for your streaming read operation.

You must specify the following configuration settings to read from MongoDB:

Setting
Description
readStream.format()
Specifies the format of the underlying input data source. Use mongodb to read from MongoDB.
readStream.option()

Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and aggregation pipeline stages.

For a list of read stream configuration options, see the Streaming Read Configuration Options guide.

readStream.schema()
Specifies the input schema.

The following code snippet shows how to use the preceding configuration settings to continuously process data streamed from MongoDB. The connector appends all new data to the existing data and asynchronously writes checkpoints to /tmp/checkpointDir once per second. Passing the Trigger.Continuous parameter to the trigger() method enables continuous processing.

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

Note

Spark does not begin streaming until you call the start() method on a streaming query.

For a complete list of methods, see the Scala Structured Streaming reference.

The following example shows how to stream data from MongoDB to your console.

  1. Create a DataStreamReader object that reads from MongoDB.

  2. Create a DataStreamWriter object by calling the writeStream() method on the streaming Dataset object that you created with a DataStreamReader. Specify the format console using the format() method.

  3. Call the start() method on the DataStreamWriter instance to begin the stream.

As new data is inserted into MongoDB, MongoDB streams that data out to your console according to the outputMode you specify.

Important

Avoid streaming large Datasets to your console. Streaming to your console is memory intensive and intended only for testing purposes.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Create a DataStreamReader object that reads from MongoDB.

  2. Create a DataStreamWriter object by calling the writeStream() method on the streaming DataFrame that you created with a DataStreamReader. Specify the format console by using the format() method.

  3. Call the start() method on the DataStreamWriter instance to begin the stream.

As new data is inserted into MongoDB, MongoDB streams that data out to your console according to the outputMode you specify.

Important

Avoid streaming large Datasets to your console. Streaming to your console is memory intensive and intended only for testing purposes.

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Create a DataStreamReader object that reads from MongoDB.

  2. Create a DataStreamWriter object by calling the writeStream() method on the streaming DataFrame object that you created by using the DataStreamReader. Specify the format console by using the format() method.

  3. Call the start() method on the DataStreamWriter instance to begin the stream.

As new data is inserted into MongoDB, MongoDB streams that data out to your console according to the outputMode you specify.

Important

Avoid streaming large Datasets to your console. Streaming to your console is memory intensive and intended only for testing purposes.

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Important

Inferring the Schema of a Change Stream

When the Spark Connector infers the schema of a DataFrame read from a change stream, by default, it uses the schema of the underlying collection rather than that of the change stream. If you set the change.stream.publish.full.document.only option to true, the connector uses the schema of the change stream instead.

For more information about this setting, and to see a full list of change stream configuration options, see the Read Configuration Options guide.

To learn more about the types used in these examples, see the following Apache Spark API documentation:

Back

Streaming Mode