Hi Community,
We are trying to perform CDC (Change Data Capture) on all of the collections in our MongoDB Atlas (v4.4.23) deployment and write the changes to S3 in JSON format using Spark Structured Streaming. We are using PySpark in AWS Glue (v3.0) to run this streaming job, with mongo-spark-connector_2.12-10.1.1.
We also passed the jars below to the streaming job (a sketch of how they are attached follows the list).
- bson-4.10.2.jar
- mongodb-driver-core-4.10.2.jar
- mongodb-driver-sync-4.10.2.jar
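To be concrete, these jars (plus the connector jar) are uploaded to S3 and attached through the Glue job's Dependent JARs path, i.e. the --extra-jars job parameter; the bucket and prefix below are just placeholders:
--extra-jars s3://my-bucket/jars/mongo-spark-connector_2.12-10.1.1.jar,s3://my-bucket/jars/bson-4.10.2.jar,s3://my-bucket/jars/mongodb-driver-core-4.10.2.jar,s3://my-bucket/jars/mongodb-driver-sync-4.10.2.jar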
However, the job fails with the exception below when it runs in AWS Glue. I ran a similar streaming job on my local system and did not encounter any issue there.
java.lang.NoSuchMethodError: org.bson.conversions.Bson.toBsonDocument()Lorg/bson/BsonDocument;
at com.mongodb.spark.sql.connector.read.MongoMicroBatchPartitionReader.getCursor(MongoMicroBatchPartitionReader.java:169)
at com.mongodb.spark.sql.connector.read.MongoMicroBatchPartitionReader.next(MongoMicroBatchPartitionReader.java:103)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:277)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
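Since the same jars work locally, I suspect a different bson jar on the Glue classpath is shadowing bson-4.10.2. As a rough check (driver side only; just a sketch, not yet run in Glue), something like this should print which jar the Bson class is actually loaded from:

# Rough driver-side check of which jar provides org.bson.conversions.Bson
# (the failing call happens on the executors, so this only rules out the driver classpath)
jvm = spark.sparkContext._jvm
bson_cls = jvm.java.lang.Class.forName('org.bson.conversions.Bson')
src = bson_cls.getProtectionDomain().getCodeSource()
print('Bson loaded from:', src.getLocation().toString() if src else 'unknown (no code source)')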
Providing the code below for more clarity.
from functools import partial

def write_to_filesink(batch_df, batch_id, collection, base_dir):
    # Append each micro-batch as JSON files under the collection's prefix in S3
    batch_df.write \
        .format('json') \
        .mode('append') \
        .save(f'{base_dir}/{collection}')

base_s3_path = 's3://bucket/dir1/dir2'

# Bind the collection name and base path so foreachBatch only passes (batch_df, batch_id)
partial_filesink_writter = partial(write_to_filesink,
                                   collection=collection_name,
                                   base_dir=base_s3_path)

streaming_df = spark.readStream \
    .format('mongodb') \
    .option('spark.mongodb.connection.uri', connection_uri) \
    .option('spark.mongodb.database', db_name) \
    .option('spark.mongodb.collection', collection_name) \
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true') \
    .option('forceDeleteTempCheckpointLocation', 'true') \
    .load()

query = streaming_df.writeStream \
    .foreachBatch(partial_filesink_writter) \
    .option('checkpointLocation',
            f'{base_s3_path}/{collection_name}/_checkpoint') \
    .trigger(processingTime='10 seconds') \
    .start()

query.awaitTermination()
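For reference, spark in the snippet above is a plain SparkSession. Locally I build it roughly like this, pulling the same connector version via spark.jars.packages (the app name and master are just placeholders):

from pyspark.sql import SparkSession

# Local session sketch; in the Glue job the session comes from the GlueContext instead
spark = SparkSession.builder \
    .appName('mongo-cdc-to-s3') \
    .master('local[*]') \
    .config('spark.jars.packages',
            'org.mongodb.spark:mongo-spark-connector_2.12:10.1.1') \
    .getOrCreate()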
Would appreciate help in solving this issue.
Thanks.