Read from MongoDB in Streaming Mode
Overview
When reading a stream from a MongoDB database, the MongoDB Spark Connector supports both micro-batch processing and continuous processing. Micro-batch processing, the default processing engine, achieves end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance guarantees. Continuous processing is an experimental feature introduced in Spark version 2.3 that achieves end-to-end latencies as low as 1 millisecond with at-least-once guarantees.
To learn more about continuous processing, see the Spark documentation.
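As a quick illustration of the difference, the following sketch (in PySpark) contrasts how each processing mode is selected on the write side. It assumes a `SparkSession` named `spark` whose configuration already points at your MongoDB deployment; the variable names are placeholders for this sketch, not part of the connector API.

```python
# A minimal sketch contrasting the two processing modes.
# Assumes an existing SparkSession named `spark` that is already configured
# with your MongoDB connection settings.
streaming_df = (spark.readStream
    .format("mongodb")
    .load()
)

# Micro-batch processing (the default engine): Spark plans a new
# micro-batch on the given interval with exactly-once guarantees.
micro_batch_query = (streaming_df.writeStream
    .format("console")
    .trigger(processingTime="100 milliseconds")
    .outputMode("append")
    .start()
)

# Continuous processing (experimental): Spark processes records as they
# arrive and checkpoints on the given interval, with at-least-once guarantees.
continuous_query = (streaming_df.writeStream
    .format("console")
    .trigger(continuous="1 second")
    .outputMode("append")
    .start()
)
```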
Note
The connector reads from your MongoDB deployment's change stream. To generate change events on the change stream, perform update operations on your database.
To learn more about change streams, see Change Streams in the MongoDB manual.
To read data from MongoDB, call the `readStream()` method on your `SparkSession` object. This method returns a `DataStreamReader` object, which you can use to specify the format and other configuration settings for your streaming read operation.
You must specify the following configuration settings to read from MongoDB:
| Setting | Description |
| --- | --- |
| `readStream.format()` | Specifies the format of the underlying input data source. Use `mongodb` to read from MongoDB. |
| `readStream.option()` | Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and aggregation pipeline stages. For a list of read stream configuration options, see the Streaming Read Configuration Options guide. |
| `readStream.schema()` | Specifies the input schema. |
The following code snippet shows how to use the preceding configuration settings to continuously process data streamed from MongoDB. The connector appends all new data to the existing data and asynchronously writes checkpoints to `/tmp/checkpointDir` once per second. Passing the `Trigger.Continuous` parameter to the `trigger()` method enables continuous processing.
```java
import org.apache.spark.sql.streaming.Trigger;

Dataset<Row> streamingDataset = <local SparkSession>.readStream()
    .format("mongodb")
    .load();

DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
    .trigger(Trigger.Continuous("1 second"))
    .format("memory")
    .option("checkpointLocation", "/tmp/checkpointDir")
    .outputMode("append");

StreamingQuery query = dataStreamWriter.start();
```
Note
Spark does not begin streaming until you call the `start()` method on a streaming query.
For a complete list of methods, see the Java Structured Streaming reference.
To read data from MongoDB, call the `readStream` function on your `SparkSession` object. This function returns a `DataStreamReader` object, which you can use to specify the format and other configuration settings for your streaming read operation.
You must specify the following configuration settings to read from MongoDB:
| Setting | Description |
| --- | --- |
| `readStream.format()` | Specifies the format of the underlying input data source. Use `mongodb` to read from MongoDB. |
| `readStream.option()` | Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and aggregation pipeline stages. For a list of read stream configuration options, see the Streaming Read Configuration Options guide. |
| `readStream.schema()` | Specifies the input schema. |
The following code snippet shows how to use the preceding configuration settings to continuously process data streamed from MongoDB. The connector appends all new data to the existing data and asynchronously writes checkpoints to `/tmp/checkpointDir` once per second. Passing the `continuous` parameter to the `trigger()` method enables continuous processing.
```python
streamingDataFrame = (<local SparkSession>.readStream
    .format("mongodb")
    .load()
)

dataStreamWriter = (streamingDataFrame.writeStream
    .trigger(continuous="1 second")
    .format("memory")
    .option("checkpointLocation", "/tmp/checkpointDir")
    .outputMode("append")
)

query = dataStreamWriter.start()
```
Note
Spark does not begin streaming until you call the `start()` method on a streaming query.
For a complete list of methods, see the pyspark Structured Streaming reference.
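The `StreamingQuery` object returned by `start()` is also how you manage a running stream. The following sketch assumes the `dataStreamWriter` from the preceding snippet and shows two common patterns: blocking until the query terminates, or stopping it explicitly after some time.

```python
# A minimal sketch of managing the query returned by start().
# Assumes the `dataStreamWriter` defined in the preceding snippet.
import time

query = dataStreamWriter.start()

# Option 1: block the driver until the query stops or fails.
# query.awaitTermination()

# Option 2: let the stream run briefly, then stop it explicitly.
time.sleep(30)        # stream for roughly 30 seconds
query.stop()          # stop the streaming query
print(query.status)   # inspect the final query status
```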
To read data from MongoDB, call the `readStream` method on your `SparkSession` object. This method returns a `DataStreamReader` object, which you can use to specify the format and other configuration settings for your streaming read operation.
You must specify the following configuration settings to read from MongoDB:
Setting | Description |
---|---|
readStream.format() | Specifies the format of the underlying input data source. Use mongodb
to read from MongoDB. |
readStream.option() | Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and aggregation pipeline stages. For a list of read stream configuration options, see the Streaming Read Configuration Options guide. |
readStream.schema() | Specifies the input schema. |
The following code snippet shows how to use the preceding configuration settings to continuously process data streamed from MongoDB. The connector appends all new data to the existing data and asynchronously writes checkpoints to `/tmp/checkpointDir` once per second. Passing the `Trigger.Continuous` parameter to the `trigger()` method enables continuous processing.
```scala
import org.apache.spark.sql.streaming.Trigger

val streamingDataFrame = <local SparkSession>.readStream
  .format("mongodb")
  .load()

val dataStreamWriter = streamingDataFrame.writeStream
  .trigger(Trigger.Continuous("1 second"))
  .format("memory")
  .option("checkpointLocation", "/tmp/checkpointDir")
  .outputMode("append")

val query = dataStreamWriter.start()
```
Note
Spark does not begin streaming until you call the `start()` method on a streaming query.
For a complete list of methods, see the Scala Structured Streaming reference.
Example
The following example shows how to stream data from MongoDB to your console.
1. Create a `DataStreamReader` object that reads from MongoDB.
2. Create a `DataStreamWriter` object by calling the `writeStream()` method on the streaming `Dataset` object that you created with a `DataStreamReader`. Specify the format `console` using the `format()` method.
3. Call the `start()` method on the `DataStreamWriter` instance to begin the stream.
As new data is inserted into MongoDB, MongoDB streams that data out to your console according to the `outputMode` you specify.
Important
Avoid streaming large Datasets to your console. Streaming to your console is memory intensive and intended only for testing purposes.
```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// create a local SparkSession
SparkSession spark = SparkSession.builder()
    .appName("readExample")
    .master("spark://spark-master:<port>")
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
    .getOrCreate();

// define the schema of the source collection
StructType readSchema = new StructType()
    .add("company_symbol", DataTypes.StringType)
    .add("company_name", DataTypes.StringType)
    .add("price", DataTypes.DoubleType)
    .add("tx_time", DataTypes.TimestampType);

// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
    .option("spark.mongodb.database", "<database-name>")
    .option("spark.mongodb.collection", "<collection-name>")
    .schema(readSchema)
    .load()
    // manipulate your streaming data
    .writeStream()
    .format("console")
    .trigger(Trigger.Continuous("1 second"))
    .outputMode("append");

// run the query
StreamingQuery query = dataStreamWriter.start();
```
1. Create a `DataStreamReader` object that reads from MongoDB.
2. Create a `DataStreamWriter` object by calling the `writeStream()` method on the streaming `DataFrame` that you created with a `DataStreamReader`. Specify the format `console` by using the `format()` method.
3. Call the `start()` method on the `DataStreamWriter` instance to begin the stream.
As new data is inserted into MongoDB, MongoDB streams that data out to your console according to the `outputMode` you specify.
Important
Avoid streaming large Datasets to your console. Streaming to your console is memory intensive and intended only for testing purposes.
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

# create a local SparkSession
spark = SparkSession.builder \
    .appName("readExample") \
    .master("spark://spark-master:<port>") \
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
    .getOrCreate()

# define the schema of the source collection
readSchema = (StructType()
    .add("company_symbol", StringType())
    .add("company_name", StringType())
    .add("price", DoubleType())
    .add("tx_time", TimestampType())
)

# define a streaming query
dataStreamWriter = (spark.readStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
    .option("spark.mongodb.database", "<database-name>")
    .option("spark.mongodb.collection", "<collection-name>")
    .schema(readSchema)
    .load()
    # manipulate your streaming data
    .writeStream
    .format("console")
    .trigger(continuous="1 second")
    .outputMode("append")
)

# run the query
query = dataStreamWriter.start()
```
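If you do stream to your console while testing, you can reduce how much Spark prints for each batch. The following sketch assumes the `dataStreamWriter` built in the example above and uses the console sink's `numRows` and `truncate` options; it is an optional tweak for testing, not part of the connector itself.

```python
# A minimal sketch: limit console output while testing.
# Assumes the `dataStreamWriter` built in the preceding example.
limitedWriter = (dataStreamWriter
    .option("numRows", 5)        # print at most 5 rows per batch
    .option("truncate", False)   # don't truncate long column values
)

query = limitedWriter.start()
```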
1. Create a `DataStreamReader` object that reads from MongoDB.
2. Create a `DataStreamWriter` object by calling the `writeStream()` method on the streaming `DataFrame` object that you created by using the `DataStreamReader`. Specify the format `console` by using the `format()` method.
3. Call the `start()` method on the `DataStreamWriter` instance to begin the stream.
As new data is inserted into MongoDB, MongoDB streams that data out to your console according to the `outputMode` you specify.
Important
Avoid streaming large Datasets to your console. Streaming to your console is memory intensive and intended only for testing purposes.
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{DoubleType, StringType, StructType, TimestampType}

// create a local SparkSession
val spark = SparkSession.builder
  .appName("readExample")
  .master("spark://spark-master:<port>")
  .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
  .getOrCreate()

// define the schema of the source collection
val readSchema = new StructType()
  .add("company_symbol", StringType)
  .add("company_name", StringType)
  .add("price", DoubleType)
  .add("tx_time", TimestampType)

// define a streaming query
val dataStreamWriter = spark.readStream
  .format("mongodb")
  .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
  .option("spark.mongodb.database", "<database-name>")
  .option("spark.mongodb.collection", "<collection-name>")
  .schema(readSchema)
  .load()
  // manipulate your streaming data
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .outputMode("append")

// run the query
val query = dataStreamWriter.start()
```
Important
Inferring the Schema of a Change Stream
When the Spark Connector infers the schema of a DataFrame read from a change stream, by default it uses the schema of the underlying collection rather than that of the change stream. If you set the `change.stream.publish.full.document.only` option to `true`, the connector uses the schema of the change stream instead.
For more information about this setting, and to see a full list of change stream configuration options, see the Read Configuration Options guide.
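For example, the following sketch (in PySpark, with placeholder connection values and an assumed `SparkSession` named `spark`) sets this option on the stream reader so that the connector publishes only the `fullDocument` field of each change event and infers the schema from the change stream:

```python
# A minimal sketch, assuming an existing SparkSession named `spark` and
# placeholder connection values.
streaming_df = (spark.readStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
    .option("spark.mongodb.database", "<database-name>")
    .option("spark.mongodb.collection", "<collection-name>")
    # Publish only the fullDocument field of each change event; the connector
    # then infers the schema from the change stream rather than the collection.
    .option("change.stream.publish.full.document.only", "true")
    .load()
)
```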
API Documentation
To learn more about the types used in these examples, see the following Apache Spark API documentation:
- `DataStreamReader`
- `DataStreamWriter`
- `StreamingQuery`