I am on MongoDB 6.0, Spark 3.4.1, and Mongo Spark Connector 10.3.
I am looking to use the Mongo Spark Connector to capture all new and updated documents into a table in our data lake. However, the collection currently contains more than 1M documents that I want to ingest first. From reading the streaming read configuration options, it looks like I should be using:
option("spark.mongodb.read.change.stream.startup.mode","timestamp")
option("spark.mongodb.read.change.stream.startup.mode.timestamp.start.at.operation.time", "1")
My assumption was that these two settings would tell the Mongo Spark Connector to ingest everything from the beginning of time (similar to how Kafka topics have an “earliest” offset to consume from).
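To make the Kafka comparison concrete, this is roughly the behaviour I am hoping for. It is only a minimal sketch using the `startingOffsets` option of Spark's Kafka source. The helper names, broker address, and topic name are placeholders of my own, not anything from my actual setup.

```python
def kafka_earliest_options(bootstrap_servers, topic):
    """Build options for Spark's Kafka source that start at the oldest retained offset."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,  # placeholder broker address
        "subscribe": topic,                            # placeholder topic name
        "startingOffsets": "earliest",                 # replay everything still retained
    }


def read_kafka_from_earliest(spark, bootstrap_servers, topic):
    """Return a streaming DataFrame that replays the topic from its earliest offset."""
    reader = spark.readStream.format("kafka")
    for key, value in kafka_earliest_options(bootstrap_servers, topic).items():
        reader = reader.option(key, value)
    return reader.load()
```

With an existing `spark` session, `read_kafka_from_earliest(spark, "localhost:9092", "events")` would give a stream that first replays the retained messages and then continues with new ones, which is the behaviour I was expecting from the timestamp startup mode.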
But my assumption seems to be incorrect, because the stream only prints empty data frames to my console:
>>> -------------------------------------------
Batch: 0
-------------------------------------------
+---+---+---+---+
|_id| a| b| c|
+---+---+---+---+
+---+---+---+---+
-------------------------------------------
Batch: 1
-------------------------------------------
+---+---+---+---+
|_id| a| b| c|
+---+---+---+---+
+---+---+---+---+
-------------------------------------------
Batch: 2
-------------------------------------------
+---+---+---+---+
|_id| a| b| c|
+---+---+---+---+
+---+---+---+---+
-------------------------------------------
Batch: 3
-------------------------------------------
+---+---+---+---+
|_id| a| b| c|
+---+---+---+---+
+---+---+---+---+
-------------------------------------------
Batch: 4
-------------------------------------------
+---+---+---+---+
|_id| a| b| c|
+---+---+---+---+
+---+---+---+---+
-------------------------------------------
Batch: 5
-------------------------------------------
+---+---+---+---+
|_id| a| b| c|
+---+---+---+---+
+---+---+---+---+
Can someone point me in the right direction? Here is a rough snippet of how I am setting up my stream:
# Parentheses allow chaining across lines with inline comments
# (a backslash continuation cannot be followed by a comment).
dataStreamWriter = (
    spark.readStream
    .option(...)  # setting up my endpoint, auth, etc.
    .option("spark.mongodb.read.change.stream.publish.full.document.only", "true")
    .option("spark.mongodb.read.change.stream.startup.mode", "timestamp")
    .option("spark.mongodb.read.change.stream.startup.mode.timestamp.start.at.operation.time", "1")
    .format("mongodb")
    .load()
    .writeStream
    .format("console")
    .trigger(continuous="1 second")
    .outputMode("append")
)