Read after change stream event does not reflect change (Replica Set, RC/WC: majority)

Hi all!

I am observing a bug in my code which, given my understanding of majority-commits, should not be there. My code reads a document right after receiving an event from a change stream. But sometimes, the field being watched is not (yet) set when reading the field immediately after the change occurred.

This happens under the following conditions:

  • 2-node Replica Set, Version 7.0.12
  • (motor) client initialized with:
    • Read Concern: majority
    • Write Concern: majority
    • Read Preference: nearest
    • Journal: true
  • Code does not use explicit client sessions

I am aware of the following:

  • I will not get Causal Consistency unless I use an explicit client session
  • Both nodes have to be alive in order to make any successful requests

The code is rather involved, but basically does the following:

event = await motorCollection.watch(
    pipeline=[
        {"$match": {"updateDescription.updatedFields.myField": {"$exists": True}}}
    ],
    full_document=None
).next()
fetched = await motorCollection.aggregate(
    pipeline=[
        {"$match": {"_id": event["documentKey"]["_id"]}}
    ]
).next()
assert fetched.get("myField") # this sometimes fails

(myField will never be changed or deleted after it was written once)

So now I am wondering why myField is sometimes not (yet) set. As I understand it …

  • The watch only emits majority-committed changes
  • The aggregation only reads majority-committed changes
  • The aggregation happens STRICTLY AFTER the event was received

From this, I conclude that the assertion should never raise. Still, it does. Why is this?
I am not concerned with the various hazards emerging from not using sessions. I just want to understand why a majority-committed write is visible to a change stream but not visible to an aggregation happening strictly after the event was received.

To be clear, there might still be an unrelated bug in my code base. Nevertheless, I want to sharpen my understanding of MongoDB commit semantics. That said, my question basically boils down to:

Will a majority-committed write be visible globally and immediately, to all clients reading only majority-committed data?

Kind regards
Johannes

Will a majority-committed write be visible globally and immediately, to all clients reading only majority-committed data?

I believe the answer is only yes if the read happens on the primary. If the read happens on a secondary, then it may return data from an earlier majority commit point. If you change your app to use Read Preference Primary, I would expect this error to go away.
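
To make the lag concrete, here is a toy model in plain Python. It is not MongoDB code: Node, commit_point and majority_read are names invented for this sketch. It assumes that each node tracks its own view of the majority commit point and that the secondary learns about an advance a little later than the primary does:

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Toy replica set member (hypothetical model, not server code)."""
    oplog: list = field(default_factory=list)  # entries: (timestamp, key, value)
    commit_point: int = 0  # newest timestamp this node knows to be majority-committed

    def majority_read(self, after_cluster_time=None):
        """Return the document as seen at this node's majority commit point.

        Returns None when after_cluster_time is ahead of the commit point;
        a real server would wait until it catches up instead.
        """
        if after_cluster_time is not None and self.commit_point < after_cluster_time:
            return None
        return {key: value for ts, key, value in self.oplog if ts <= self.commit_point}


primary, secondary = Node(), Node()

# The write at timestamp 1 has been replicated to both nodes.
for node in (primary, secondary):
    node.oplog.append((1, "myField", "value"))

# The primary advances its commit point first; the secondary has not heard yet.
primary.commit_point = 1
event_cluster_time = 1  # the change stream emits the event at this time

# Plain majority read on the secondary: myField is still missing.
stale = secondary.majority_read()
# Same read with afterClusterTime: it cannot be answered yet.
blocked = secondary.majority_read(after_cluster_time=event_cluster_time)

# Once the secondary learns the new commit point, the read succeeds.
secondary.commit_point = 1
fresh = secondary.majority_read(after_cluster_time=event_cluster_time)

print(stale, blocked, fresh)
```

In this model the plain majority read is perfectly valid, it is just answered from an older commit point than the one the change stream event came from.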

Hi Shane. Thank you for your response.

In the meantime, I investigated this issue a bit more and came to a similar conclusion. As a side note, I am using Read Preference = nearest in order to decrease the load the primary has to handle.

While trying to find a fix for my issue, I was reading about client sessions and advance_cluster_time/advance_operation_time.

According to my understanding of the docs, the example above would have to use two sessions, as

A single ClientSession cannot be used to run multiple operations concurrently.

(stated here. Am I understanding this correctly? Is the aggregate considered a concurrent operation when the watch is still lingering?)

Concluding from the statement above, I have to wrap the watch in a session s1. Upon receiving an event, I then have to create a new session s2, advance its time and then run the aggregate in it like this:

s1 = await motorClient.start_session()
event = await motorCollection.watch(
    pipeline=[
        {"$match": {"updateDescription.updatedFields.myField": {"$exists": True}}}
    ],
    full_document=None,
    session=s1
).next()

s2 = await motorClient.start_session()
s2.advance_cluster_time(s1.cluster_time)
s2.advance_operation_time(s1.operation_time)

fetched = await motorCollection.aggregate(
    pipeline=[
        {"$match": {"_id": event["documentKey"]["_id"]}}
    ],
    session=s2
).next()
assert fetched.get("myField")

(analogous to the second example from here)

I was under the impression that MotorClient was required to do exactly this itself, as the spec suggests. But reading it a second time, I understand that the internal machinery that gossips the cluster time does not give the user any (implicit) causality guarantees. Rather, it merely gets/sets the top-level $clusterTime key, which is for internal use only. For causality, a majority read concern is not enough, as it only gives durability (and no causality) guarantees. I have to use afterClusterTime (which is set automatically when using sessions) for this. Correct?

Your code and conclusion are correct. As an optimization, it should be possible to use the clusterTime field from the change stream event itself for advance_operation_time, which should decrease the latency on the secondary read, for example:

s2 = await motorClient.start_session()
s2.advance_cluster_time(s1.cluster_time)
s2.advance_operation_time(event["clusterTime"])

See https://www.mongodb.com/docs/manual/reference/change-events/insert/ for the event “clusterTime” field.
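
Putting the pieces together, the read could be wrapped in a small helper along these lines. This is only a sketch: read_after_event and its parameter names are made up for illustration, it assumes a Motor client and collection are passed in, and it uses find_one rather than the aggregate from the original example.

```python
async def read_after_event(client, collection, event, watch_session=None):
    """Read the changed document no earlier than the event's cluster time.

    Hypothetical helper: `client` and `collection` are assumed to be a Motor
    client and collection, `event` a change stream event.
    """
    # A fresh session for the read, so the session used by watch() is not shared.
    async with await client.start_session(causal_consistency=True) as session:
        # Optionally carry over the cluster time gossiped on the watch session.
        if watch_session is not None and watch_session.cluster_time is not None:
            session.advance_cluster_time(watch_session.cluster_time)
        # The event's clusterTime becomes the lower bound for the read.
        session.advance_operation_time(event["clusterTime"])
        return await collection.find_one(
            {"_id": event["documentKey"]["_id"]},
            session=session,
        )


# Usage inside a coroutine (s1 is the session passed to watch()):
#     fetched = await read_after_event(motorClient, motorCollection, event, s1)
```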

PS. The naming of cluster_time, operation_time, the event’s clusterTime, and readConcern’s afterClusterTime is unfortunately inconsistent and confusing.