Hello! I am trying to run the new Spark connector org.mongodb.spark:mongo-spark-connector_2.12:10.2.2 with Apache Spark 3.4.1 and MongoDB 6.0.12. The connection is established and the initial offset is reported:
MongoOffsetStore: Initial offset: {"version": 1, "offset": {"$timestamp": {"t": 4294967295, "i": 4294967295}}}
but after that I only receive empty batches:
[2024-03-07T13:03:10.013+0000] {spark_submit.py:571} INFO - 24/03/07 13:03:10 INFO ContinuousExecution: New epoch 67 is starting.
[2024-03-07T13:03:10.037+0000] {spark_submit.py:571} INFO - 24/03/07 13:03:10 INFO CheckpointFileManager: Writing atomically to file:/tmp/temporary-8a4800b5-a8d3-4615-8992-52662e33e3e0/offsets/66 using temp file file:/tmp/temporary-8a4800b5-a8d3-4615-8992-52662e33e3e0/offsets/.66.577d8986-f6d1-4412-8b83-37002aa402b2.tmp
[2024-03-07T13:03:10.068+0000] {spark_submit.py:571} INFO - 24/03/07 13:03:10 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-8a4800b5-a8d3-4615-8992-52662e33e3e0/offsets/.66.577d8986-f6d1-4412-8b83-37002aa402b2.tmp to file:/tmp/temporary-8a4800b5-a8d3-4615-8992-52662e33e3e0/offsets/66
[2024-03-07T13:03:10.068+0000] {spark_submit.py:571} INFO - -------------------------------------------
[2024-03-07T13:03:10.069+0000] {spark_submit.py:571} INFO - Batch: 66
[2024-03-07T13:03:10.069+0000] {spark_submit.py:571} INFO - -------------------------------------------
[2024-03-07T13:03:10.080+0000] {spark_submit.py:571} INFO - +---+-----------+---+------+----+---------------+----+-------------+----+
[2024-03-07T13:03:10.080+0000] {spark_submit.py:571} INFO - |_id|specversion| id|source|type|datacontenttype|time|triggerStatus|data|
[2024-03-07T13:03:10.080+0000] {spark_submit.py:571} INFO - +---+-----------+---+------+----+---------------+----+-------------+----+
[2024-03-07T13:03:10.080+0000] {spark_submit.py:571} INFO -
[2024-03-07T13:03:10.085+0000] {spark_submit.py:571} INFO - 24/03/07 13:03:10 INFO CheckpointFileManager: Writing atomically to file:/tmp/temporary-8a4800b5-a8d3-4615-8992-52662e33e3e0/commits/66 using temp file file:/tmp/temporary-8a4800b5-a8d3-4615-8992-52662e33e3e0/commits/.66.3a39c92a-a430-456d-9e1e-1e16e4b277f7.tmp
[2024-03-07T13:03:10.101+0000] {spark_submit.py:571} INFO - 24/03/07 13:03:10 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-8a4800b5-a8d3-4615-8992-52662e33e3e0/commits/.66.3a39c92a-a430-456d-9e1e-1e16e4b277f7.tmp to file:/tmp/temporary-8a4800b5-a8d3-4615-8992-52662e33e3e0/commits/66
[2024-03-07T13:03:10.101+0000] {spark_submit.py:571} INFO - 24/03/07 13:03:10 INFO MongoContinuousStream: ContinuousStream commit: {"version": 1, "offset": {"$timestamp": {"t": 4294967295, "i": 4294967295}}}
Something I noticed is the MongoContinuousStream $timestamp: the value 4294967295 converted to a date is Sunday, 7 February 2106 06:28:15 GMT, roughly 82 years in the future.
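For what it's worth, a quick check in plain Python shows that value is simply the maximum unsigned 32-bit integer, so it looks like a sentinel ("start from the latest cluster time") rather than a real timestamp, but that last part is only my guess:

# Quick sanity check on the offset value from the log above.
# 4294967295 == 2**32 - 1, the maximum unsigned 32-bit integer;
# interpreted as epoch seconds it lands in February 2106.
from datetime import datetime, timezone

t = 4294967295
print(t == 2**32 - 1)                              # True
print(datetime.fromtimestamp(t, tz=timezone.utc))  # 2106-02-07 06:28:15+00:00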
This is the PySpark code:
dataStreamWriter = (spark.readStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb://root:root@mongodb-0.mongodb-headless.mongodb.svc.cluster.local:27017")
    .option("spark.mongodb.database", "my_db")
    .option("spark.mongodb.collection", "my_collection")
    .option("spark.mongodb.read.readPreference.name", "primaryPreferred")
    .option("spark.mongodb.change.stream.publish.full.document.only", "true")
    .option("change.stream.publish.full.document.only", "true")
    # .schema(readSchema)
    .load()
    # manipulate your streaming data
    .writeStream
    .format("console")
    .trigger(continuous="10 second")
    .outputMode("append")
)
query = dataStreamWriter.start()
query.awaitTermination()
I’ve tried with and without a schema, but the result is the same.
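In case it helps, the readSchema I tested with looks roughly like the sketch below (the field names are taken from the console output above; the types, and reading data as a plain string, are my assumptions):

from pyspark.sql.types import StructType, StructField, StringType

# Sketch of the schema passed via .schema(readSchema) in the commented-out line above.
readSchema = StructType([
    StructField("_id", StringType()),
    StructField("specversion", StringType()),
    StructField("id", StringType()),
    StructField("source", StringType()),
    StructField("type", StringType()),
    StructField("datacontenttype", StringType()),
    StructField("time", StringType()),
    StructField("triggerStatus", StringType()),
    StructField("data", StringType()),
])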
Please let me know if you need more details.
Regards,
Fabri