Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 83 additions & 11 deletions tests/csapi/thread_notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,25 @@ func TestThreadedReceipts(t *testing.T) {
)
}

// Regression test for https://github.com/matrix-org/matrix-spec/issues/1727
// Servers should always prefer the unthreaded receipt when there is a clash of receipts
// Regression test for https://github.com/matrix-org/matrix-spec/issues/1727 (and
// https://github.com/element-hq/synapse/pull/19838).
//
// Servers should always prefer the unthreaded receipt when there is a clash of
// receipts for the same event, and that preference must be *durable*: it must
// hold even when the unthreaded and threaded receipts are served in separate
// `/sync` responses. This happens when the two land at different stream
// positions, e.g. when they arrive over federation as separate EDUs. A
// non-durable server that only dedupes at read-time will serve the threaded
// receipt on its own in a later `/sync`, letting it incorrectly win.
Comment on lines +326 to +332

@MadLittleMods MadLittleMods Jun 15, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear to me that this is true for all cases.

As far as I can tell, for example, we could also tackle this problem at read-time. In Synapse, get_linearized_receipts_for_room(...)/get_linearized_receipts_for_rooms(...), could de-duplicate if a previous unthreaded receipt existed. Even in Synapse, we store things as (room_id, receipt_type, user_id, thread_id) after all which accommodates holding onto both

Which also makes me question whether our approach in element-hq/synapse#19838 is the best (not sure)

(also plays into some of the comments which make some assumptions as well)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is your point that actually you could correctly implement this at read time by checking for other receipts when pulling from the DB? Yes I suppose you could?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in which case this comment assumes too much

//
// To test this deterministically (rather than relying on the receipts happening
// to be served in the same `/sync` response), each user is made to observe the
// unthreaded receipt down `/sync` *before* the clashing threaded receipt is
// sent, so the two are guaranteed to fall in different sync windows. A marker
// event sent after the threaded receipt then gives a deterministic point by
// which the threaded receipt must have been delivered if it was going to be. We
// assert it never is, both for the local user and for a remote user over
// federation.
func TestThreadReceiptsInSyncMSC4102(t *testing.T) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MSC4102 only mentions that the de-duplication should happen when assembling EDU's and there is a unthreaded and threaded receipt in the same response.

It doesn't say anything about an unthreaded read receipt winning out over new threaded read receipts

runtime.SkipIf(t, runtime.Dendrite) // not supported
deployment := complement.Deploy(t, 2)
Expand Down Expand Up @@ -354,23 +371,78 @@ func TestThreadReceiptsInSyncMSC4102(t *testing.T) {
},
},
})
// now send an unthreaded RR for event B and a threaded RR for event B and ensure we see the unthreaded RR
// down /sync. Non-compliant servers will typically send the last one only.
Comment on lines -357 to -358

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test needs to stay trying to send an unthreaded and threaded read receipt in the same /send federation transaction.

But we can't control how a homeserver sends EDU's so I don't think this test can be made deterministic. We need to update it to accommodate the MSC4102 behavior or them being sent across two transactions.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test needs to stay trying to send an unthreaded and threaded read receipt in the same /send federation transaction.

Why?

But we can't control how a homeserver sends EDU's so I don't think this test can be made deterministic. We need to update it to accommodate the MSC4102 behavior or them being sent across two transactions.

Sorry, what are you actually suggesting we do?

@MadLittleMods MadLittleMods Jun 16, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the whole point of the MSC4102 and we want the test to stress this behavior,

When a server is combining receipts into an EDU, if there are multiple receipts for the same (user, event, receipt type), always choose the receipt which is unthreaded (has no thread_id) when aggregating into an EDU.

This change will apply to all m.receipt EDUs, which includes both CSAPI and Federation endpoints.

-- MSC4102

But we can't control how a homeserver emits EDU's so the best we can do is send them quickly in succession and see what it does. Combining into one EDU is fine (per MSC4102) or separating is fine (depends on how slow the server is to process things and when the request comes in).

We need to update the test to accommodate both expectations.

alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, struct{}{}))
alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, map[string]interface{}{"thread_id": eventA}))

