From 14bbec1456a50aef5a9821c3bb9e524e215ca984 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2026 11:34:53 +0100 Subject: [PATCH] Make `TestThreadReceiptsInSyncMSC4102` deterministic The test only caught the MSC4102 "prefer unthreaded receipt" bug when the clashing unthreaded and threaded receipts happened to be served in the same /sync response (where read-time dedup hides the threaded one), so it flaked. Force the receipts into separate sync windows by waiting for each user to observe the unthreaded receipt before sending the clashing threaded one, then assert the threaded receipt never wins, both locally and over federation. --- tests/csapi/thread_notifications_test.go | 94 +++++++++++++++++++++--- 1 file changed, 83 insertions(+), 11 deletions(-) diff --git a/tests/csapi/thread_notifications_test.go b/tests/csapi/thread_notifications_test.go index 334346759..1c9c87515 100644 --- a/tests/csapi/thread_notifications_test.go +++ b/tests/csapi/thread_notifications_test.go @@ -320,8 +320,25 @@ func TestThreadedReceipts(t *testing.T) { ) } -// Regression test for https://github.com/matrix-org/matrix-spec/issues/1727 -// Servers should always prefer the unthreaded receipt when there is a clash of receipts +// Regression test for https://github.com/matrix-org/matrix-spec/issues/1727 (and +// https://github.com/element-hq/synapse/pull/19838). +// +// Servers should always prefer the unthreaded receipt when there is a clash of +// receipts for the same event, and that preference must be *durable*: it must +// hold even when the unthreaded and threaded receipts are served in separate +// `/sync` responses. This happens when the two land at different stream +// positions, e.g. when they arrive over federation as separate EDUs. A +// non-durable server that only dedupes at read-time will serve the threaded +// receipt on its own in a later `/sync`, letting it incorrectly win. +// +// To test this deterministically (rather than relying on the receipts happening +// to be served in the same `/sync` response), each user is made to observe the +// unthreaded receipt down `/sync` *before* the clashing threaded receipt is +// sent, so the two are guaranteed to fall in different sync windows. A marker +// event sent after the threaded receipt then gives a deterministic point by +// which the threaded receipt must have been delivered if it was going to be. We +// assert it never is, both for the local user and for a remote user over +// federation. func TestThreadReceiptsInSyncMSC4102(t *testing.T) { runtime.SkipIf(t, runtime.Dendrite) // not supported deployment := complement.Deploy(t, 2) @@ -354,23 +371,78 @@ func TestThreadReceiptsInSyncMSC4102(t *testing.T) { }, }, }) - // now send an unthreaded RR for event B and a threaded RR for event B and ensure we see the unthreaded RR - // down /sync. Non-compliant servers will typically send the last one only. - alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, struct{}{})) - alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, map[string]interface{}{"thread_id": eventA})) - alice.MustSyncUntil( + // Send an unthreaded RR for event B, and wait until both alice and bob have + // seen it down /sync. This pins the unthreaded receipt to its own stream + // position / sync window on each server, so the clashing threaded receipt + // sent next is forced into a *separate* sync response. + alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, struct{}{})) + aliceNextBatch := alice.MustSyncUntil( t, client.SyncReq{}, syncHasUnthreadedReadReceipt(roomID, alice.UserID, eventB), ) - - // bob over federation must also see the same result, to show that the receipt EDUs over - // federation are bundled correctly, or are sent as separate EDUs. - bob.MustSyncUntil( + bobNextBatch := bob.MustSyncUntil( t, client.SyncReq{}, syncHasUnthreadedReadReceipt(roomID, alice.UserID, eventB), ) + // Now send a clashing threaded RR for the *same* event. Pre-fix this was + // inserted at a later stream position (and federated as a separate EDU) and + // so could be served on its own in a later /sync, incorrectly winning. + alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, map[string]interface{}{"thread_id": eventA})) + + // Send a marker event strictly after the clashing receipt. Once a server has + // delivered this event down /sync, the (earlier) threaded receipt must also + // have been processed and delivered if it was going to be. + eventC := alice.SendEventSynced(t, roomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "Marker", + }, + }) + + // The threaded receipt must never win, neither locally nor over federation. + assertUnthreadedReceiptWins(t, alice, roomID, aliceNextBatch, alice.UserID, eventB, eventA, eventC) + assertUnthreadedReceiptWins(t, bob, roomID, bobNextBatch, alice.UserID, eventB, eventA, eventC) +} + +// assertUnthreadedReceiptWins drives an incremental /sync for `user` from +// `since` until it observes `markerEventID` in the room timeline, and fails the +// test if at any point along the way a threaded read receipt for `clashEventID` +// (in thread `clashThreadID`) from `receiptUserID` was served. The marker event +// must have been sent *after* the clashing threaded receipt, so that observing +// it guarantees the receipt has already been delivered if it was going to be. +func assertUnthreadedReceiptWins(t *testing.T, user *client.CSAPI, roomID, since, receiptUserID, clashEventID, clashThreadID, markerEventID string) { + t.Helper() + sawThreadedClash := false + // We can't use a `syncHasThreadedReadReceipt` check for this: MustSyncUntil + // drops a check once it passes, whereas we need to keep scanning every + // response for the forbidden receipt right up until the marker arrives. So + // we accumulate a flag by hand and use the marker as the (positive) stop + // condition. + marker := client.SyncTimelineHasEventID(roomID, markerEventID) + user.MustSyncUntil(t, client.SyncReq{Since: since}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { + ephemeral := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".ephemeral.events") + for _, ev := range ephemeral.Array() { + if ev.Get("type").Str != "m.receipt" { + continue + } + receipt := ev.Get("content").Get(clashEventID).Get(`m\.read`).Get(receiptUserID) + if receipt.Exists() && receipt.Get("thread_id").Str == clashThreadID { + sawThreadedClash = true + } + } + // Wait until the marker event arrives, by which point any receipt for the + // clashing event must already have been delivered. + return marker(clientUserID, topLevelSyncJSON) + }) + if sawThreadedClash { + t.Fatalf( + "%s saw a threaded read receipt for %s (thread %s) win down /sync; the clashing unthreaded receipt should always win (MSC4102)", + user.UserID, clashEventID, clashThreadID, + ) + } }