diff --git a/tests/csapi/thread_notifications_test.go b/tests/csapi/thread_notifications_test.go index 33434675..1c9c8751 100644 --- a/tests/csapi/thread_notifications_test.go +++ b/tests/csapi/thread_notifications_test.go @@ -320,8 +320,25 @@ func TestThreadedReceipts(t *testing.T) { ) } -// Regression test for https://github.com/matrix-org/matrix-spec/issues/1727 -// Servers should always prefer the unthreaded receipt when there is a clash of receipts +// Regression test for https://github.com/matrix-org/matrix-spec/issues/1727 (and +// https://github.com/element-hq/synapse/pull/19838). +// +// Servers should always prefer the unthreaded receipt when there is a clash of +// receipts for the same event, and that preference must be *durable*: it must +// hold even when the unthreaded and threaded receipts are served in separate +// `/sync` responses. This happens when the two land at different stream +// positions, e.g. when they arrive over federation as separate EDUs. A +// non-durable server that only dedupes at read-time will serve the threaded +// receipt on its own in a later `/sync`, letting it incorrectly win. +// +// To test this deterministically (rather than relying on the receipts happening +// to be served in the same `/sync` response), each user is made to observe the +// unthreaded receipt down `/sync` *before* the clashing threaded receipt is +// sent, so the two are guaranteed to fall in different sync windows. A marker +// event sent after the threaded receipt then gives a deterministic point by +// which the threaded receipt must have been delivered if it was going to be. We +// assert it never is, both for the local user and for a remote user over +// federation. func TestThreadReceiptsInSyncMSC4102(t *testing.T) { runtime.SkipIf(t, runtime.Dendrite) // not supported deployment := complement.Deploy(t, 2) @@ -354,23 +371,78 @@ func TestThreadReceiptsInSyncMSC4102(t *testing.T) { }, }, }) - // now send an unthreaded RR for event B and a threaded RR for event B and ensure we see the unthreaded RR - // down /sync. Non-compliant servers will typically send the last one only. - alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, struct{}{})) - alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, map[string]interface{}{"thread_id": eventA})) - alice.MustSyncUntil( + // Send an unthreaded RR for event B, and wait until both alice and bob have + // seen it down /sync. This pins the unthreaded receipt to its own stream + // position / sync window on each server, so the clashing threaded receipt + // sent next is forced into a *separate* sync response. + alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, struct{}{})) + aliceNextBatch := alice.MustSyncUntil( t, client.SyncReq{}, syncHasUnthreadedReadReceipt(roomID, alice.UserID, eventB), ) - - // bob over federation must also see the same result, to show that the receipt EDUs over - // federation are bundled correctly, or are sent as separate EDUs. - bob.MustSyncUntil( + bobNextBatch := bob.MustSyncUntil( t, client.SyncReq{}, syncHasUnthreadedReadReceipt(roomID, alice.UserID, eventB), ) + // Now send a clashing threaded RR for the *same* event. Pre-fix this was + // inserted at a later stream position (and federated as a separate EDU) and + // so could be served on its own in a later /sync, incorrectly winning. + alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, map[string]interface{}{"thread_id": eventA})) + + // Send a marker event strictly after the clashing receipt. Once a server has + // delivered this event down /sync, the (earlier) threaded receipt must also + // have been processed and delivered if it was going to be. + eventC := alice.SendEventSynced(t, roomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "Marker", + }, + }) + + // The threaded receipt must never win, neither locally nor over federation. + assertUnthreadedReceiptWins(t, alice, roomID, aliceNextBatch, alice.UserID, eventB, eventA, eventC) + assertUnthreadedReceiptWins(t, bob, roomID, bobNextBatch, alice.UserID, eventB, eventA, eventC) +} + +// assertUnthreadedReceiptWins drives an incremental /sync for `user` from +// `since` until it observes `markerEventID` in the room timeline, and fails the +// test if at any point along the way a threaded read receipt for `clashEventID` +// (in thread `clashThreadID`) from `receiptUserID` was served. The marker event +// must have been sent *after* the clashing threaded receipt, so that observing +// it guarantees the receipt has already been delivered if it was going to be. +func assertUnthreadedReceiptWins(t *testing.T, user *client.CSAPI, roomID, since, receiptUserID, clashEventID, clashThreadID, markerEventID string) { + t.Helper() + sawThreadedClash := false + // We can't use a `syncHasThreadedReadReceipt` check for this: MustSyncUntil + // drops a check once it passes, whereas we need to keep scanning every + // response for the forbidden receipt right up until the marker arrives. So + // we accumulate a flag by hand and use the marker as the (positive) stop + // condition. + marker := client.SyncTimelineHasEventID(roomID, markerEventID) + user.MustSyncUntil(t, client.SyncReq{Since: since}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { + ephemeral := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".ephemeral.events") + for _, ev := range ephemeral.Array() { + if ev.Get("type").Str != "m.receipt" { + continue + } + receipt := ev.Get("content").Get(clashEventID).Get(`m\.read`).Get(receiptUserID) + if receipt.Exists() && receipt.Get("thread_id").Str == clashThreadID { + sawThreadedClash = true + } + } + // Wait until the marker event arrives, by which point any receipt for the + // clashing event must already have been delivered. + return marker(clientUserID, topLevelSyncJSON) + }) + if sawThreadedClash { + t.Fatalf( + "%s saw a threaded read receipt for %s (thread %s) win down /sync; the clashing unthreaded receipt should always win (MSC4102)", + user.UserID, clashEventID, clashThreadID, + ) + } }