Streaming Data With Apache Spark and MongoDB
MongoDB has released version 10 of the MongoDB Connector for Apache Spark, which leverages the new Spark Data Sources API V2 and adds support for Spark Structured Streaming.
The current version of the MongoDB Spark Connector was originally written in 2016 and is based upon V1 of the Spark Data Sources API. While this API version is still supported, Databricks has released an updated version of the API, making it easier for data sources like MongoDB to work with Spark. An immediate benefit of moving the MongoDB Spark Connector to V2 of the API is tighter integration with Spark Structured Streaming.
Note: MongoDB will continue to support the previous version of the MongoDB Spark Connector, which uses the V1 API, until Databricks deprecates V1 of the Data Sources API. While no new features will be implemented, upgrades to that connector will be limited to bug fixes and support for current versions of Spark.
The new MongoDB Spark Connector release (Version 10.1) is not intended to be a direct replacement for applications that use the previous version of the MongoDB Spark Connector.
The new Connector uses a different namespace with a short name, “mongodb” (full path is “com.mongodb.spark.sql.connector.MongoTableProvider”), versus “mongo” (full path of “com.mongodb.spark.DefaultSource”). Having a different namespace makes it possible to use both versions of the connector within the same Spark application! This is helpful in unit testing your application with the new Connector and making the transition on your timeline.
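For example, here is a minimal sketch of reading the same collection with both connectors in one application. The app name and placeholder values are hypothetical, it assumes both connector jars are already on the classpath (for example, supplied to spark-submit), and the older connector's option names may vary slightly by release.

from pyspark.sql import SparkSession

# Sketch only: assumes both connector jars (v3.x and v10.x) are on the classpath.
spark = SparkSession.builder.appName("sideBySideConnectors").getOrCreate()

# Previous connector: short name "mongo" (com.mongodb.spark.DefaultSource)
dfOld = (spark.read.format("mongo")
    .option("uri", "<CONNECTION STRING>")
    .option("database", "Stocks")
    .option("collection", "StockData")
    .load())

# New connector: short name "mongodb" (com.mongodb.spark.sql.connector.MongoTableProvider)
dfNew = (spark.read.format("mongodb")
    .option("spark.mongodb.connection.uri", "<CONNECTION STRING>")
    .option("spark.mongodb.database", "Stocks")
    .option("spark.mongodb.collection", "StockData")
    .load())

# Compare row counts as a quick sanity check while migrating
print(dfOld.count(), dfNew.count())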
Also, we are changing how we version the MongoDB Spark Connector. Previous versions of the MongoDB Spark Connector aligned with the version of Spark that was supported—e.g., Version 2.4 of the MongoDB Spark Connector works with Spark 2.4. Keep in mind that going forward, this will not be the case. The MongoDB documentation will make clear which versions of Spark each connector release supports.
Apache Spark comes with a stream processing engine called Structured Streaming, which is based on Spark's SQL engine and DataFrame APIs. Spark Structured Streaming treats each incoming stream of data as a micro-batch, continually appending each micro-batch to the target dataset. This makes it easy to convert existing Spark batch jobs into streaming jobs. Structured Streaming has evolved over Spark releases, and Spark 2.3 introduced Continuous Processing mode, which took the micro-batch latency from over 100 ms to about 1 ms. Note that this feature is still marked experimental in the official Spark documentation. In the following examples, we'll show you how to stream data between MongoDB and Spark using Structured Streaming and continuous processing. First, we'll look at reading data from MongoDB.
You can stream data from MongoDB to Spark using the new Spark Connector. Consider the following example that streams stock data from a MongoDB Atlas cluster. A sample document in MongoDB is as follows:
{
  _id: ObjectId("624767546df0f7dd8783f300"),
  company_symbol: 'HSL',
  company_name: 'HUNGRY SYNDROME LLC',
  price: 45.74,
  tx_time: '2022-04-01T16:57:56Z'
}
In this code example, we will use the new MongoDB Spark Connector and read from the StockData collection. When the Spark Connector opens a streaming read connection to MongoDB, it creates a MongoDB change stream for the given database and collection. A change stream is used to subscribe to changes in MongoDB. As data is inserted, updated, and deleted, change stream events are created. It’s these change events that are passed back to the client, in this case the Spark application. There are configuration options that can change the structure of this event message. For example, if you want to return just the document itself and not include the change stream event metadata, set “spark.mongodb.change.stream.publish.full.document.only” to true.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.\
    builder.\
    appName("streamingExampleRead").\
    config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:10.1.1').\
    getOrCreate()

query=(spark.readStream.format("mongodb")
    .option('spark.mongodb.connection.uri', '<CONNECTION STRING>')
    .option('spark.mongodb.database', 'Stocks')
    .option('spark.mongodb.collection', 'StockData')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .load())

query.printSchema()
The schema is inferred from the MongoDB collection. You can see from the printSchema command that our document structure is as follows:
root
 |-- _id: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- company_symbol: string (nullable = true)
 |-- price: double (nullable = true)
 |-- tx_time: string (nullable = true)
We can verify that the dataset is streaming with the isStreaming command.
query.isStreaming
Next, let’s write the streaming data to the console as it gets inserted into MongoDB.
query2=(query.writeStream
    .outputMode("append")
    .option("forceDeleteTempCheckpointLocation", "true")
    .format("console")
    .trigger(continuous="1 second")
    .start().awaitTermination())
When the above code was run through spark-submit, the output resembled the following:
… removed for brevity …
+--------------------+--------------------+--------------+-----+-------------------+
| _id| company_name|company_symbol|price| tx_time|
+--------------------+--------------------+--------------+-----+-------------------+
|62476caa6df0f7dd8...| HUNGRY SYNDROME LLC| HSL|45.99|2022-04-01 17:20:42|
|62476caa6df0f7dd8...|APPETIZING MARGIN...| AMP|12.81|2022-04-01 17:20:42|
|62476caa6df0f7dd8...|EMBARRASSED COCKT...| ECC|38.18|2022-04-01 17:20:42|
|62476caa6df0f7dd8...|PERFECT INJURY CO...| PIC|86.85|2022-04-01 17:20:42|
|62476caa6df0f7dd8...|GIDDY INNOVATIONS...| GMI|84.46|2022-04-01 17:20:42|
+--------------------+--------------------+--------------+-----+-------------------+
… removed for brevity …
+--------------------+--------------------+--------------+-----+-------------------+
| _id| company_name|company_symbol|price| tx_time|
+--------------------+--------------------+--------------+-----+-------------------+
|62476cab6df0f7dd8...| HUNGRY SYNDROME LLC| HSL|46.04|2022-04-01 17:20:43|
|62476cab6df0f7dd8...|APPETIZING MARGIN...| AMP| 12.8|2022-04-01 17:20:43|
|62476cab6df0f7dd8...|EMBARRASSED COCKT...| ECC| 38.2|2022-04-01 17:20:43|
|62476cab6df0f7dd8...|PERFECT INJURY CO...| PIC|86.85|2022-04-01 17:20:43|
|62476cab6df0f7dd8...|GIDDY INNOVATIONS...| GMI|84.46|2022-04-01 17:20:43|
+--------------------+--------------------+--------------+-----+-------------------+
Next, let’s consider an example where we stream data from Apache Kafka to MongoDB. Here, the source is a Kafka topic, “stockdata.Stocks.StockData.” As data arrives in this topic, it’s run through Spark, with the message contents being parsed, transformed, and written into MongoDB. Here is the code listing with comments in-line:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, TimestampType, DoubleType, StringType, StructField

spark = SparkSession.\
    builder.\
    appName("streamingExampleWrite").\
    config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:10.1.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0').\
    getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("startingOffsets", "earliest") \
    .option("kafka.bootstrap.servers", "KAFKA BROKER HOST HERE") \
    .option("subscribe", "stockdata.Stocks.StockData") \
    .load()

schemaStock = StructType([ \
    StructField("_id", StringType(), True), \
    StructField("company_name", StringType(), True), \
    StructField("company_symbol", StringType(), True), \
    StructField("price", StringType(), True), \
    StructField("tx_time", StringType(), True)])

schemaKafka = StructType([ \
    StructField("payload", StringType(), True)])
Note that a Kafka topic message arrives in the following format: key (binary), value (binary), topic (string), partition (int), offset (long), timestamp (timestamp), timestampType (int). See the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) for more information on the Kafka and Spark integration.
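For reference, printing the schema of the Kafka source DataFrame above shows this fixed set of columns; the commented output below is a sketch of what you can expect.

df.printSchema()
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)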
To process the message for consumption into MongoDB, we want to pick out the value, which is in binary format, and convert it to JSON.
stockDF=df.selectExpr("CAST(value AS STRING)")
For reference, here is an example of an event (the value converted into a string) that is on the Kafka topic:
1 { 2 "schema": { 3 "type": "string", 4 "optional": false 5 }, 6 "payload": "{\"_id\": {\"$oid\": \"6249f8096df0f7dd8785d70a\"}, \"company_symbol\": \"GMI\", \"company_name\": \"GIDDY INNOVATIONS\", \"price\": 87.57, \"tx_time\": \"2022-04-03T15:39:53Z\"}" 7 }
We want to isolate the payload field and convert it to a JSON representation leveraging the schemaStock defined above. For clarity, we have broken up the operation into multiple steps to explain the process. First, we want to convert the value into JSON.
stockDF=stockDF.select(from_json(col('value'),schemaKafka).alias("json_data")).selectExpr('json_data.*')
The dataset now contains data that resembles the following:
…
{
  _id: ObjectId("624c6206e152b632f88a8ee2"),
  payload: '{"_id": {"$oid": "6249f8046df0f7dd8785d6f1"}, "company_symbol": "GMI", "company_name": "GIDDY MONASTICISM INNOVATIONS", "price": 87.62, "tx_time": "2022-04-03T15:39:48Z"}'
},
…
Next, we want to capture just the value of the payload field and convert that into JSON since it’s stored as a string.
stockDF=stockDF.select(from_json(col('payload'),schemaStock).alias("json_data2")).selectExpr('json_data2.*')
Now we can do whatever transforms we would like to do on the data. In this case, let’s convert the tx_time into a timestamp.
stockDF=stockDF.withColumn("tx_time",col("tx_time").cast("timestamp"))
The dataset is now in a format that’s ready for consumption into MongoDB, so let’s stream it out to MongoDB. To do this, use the writeStream method. Keep in mind there are various options to set. For example, the “trigger” option controls how Spark processes the stream: here, .trigger(continuous="10 seconds") requests continuous processing with a 10-second checkpoint interval, while removing the trigger entirely falls back to the default micro-batch processing. For more information on options and parameters, check out the Structured Streaming Guide.
dsw = (
    stockDF.writeStream
    .format("mongodb")
    .queryName("ToMDB")
    .option("checkpointLocation", "/tmp/pyspark7/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option('spark.mongodb.connection.uri', '<CONNECTION STRING>')
    .option('spark.mongodb.database', 'Stocks')
    .option('spark.mongodb.collection', 'Sink')
    .trigger(continuous="10 seconds")
    .outputMode("append")
    .start().awaitTermination())
While continuous mode offers a lot of promise in terms of latency and performance, support for it among various popular connectors, such as AWS S3, is non-existent, so you might end up using micro-batch mode within your solution. The key difference between the two is how Spark obtains the data from the stream: as mentioned previously, in micro-batch mode the data is batched and processed rather than continuously appended to a table. The noticeable difference is the advertised micro-batch latency of around 100 ms, which for most workloads might not be an issue.
Unlike with writes, when we read from MongoDB there is no special configuration to tell Spark to use micro-batch or continuous processing; this behavior is determined only when you write. Thus, in our code example, the read from MongoDB is the same in both cases, e.g.:
query=(spark.readStream.format("mongodb")
    .option('spark.mongodb.connection.uri', '<<MONGODB CONNECTION STRING>>')
    .option('spark.mongodb.database', 'Stocks')
    .option('spark.mongodb.collection', 'StockData')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .load())
Recall from the previous discussion on reading MongoDB data that when using spark.readStream.format("mongodb"), MongoDB opens a change stream and subscribes to changes as they occur in the database. With micro-batch processing, each micro-batch event opens a new change stream cursor, making this form of streaming less efficient than continuous streams. That said, some consumers of streaming data, such as AWS S3, only support data from micro-batch streams. Consider the previous writeStream example code:
dsw = (
    stockDF.writeStream
    .format("mongodb")
    .queryName("ToMDB")
    .option("checkpointLocation", "/tmp/pyspark7/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option('spark.mongodb.connection.uri', '<<MONGODB CONNECTION STRING>>')
    .option('spark.mongodb.database', 'Stocks')
    .option('spark.mongodb.collection', 'Sink')
    .trigger(continuous="10 seconds")
    .outputMode("append")
    .start().awaitTermination())
Here, the .trigger parameter was used to tell Spark to use continuous mode streaming; to use micro-batch mode, simply remove the .trigger parameter, as in the sketch below.
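For example, here is a sketch of the same write in micro-batch form, reusing the placeholder options from above. The optional processingTime trigger shown here simply sets an explicit batch interval; omitting .trigger entirely uses Spark's default micro-batch behavior.

dsw = (
    stockDF.writeStream
    .format("mongodb")
    .queryName("ToMDB")
    .option("checkpointLocation", "/tmp/pyspark7/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option('spark.mongodb.connection.uri', '<<MONGODB CONNECTION STRING>>')
    .option('spark.mongodb.database', 'Stocks')
    .option('spark.mongodb.collection', 'Sink')
    .trigger(processingTime="10 seconds")  # micro-batch every 10 seconds; remove for the default trigger
    .outputMode("append")
    .start().awaitTermination())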
Streaming data is a critical component of many types of applications. MongoDB has evolved over the years, continually adding features and functionality to support these types of workloads. With the MongoDB Spark Connector version 10.1, you can quickly stream data to and from MongoDB with a few lines of code.
For more information and examples on the new MongoDB Spark Connector version 10.1, check out the online documentation. Have questions about the connector or MongoDB? Post a question in the MongoDB Developer Community Connectors & Integrations forum.