I am following the streaming example in the documentation here. My setup is similar; the only difference is that I am writing the streaming data out to parquet files (via Hudi). Here is my rough setup:
```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("_id", StringType(), False),
    StructField("a", IntegerType()),
])

# (MongoDB connection settings omitted from this rough setup.)
dataStreamWriter = (
    spark.readStream
    .format("mongodb")
    .schema(schema)
    .load()
    .writeStream
    .trigger(processingTime="1 second")
    .option("hoodie.table.name", "mongo")
    .option("checkpointLocation", "/path/to/checkpoints")
    .option("path", "/path/to/output")
    .format("hudi")
    .start()
)
```
I have a MongoDB instance set up locally that acts as its own replica set, so it can produce change streams. This is what the documents in my test collection look like:
```
rs0 [direct: primary] testdb> db.test.insertMany([{"a":1},{"a":2},{"a":3}])
{
  acknowledged: true,
  insertedIds: {
    '0': ObjectId('664676ad699f140c8c8f6fc2'),
    '1': ObjectId('664676ad699f140c8c8f6fc3'),
    '2': ObjectId('664676ad699f140c8c8f6fc4')
  }
}
```
As you can see, the documents match the schema I provided to my Spark DataStreamReader: it is just a simple column named `a` that contains integers. While my DataStreamReader is running, I insert some additional documents:
```
rs0 [direct: primary] testdb> db.test.insertMany([{"a":4},{"a":5},{"a":6}])
{
  acknowledged: true,
  insertedIds: {
    '0': ObjectId('66467fc3699f140c8c8f6fc5'),
    '1': ObjectId('66467fc3699f140c8c8f6fc6'),
    '2': ObjectId('66467fc3699f140c8c8f6fc7')
  }
}
```
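As a quick sanity check that change streams really fire on this instance, a minimal pymongo watcher (my own test script, separate from the Spark job; the connection URI is assumed to be the local default) prints an event for each of these inserts:

```python
from pymongo import MongoClient

# Local single-node replica set; adjust the URI if yours differs.
client = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")
collection = client["testdb"]["test"]

# Blocks and prints each change event as it arrives.
with collection.watch() as stream:
    for change in stream:
        print(change)
```

This is just to rule out the replica-set configuration as the culprit.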
However, when I load the data that was written to my output directory, I see something I do not understand:

| _id | a |
| --- | --- |
| {_data: 8266467FC3000000012B022C0100296E5A1004A3E989AF10E24C4C9FA3283612F5A1A546645F6964006466467FC3699F140C8C8F6FC50004} | null |
| {_data: 8266467FC3000000022B022C0100296E5A1004A3E989AF10E24C4C9FA3283612F5A1A546645F6964006466467FC3699F140C8C8F6FC60004} | null |
| {_data: 8266467FC3000000032B022C0100296E5A1004A3E989AF10E24C4C9FA3283612F5A1A546645F6964006466467FC3699F140C8C8F6FC70004} | null |
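For reference, the table above comes from reading the Hudi output back in batch mode, roughly like this (the path matches the writer's `path` option):

```python
# Read back the Hudi table that the stream wrote.
df = spark.read.format("hudi").load("/path/to/output")
df.select("_id", "a").show(truncate=False)
```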
Why is the `_id` column showing that blob, and why is my `a` column all null? If I understand the docs correctly, the schema I provided to my `readStream` matches the documents in my collection. Can someone help me understand what is happening?