Hi - I am currently trying to read change data from MongoDB and persist the results to a file sink, but I am getting a java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing. error. The full stack trace is below, followed by a sketch of how the stream is set up:
java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing.
at org.apache.spark.sql.errors.QueryExecutionErrors$.microBatchUnsupportedByDataSourceError(QueryExecutionErrors.scala:1579)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:123)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:519)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:97)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:342)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:323)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:250)
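For context, the stream is set up roughly like this (a minimal sketch; the connection URI, database, collection, and output/checkpoint paths are placeholders):

```python
# A minimal sketch of the change-data read from MongoDB into a file sink;
# the URI, database, collection, and paths below are placeholders.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("mongodb-change-data-to-files")
         .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017")
         .getOrCreate())

# Stream the change data from the source collection.
change_df = (spark.readStream
             .format("mongodb")
             .option("database", "my_db")
             .option("collection", "my_collection")
             .load())

# The file sink runs with the default micro-batch trigger, which is where
# the UnsupportedOperationException above is thrown.
stream = (change_df.writeStream
          .format("parquet")
          .option("path", "/tmp/mongodb-change-data")
          .option("checkpointLocation", "/tmp/mongodb-change-data-checkpoint")
          .start())

stream.awaitTermination()
```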
@Krishna_Kumar_Sahu Can you add your requirements to this ticket? Specifically, what is the use case, and what destinations are you writing to that require microbatch?
According to ticket https://jira.mongodb.org/browse/SPARK-368, it seems this is resolved in fix version 10.1.0 of the MongoDB Spark Connector, but I can't find the updated connector on Maven. Please provide the link, or make that fixed version available.
To try version 10.1 on Databricks, do I have to download all these files and add them to the cluster, or which files from that list are needed? And how can I reference it in my Spark session? Thank you very much.
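On other clusters I would normally pull a connector straight from Maven by its coordinates, something like the sketch below (the Scala version suffix has to match the cluster) - is that the right approach for 10.1?

```python
# A sketch of referencing the connector by Maven coordinates instead of
# uploading jars; the _2.12 suffix is an assumption and must match the
# cluster's Scala version.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("mongodb-connector-from-maven")
         .config("spark.jars.packages",
                 "org.mongodb.spark:mongo-spark-connector_2.12:10.1.0")
         .getOrCreate())
```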
The error message indicates that the MongoDB Spark Connector does not support microbatch processing for the mongodb data source. This means that you cannot use the writeStream API with the mongodb data source. Instead, you may want to consider using the foreachBatch API to write the streaming data to MongoDB.
Here’s an example of how you can use foreachBatch to write streaming data to MongoDB:
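Something along these lines (a minimal sketch; the connection URIs, database and collection names, and checkpoint path are placeholders you would replace with your own):

```python
# A minimal sketch of the foreachBatch pattern; the URIs, database and
# collection names, and checkpoint path below are placeholders.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("mongodb-foreachbatch-example")
         .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017")
         .config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017")
         .getOrCreate())

# Stream the change data from the source collection.
query = (spark.readStream
         .format("mongodb")
         .option("database", "source_db")
         .option("collection", "source_collection")
         .load())

def write_mongodb(batch_df, batch_id):
    # Each micro-batch arrives as a static DataFrame, so the regular batch
    # writer can be used here (or swapped for a file writer).
    (batch_df.write
     .format("mongodb")
     .option("database", "target_db")
     .option("collection", "target_collection")
     .mode("append")
     .save())

stream = (query.writeStream
          .foreachBatch(write_mongodb)
          .option("checkpointLocation", "/tmp/mongodb-foreachbatch-checkpoint")
          .start())

stream.awaitTermination()
```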
In this example, the query variable is the streaming DataFrame read from the MongoDB collection, and the write_mongodb function writes each micro-batch back to MongoDB via the foreachBatch API. You can modify the write_mongodb function to write the data to a file instead of MongoDB, as sketched below.
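For instance, a file-based variant of the callback could look like this (a sketch; the output path is a placeholder):

```python
# A sketch of the same callback writing each micro-batch to Parquet files
# instead of MongoDB; the output path is a placeholder.
def write_files(batch_df, batch_id):
    (batch_df.write
     .mode("append")
     .parquet(f"/tmp/mongodb-change-data/batch_{batch_id}"))
```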
I hope this helps! Let me know if you have any further questions.