I’m receiving the following error, but I’m pretty sure I do in fact have the streams set to publish full document only.
py4j.protocol.Py4JJavaError: An error occurred while calling o112.start.
: com.mongodb.spark.sql.connector.exceptions.ConfigException: Mongo Continuous streams require a schema to be explicitly defined, unless using publish full document only.
Here is the code for the read stream:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("My Spark App") \
    .getOrCreate()
query = (spark.readStream.format("mongodb")
    .option("spark.mongodb.collection", "input_collection")
    # this is where I set publish full document only
    .option("spark.mongodb.change.stream.publish.full.document.only", "true")
    .option("checkpointLocation", "/tmp/pyspark/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .load())
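For reference, the alternative the error mentions, explicitly defining a schema, is something I was hoping to avoid. If I understand the API correctly, it would look roughly like this sketch (the field names here are placeholders, not my real document shape):

from pyspark.sql.types import StructType, StructField, StringType

# Placeholder schema just to illustrate the explicit-schema route;
# my real documents have different fields.
schema = StructType([
    StructField("_id", StringType(), True),
    StructField("value", StringType(), True),
])

# ...which, as far as I can tell, would be chained onto the reader
# as .schema(schema) before .load()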
And here is the code for the write stream:
query2 = (query.writeStream
    .format("mongodb")
    .option("spark.mongodb.collection", "output_collection")
    .option("spark.mongodb.change.stream.publish.full.document.only", "true")
    .option("checkpointLocation", "/tmp/pyspark/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .outputMode("append")
    .trigger(continuous="1 second")
    .start())

query2.awaitTermination()
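Since the error message specifically says "Mongo Continuous streams", I assume the continuous trigger is part of the picture. For comparison, I believe a micro-batch variant would just swap the trigger, something like this (untested sketch):

query2 = (query.writeStream
    .format("mongodb")
    .option("spark.mongodb.collection", "output_collection")
    .option("checkpointLocation", "/tmp/pyspark/")
    .outputMode("append")
    .trigger(processingTime="1 second")  # micro-batch instead of continuous
    .start())

But I would still like to get the continuous version working.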
Can anybody help me figure out what I’m doing wrong?