-
Notifications
You must be signed in to change notification settings - Fork 69
Make TestThreadReceiptsInSyncMSC4102 deterministic
#881
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -320,8 +320,25 @@ func TestThreadedReceipts(t *testing.T) { | |||||||
| ) | ||||||||
| } | ||||||||
|
|
||||||||
| // Regression test for https://github.com/matrix-org/matrix-spec/issues/1727 | ||||||||
| // Servers should always prefer the unthreaded receipt when there is a clash of receipts | ||||||||
| // Regression test for https://github.com/matrix-org/matrix-spec/issues/1727 (and | ||||||||
| // https://github.com/element-hq/synapse/pull/19838). | ||||||||
| // | ||||||||
| // Servers should always prefer the unthreaded receipt when there is a clash of | ||||||||
| // receipts for the same event, and that preference must be *durable*: it must | ||||||||
| // hold even when the unthreaded and threaded receipts are served in separate | ||||||||
| // `/sync` responses. This happens when the two land at different stream | ||||||||
| // positions, e.g. when they arrive over federation as separate EDUs. A | ||||||||
| // non-durable server that only dedupes at read-time will serve the threaded | ||||||||
| // receipt on its own in a later `/sync`, letting it incorrectly win. | ||||||||
| // | ||||||||
| // To test this deterministically (rather than relying on the receipts happening | ||||||||
| // to be served in the same `/sync` response), each user is made to observe the | ||||||||
| // unthreaded receipt down `/sync` *before* the clashing threaded receipt is | ||||||||
| // sent, so the two are guaranteed to fall in different sync windows. A marker | ||||||||
| // event sent after the threaded receipt then gives a deterministic point by | ||||||||
| // which the threaded receipt must have been delivered if it was going to be. We | ||||||||
| // assert it never is, both for the local user and for a remote user over | ||||||||
| // federation. | ||||||||
| func TestThreadReceiptsInSyncMSC4102(t *testing.T) { | ||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. MSC4102 only mentions that the de-duplication should happen when assembling EDU's and there is a unthreaded and threaded receipt in the same response. It doesn't say anything about an unthreaded read receipt winning out over new threaded read receipts |
||||||||
| runtime.SkipIf(t, runtime.Dendrite) // not supported | ||||||||
| deployment := complement.Deploy(t, 2) | ||||||||
|
|
@@ -354,23 +371,78 @@ func TestThreadReceiptsInSyncMSC4102(t *testing.T) { | |||||||
| }, | ||||||||
| }, | ||||||||
| }) | ||||||||
| // now send an unthreaded RR for event B and a threaded RR for event B and ensure we see the unthreaded RR | ||||||||
| // down /sync. Non-compliant servers will typically send the last one only. | ||||||||
|
Comment on lines
-357
to
-358
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this test needs to stay trying to send an unthreaded and threaded read receipt in the same But we can't control how a homeserver sends EDU's so I don't think this test can be made deterministic. We need to update it to accommodate the MSC4102 behavior or them being sent across two transactions.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why?
Sorry, what are you actually suggesting we do?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the whole point of the MSC4102 and we want the test to stress this behavior,
But we can't control how a homeserver emits EDU's so the best we can do is send them quickly in succession and see what it does. Combining into one EDU is fine (per MSC4102) or separating is fine (depends on how slow the server is to process things and when the request comes in). We need to update the test to accommodate both expectations. |
||||||||
| alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, struct{}{})) | ||||||||
| alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, map[string]interface{}{"thread_id": eventA})) | ||||||||
|
|
||||||||
| alice.MustSyncUntil( | ||||||||
| // Send an unthreaded RR for event B, and wait until both alice and bob have | ||||||||
| // seen it down /sync. This pins the unthreaded receipt to its own stream | ||||||||
| // position / sync window on each server, so the clashing threaded receipt | ||||||||
| // sent next is forced into a *separate* sync response. | ||||||||
| alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, struct{}{})) | ||||||||
| aliceNextBatch := alice.MustSyncUntil( | ||||||||
| t, | ||||||||
| client.SyncReq{}, | ||||||||
| syncHasUnthreadedReadReceipt(roomID, alice.UserID, eventB), | ||||||||
| ) | ||||||||
|
|
||||||||
| // bob over federation must also see the same result, to show that the receipt EDUs over | ||||||||
| // federation are bundled correctly, or are sent as separate EDUs. | ||||||||
| bob.MustSyncUntil( | ||||||||
| bobNextBatch := bob.MustSyncUntil( | ||||||||
| t, | ||||||||
| client.SyncReq{}, | ||||||||
| syncHasUnthreadedReadReceipt(roomID, alice.UserID, eventB), | ||||||||
| ) | ||||||||
|
|
||||||||
| // Now send a clashing threaded RR for the *same* event. Pre-fix this was | ||||||||
| // inserted at a later stream position (and federated as a separate EDU) and | ||||||||
| // so could be served on its own in a later /sync, incorrectly winning. | ||||||||
| alice.MustDo(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "receipt", "m.read", eventB}, client.WithJSONBody(t, map[string]interface{}{"thread_id": eventA})) | ||||||||
|
|
||||||||
| // Send a marker event strictly after the clashing receipt. Once a server has | ||||||||
| // delivered this event down /sync, the (earlier) threaded receipt must also | ||||||||
| // have been processed and delivered if it was going to be. | ||||||||
| eventC := alice.SendEventSynced(t, roomID, b.Event{ | ||||||||
| Type: "m.room.message", | ||||||||
| Content: map[string]interface{}{ | ||||||||
| "msgtype": "m.text", | ||||||||
| "body": "Marker", | ||||||||
| }, | ||||||||
| }) | ||||||||
|
|
||||||||
| // The threaded receipt must never win, neither locally nor over federation. | ||||||||
| assertUnthreadedReceiptWins(t, alice, roomID, aliceNextBatch, alice.UserID, eventB, eventA, eventC) | ||||||||
| assertUnthreadedReceiptWins(t, bob, roomID, bobNextBatch, alice.UserID, eventB, eventA, eventC) | ||||||||
| } | ||||||||
|
|
||||||||
| // assertUnthreadedReceiptWins drives an incremental /sync for `user` from | ||||||||
| // `since` until it observes `markerEventID` in the room timeline, and fails the | ||||||||
| // test if at any point along the way a threaded read receipt for `clashEventID` | ||||||||
| // (in thread `clashThreadID`) from `receiptUserID` was served. The marker event | ||||||||
| // must have been sent *after* the clashing threaded receipt, so that observing | ||||||||
| // it guarantees the receipt has already been delivered if it was going to be. | ||||||||
| func assertUnthreadedReceiptWins(t *testing.T, user *client.CSAPI, roomID, since, receiptUserID, clashEventID, clashThreadID, markerEventID string) { | ||||||||
| t.Helper() | ||||||||
| sawThreadedClash := false | ||||||||
| // We can't use a `syncHasThreadedReadReceipt` check for this: MustSyncUntil | ||||||||
| // drops a check once it passes, whereas we need to keep scanning every | ||||||||
| // response for the forbidden receipt right up until the marker arrives. So | ||||||||
| // we accumulate a flag by hand and use the marker as the (positive) stop | ||||||||
| // condition. | ||||||||
|
Comment on lines
+421
to
+425
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can still use Lines 87 to 89 in bc2b638
|
||||||||
| marker := client.SyncTimelineHasEventID(roomID, markerEventID) | ||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Although, I don't think it really matters to store and re-use this. |
||||||||
| user.MustSyncUntil(t, client.SyncReq{Since: since}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { | ||||||||
| ephemeral := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".ephemeral.events") | ||||||||
| for _, ev := range ephemeral.Array() { | ||||||||
| if ev.Get("type").Str != "m.receipt" { | ||||||||
| continue | ||||||||
| } | ||||||||
| receipt := ev.Get("content").Get(clashEventID).Get(`m\.read`).Get(receiptUserID) | ||||||||
| if receipt.Exists() && receipt.Get("thread_id").Str == clashThreadID { | ||||||||
| sawThreadedClash = true | ||||||||
| } | ||||||||
| } | ||||||||
| // Wait until the marker event arrives, by which point any receipt for the | ||||||||
| // clashing event must already have been delivered. | ||||||||
| return marker(clientUserID, topLevelSyncJSON) | ||||||||
| }) | ||||||||
| if sawThreadedClash { | ||||||||
| t.Fatalf( | ||||||||
| "%s saw a threaded read receipt for %s (thread %s) win down /sync; the clashing unthreaded receipt should always win (MSC4102)", | ||||||||
| user.UserID, clashEventID, clashThreadID, | ||||||||
| ) | ||||||||
| } | ||||||||
| } | ||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unclear to me that this is true for all cases.
As far as I can tell, for example, we could also tackle this problem at read-time. In Synapse,
get_linearized_receipts_for_room(...)/get_linearized_receipts_for_rooms(...), could de-duplicate if a previous unthreaded receipt existed. Even in Synapse, we store things as(room_id, receipt_type, user_id, thread_id)after all which accommodates holding onto bothWhich also makes me question whether our approach in element-hq/synapse#19838 is the best (not sure)
(also plays into some of the comments which make some assumptions as well)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is your point that actually you could correctly implement this at read time by checking for other receipts when pulling from the DB? Yes I suppose you could?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, in which case this comment assumes too much