alice.MustSyncUntil(
// Send an unthreaded RR for event B, and wait until both alice and bob have
// seen it down /sync. This pins the unthreaded receipt to its own stream
// position / sync window on each server, so the clashing threaded receipt
// sent next is forced into a *separate* sync response.
alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, struct{}{}))
aliceNextBatch := alice.MustSyncUntil(
t,
client.SyncReq{},
syncHasUnthreadedReadReceipt(roomID, alice.UserID, eventB),
)

// bob over federation must also see the same result, to show that the receipt EDUs over
// federation are bundled correctly, or are sent as separate EDUs.
bob.MustSyncUntil(
bobNextBatch := bob.MustSyncUntil(
t,
client.SyncReq{},
syncHasUnthreadedReadReceipt(roomID, alice.UserID, eventB),
)

// Now send a clashing threaded RR for the *same* event. Pre-fix this was
// inserted at a later stream position (and federated as a separate EDU) and
// so could be served on its own in a later /sync, incorrectly winning.
alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, map[string]interface{}{"thread_id": eventA}))

// Send a marker event strictly after the clashing receipt. Once a server has
// delivered this event down /sync, the (earlier) threaded receipt must also
// have been processed and delivered if it was going to be.
eventC := alice.SendEventSynced(t, roomID, b.Event{
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "Marker",
},
})

// The threaded receipt must never win, neither locally nor over federation.
assertUnthreadedReceiptWins(t, alice, roomID, aliceNextBatch, alice.UserID, eventB, eventA, eventC)
assertUnthreadedReceiptWins(t, bob, roomID, bobNextBatch, alice.UserID, eventB, eventA, eventC)
}

// assertUnthreadedReceiptWins drives an incremental /sync for `user` from
// `since` until it observes `markerEventID` in the room timeline, and fails the
// test if at any point along the way a threaded read receipt for `clashEventID`
// (in thread `clashThreadID`) from `receiptUserID` was served. The marker event
// must have been sent *after* the clashing threaded receipt, so that observing
// it guarantees the receipt has already been delivered if it was going to be.
func assertUnthreadedReceiptWins(t *testing.T, user *client.CSAPI, roomID, since, receiptUserID, clashEventID, clashThreadID, markerEventID string) {
t.Helper()
sawThreadedClash := false
// We can't use a `syncHasThreadedReadReceipt` check for this: MustSyncUntil
// drops a check once it passes, whereas we need to keep scanning every
// response for the forbidden receipt right up until the marker arrives. So
// we accumulate a flag by hand and use the marker as the (positive) stop
// condition.
Comment on lines +421 to +425

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still use syncHasThreadedReadReceipt but composed into our own check

// In the unlikely event that you want all the checkers to pass *explicitly* in a single /sync
// response (e.g to assert some form of atomic update which updates multiple parts of the /sync
// response at once) then make your own checker function which does this.

marker := client.SyncTimelineHasEventID(roomID, markerEventID)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
marker := client.SyncTimelineHasEventID(roomID, markerEventID)
syncTimelineHasMarkerEventID := client.SyncTimelineHasEventID(roomID, markerEventID)

Although, I don't think it really matters to store and re-use this.

user.MustSyncUntil(t, client.SyncReq{Since: since}, func(clientUserID string, topLevelSyncJSON gjson.Result) error {
ephemeral := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".ephemeral.events")
for _, ev := range ephemeral.Array() {
if ev.Get("type").Str != "m.receipt" {
continue
}
receipt := ev.Get("content").Get(clashEventID).Get(`m\.read`).Get(receiptUserID)
if receipt.Exists() && receipt.Get("thread_id").Str == clashThreadID {
sawThreadedClash = true
}
}
// Wait until the marker event arrives, by which point any receipt for the
// clashing event must already have been delivered.
return marker(clientUserID, topLevelSyncJSON)
})
if sawThreadedClash {
t.Fatalf(
"%s saw a threaded read receipt for %s (thread %s) win down /sync; the clashing unthreaded receipt should always win (MSC4102)",
user.UserID, clashEventID, clashThreadID,
)
}
}
Loading