diff --git a/README.md b/README.md index 028f18fcab..fc2f1ca320 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,38 @@ You need to pass an object containing API endpoint callbacks as the `apiValue` p 8. Update translations and documentation as needed 9. Open Pull Request +## Troubleshooting Polling and Websockets + +When investigating inconsistent outcomes between websocket updates and polling updates, temporarily add the following logs to compare what each source is emitting and what final polling decision is made. + +Remove these logs after troubleshooting. + +### 1) Polling transport messages + +File: src/utilities/transport/MemberUpdateTransport.ts + +Add a `console.log` with a `polling` label where member and job are returned for polling updates + +### 2) Websocket transport messages + +File: src/utilities/transport/MemberUpdateTransport.ts + +Add a `console.log` with a `websocket` label where member and job are returned for websocket updates: + +### 3) Final polling decision + +File: src/hooks/usePollMember.tsx + +Add a `console.log` with a `final polling decision` label after finalState is computed: + +### Suggested debugging sequence for difficult bugs + +1. Reproduce the flow several times. +2. Compare websocket and polling event order. +3. The final polling decision determines what the UI does + +Edge cases might exist that haven't been handled. For example, it was discovered that the IMPAIRED status for No DDA flows is set _after_ a member is set to CONNECTED first. In very rare cases, websockets were so fast that it was detecting CONNECTED before seeing IMPAIRED and we handled it as a success, incorrectly. + ## Commit Message Requirements _To make commits that trigger a package release, use `npx cz`, it will launch easy to follow commitizen prompts._ diff --git a/src/hooks/__tests__/usePollMember-test.tsx b/src/hooks/__tests__/usePollMember-test.tsx index 024f97ad0d..30a6c602f9 100644 --- a/src/hooks/__tests__/usePollMember-test.tsx +++ b/src/hooks/__tests__/usePollMember-test.tsx @@ -9,6 +9,7 @@ import { Provider } from 'react-redux' import { createReduxStore, RootState } from 'src/redux/Store' import { member, JOB_DATA } from 'src/services/mockedData' import { ReadableStatuses } from 'src/const/Statuses' +import { VERIFY_MODE } from 'src/const/Connect' import { CONNECTING_MESSAGES } from 'src/utilities/pollers' import { take } from 'rxjs/operators' import { Subject } from 'rxjs' @@ -88,7 +89,6 @@ describe('usePollMember', () => { expect(apiValue.loadJob).toHaveBeenCalledWith(connectedMember.most_recent_job_guid) expect(states[0]).toMatchObject({ isError: false, - pollingCount: 1, currentResponse: { member: connectedMember, job: JOB_DATA, @@ -128,7 +128,6 @@ describe('usePollMember', () => { expect(states[0]).toMatchObject({ isError: true, - pollingCount: 1, pollingIsDone: false, }) @@ -314,7 +313,7 @@ describe('usePollMember', () => { subscription.unsubscribe() }) - it('should increment pollingCount on each poll', async () => { + it('should emit sequential states on each poll', async () => { const member1 = { ...member.member, guid: 'MBR-1', most_recent_job_guid: 'JOB-1' } const member2 = { ...member.member, guid: 'MBR-2', most_recent_job_guid: 'JOB-2' } @@ -349,8 +348,14 @@ describe('usePollMember', () => { { timeout: 3500 }, ) - expect(states[0].pollingCount).toBe(1) - expect(states[1].pollingCount).toBe(2) + expect(states[0].currentResponse).toEqual({ + member: member1, + job: { ...JOB_DATA, guid: 'JOB-1' }, + }) + expect(states[1].currentResponse).toEqual({ + member: member2, + job: { ...JOB_DATA, guid: 'JOB-2' }, + }) subscription.unsubscribe() }, 10000) @@ -871,4 +876,136 @@ describe('usePollMember', () => { subscription.unsubscribe() }) + + it('should wait for second CONNECTED update in verify mode before finishing', async () => { + const wsMessages$ = new Subject() + const mockWS = { + isConnected: vi.fn().mockReturnValue(true), + webSocketMessages$: wsMessages$.asObservable(), + } + + const apiValue = { + loadMemberByGuid: vi.fn().mockResolvedValue(member.member), + loadJob: vi.fn().mockResolvedValue(JOB_DATA), + } + + const preloadedState = { + config: { + mode: VERIFY_MODE, + }, + experimentalFeatures: { + useWebSockets: true, + memberPollingMilliseconds: 10000, + }, + } as Partial + + const { result } = renderHook(() => usePollMember(), { + wrapper: createWrapper(apiValue, preloadedState, mockWS), + }) + + const pollMember = result.current + const states: PollingState[] = [] + + const subscription = pollMember('MBR-123').subscribe((state: PollingState) => { + states.push(state) + }) + + wsMessages$.next({ + event: 'members/updated', + payload: { + guid: 'MBR-123', + most_recent_job_guid: 'JOB-123', + connection_status: ReadableStatuses.UPDATED, + is_being_aggregated: false, + }, + }) + + await waitFor(() => expect(states.length).toBe(1)) + expect(states[0].pollingIsDone).toBe(false) + expect(states[0].userMessage).toBe(CONNECTING_MESSAGES.VERIFYING) + + wsMessages$.next({ + event: 'members/updated', + payload: { + guid: 'MBR-123', + most_recent_job_guid: 'JOB-123', + connection_status: ReadableStatuses.CONNECTED, + is_being_aggregated: false, + }, + }) + + await waitFor(() => expect(states.length).toBe(2)) + expect(states[1].pollingIsDone).toBe(false) + expect(states[1].userMessage).toBe(CONNECTING_MESSAGES.VERIFYING) + + wsMessages$.next({ + event: 'members/updated', + payload: { + guid: 'MBR-123', + most_recent_job_guid: 'JOB-123', + connection_status: ReadableStatuses.CONNECTED, + is_being_aggregated: false, + }, + }) + + await waitFor(() => expect(states.length).toBe(3)) + expect(states[2].pollingIsDone).toBe(true) + expect(states[2].userMessage).toBe(CONNECTING_MESSAGES.FINISHING) + + subscription.unsubscribe() + }) + + it('should eventually stop polling for an oauth member in an error state that was never aggregating', async () => { + /** + * This tests a specific bug where distinctUntilChanged in the transport layer + * blocks the second identical poll from reaching the scan accumulator. + * + * handlePollingResponse uses isNotAggregatingAtAll: + * previousMember.is_being_aggregated === false && polledMember.is_being_aggregated === false + * + * Without the fix, the second poll is blocked, previousResponse stays as {} (DEFAULT), + * isNotAggregatingAtAll is never true, and the OAuth member polls forever. + */ + const oauthMember = { + ...member.member, + connection_status: ReadableStatuses.PREVENTED, + is_being_aggregated: false, + is_oauth: true, + } + + const apiValue = { + loadMemberByGuid: vi.fn().mockResolvedValue(oauthMember), + loadJob: vi.fn().mockResolvedValue(JOB_DATA), + } + + const preloadedState = { + experimentalFeatures: { + memberPollingMilliseconds: 100, + }, + } + + const { result } = renderHook(() => usePollMember(), { + wrapper: createWrapper(apiValue, preloadedState), + }) + + const pollMember = result.current + const states: PollingState[] = [] + + const subscription = pollMember('MBR-123').subscribe((state: PollingState) => { + states.push(state) + }) + + await waitFor( + () => { + expect(states.some((s) => s.pollingIsDone === true)).toBe(true) + }, + { timeout: 1000 }, + ) + + expect(states.find((s) => s.pollingIsDone === true)?.userMessage).toBe( + CONNECTING_MESSAGES.ERROR, + ) + + subscription.unsubscribe() + }) }) diff --git a/src/hooks/usePollMember.tsx b/src/hooks/usePollMember.tsx index d889905eba..d99d8e187d 100644 --- a/src/hooks/usePollMember.tsx +++ b/src/hooks/usePollMember.tsx @@ -5,15 +5,16 @@ import { useWebSocket } from 'src/context/WebSocketContext' import { useSelector } from 'react-redux' import { getExperimentalFeatures } from 'src/redux/reducers/experimentalFeaturesSlice' -import { scan } from 'rxjs/operators' +import { scan, distinctUntilChanged } from 'rxjs/operators' +import _isEqual from 'lodash/isEqual' import { createMemberUpdateTransport, MemberUpdate, } from 'src/utilities/transport/MemberUpdateTransport' +import type { RootState } from 'src/redux/Store' export interface PollingState { isError: boolean - pollingCount: number currentResponse?: MemberUpdate | Record previousResponse?: MemberUpdate | Record pollingIsDone: boolean @@ -31,6 +32,7 @@ export function usePollMember() { const { optOutOfEarlyUserRelease, memberPollingMilliseconds, useWebSockets } = useSelector(getExperimentalFeatures) + const mode = useSelector((state: RootState) => state.config?.mode) const pollingInterval = memberPollingMilliseconds || 3000 @@ -61,8 +63,6 @@ export function usePollMember() { const pollingState: PollingState = { // only track if the most recent poll was an error isError, - // always increase polling count - pollingCount: acc.pollingCount + 1, // dont update previous response if this is an error previousResponse: isError ? acc.previousResponse : acc.currentResponse, // dont update current response if this is an error @@ -82,17 +82,48 @@ export function usePollMember() { pollingState.initialDataReady = true } - const [shouldStopPolling, messageKey] = handlePollingResponse(pollingState) + const [shouldStopPolling, messageKey] = handlePollingResponse(pollingState, mode) - return { + const finalState = { ...pollingState, // we should keep polling based on the member pollingIsDone: isError ? false : shouldStopPolling, userMessage: messageKey, } + + return finalState }, { ...DEFAULT_POLLING_STATE } as PollingState, ), + // Deduplicate consecutive identical polling states to prevent unnecessary re-renders. + // This must live here — after the scan — so the scan always sees every update + // and can correctly track previousResponse/currentResponse transitions. Placing + // distinctUntilChanged earlier (in the transport) caused the scan to miss + // identical consecutive polls, breaking isNotAggregatingAtAll detection in + // handlePollingResponse and causing OAuth members to poll indefinitely. + distinctUntilChanged((prev: PollingState, curr: PollingState) => { + if (prev.isError !== curr.isError) return false + if (prev.pollingIsDone !== curr.pollingIsDone) return false + if (prev.userMessage !== curr.userMessage) return false + if (prev.initialDataReady !== curr.initialDataReady) return false + + const prevMember = (prev.currentResponse as MemberUpdate)?.member + const currMember = (curr.currentResponse as MemberUpdate)?.member + const prevJob = (prev.currentResponse as MemberUpdate)?.job + const currJob = (curr.currentResponse as MemberUpdate)?.job + + // Return true to *prevent* emitting the event + // Return false to emit the event + return ( + prevMember?.connection_status === currMember?.connection_status && + _isEqual(prevMember?.mfa, currMember?.mfa) && + prevMember?.is_being_aggregated === currMember?.is_being_aggregated && + prevMember?.most_recent_job_detail_code === currMember?.most_recent_job_detail_code && + prevMember?.error?.error_code === currMember?.error?.error_code && + prevJob?.guid === currJob?.guid && + prevJob?.async_account_data_ready === currJob?.async_account_data_ready + ) + }), ) } diff --git a/src/utilities/__tests__/pollers-test.js b/src/utilities/__tests__/pollers-test.js index 3e49c953f5..b46d77efec 100644 --- a/src/utilities/__tests__/pollers-test.js +++ b/src/utilities/__tests__/pollers-test.js @@ -3,8 +3,15 @@ import { DEFAULT_POLLING_STATE, CONNECTING_MESSAGES, } from 'src/utilities/pollers' +import { AGG_MODE, VERIFY_MODE } from 'src/const/Connect' import { ErrorStatuses, ProcessingStatuses, ReadableStatuses } from 'src/const/Statuses' +// CHALLENGED is intentionally excluded from error assertions because +// handlePollingResponse routes it through the MFA-specific branch and message. +const nonChallengedErrorStatuses = ErrorStatuses.filter( + (status) => status !== ReadableStatuses.CHALLENGED, +) + describe('handlePollingResponse', () => { test('it should stop polling and update the message', () => { testStatus(ReadableStatuses.CHALLENGED, true, CONNECTING_MESSAGES.MFA) @@ -60,18 +67,87 @@ describe('handlePollingResponse', () => { expect(message).toEqual(CONNECTING_MESSAGES.FINISHING) }) + test('should keep polling on first CONNECTED update in verify mode', () => { + const pollingState = { + ...DEFAULT_POLLING_STATE, + currentResponse: { + member: { + is_being_aggregated: false, + connection_status: ReadableStatuses.CONNECTED, + }, + }, + previousResponse: { + member: { + is_being_aggregated: false, + connection_status: ReadableStatuses.UPDATED, + }, + }, + } + + const [stopPolling, message] = handlePollingResponse(pollingState, VERIFY_MODE) + + expect(stopPolling).toEqual(false) + expect(message).toEqual(CONNECTING_MESSAGES.VERIFYING) + }) + + /** + * See the "Second CONNECTED message reasoning" comment in pollers.js for reasoning behind this test + */ + test('should stop polling on second consecutive CONNECTED update in verify mode', () => { + const pollingState = { + ...DEFAULT_POLLING_STATE, + currentResponse: { + member: { + is_being_aggregated: false, + connection_status: ReadableStatuses.CONNECTED, + }, + }, + previousResponse: { + member: { + is_being_aggregated: false, + connection_status: ReadableStatuses.CONNECTED, + }, + }, + } + + const [stopPolling, message] = handlePollingResponse(pollingState, VERIFY_MODE) + + expect(stopPolling).toEqual(true) + expect(message).toEqual(CONNECTING_MESSAGES.FINISHING) + }) + + test('should preserve immediate CONNECTED finish behavior in aggregation mode', () => { + const pollingState = { + ...DEFAULT_POLLING_STATE, + currentResponse: { + member: { + is_being_aggregated: false, + connection_status: ReadableStatuses.CONNECTED, + }, + }, + previousResponse: { + member: { + is_being_aggregated: false, + connection_status: ReadableStatuses.UPDATED, + }, + }, + } + + const [stopPolling, message] = handlePollingResponse(pollingState, AGG_MODE) + + expect(stopPolling).toEqual(true) + expect(message).toEqual(CONNECTING_MESSAGES.FINISHING) + }) + describe('Error states', () => { it('should stop polling and show a message', () => { - ErrorStatuses.forEach((status) => { - // CHALLENGED state is an error state, but has specific logic - if (status !== ReadableStatuses.CHALLENGED) { - testStatus(status, true, CONNECTING_MESSAGES.ERROR) - } + nonChallengedErrorStatuses.forEach((status) => { + testStatus(status, true, CONNECTING_MESSAGES.ERROR) }) }) it('should wait for aggregation to be done for error states', () => { - ErrorStatuses.forEach((status) => { + nonChallengedErrorStatuses.forEach((status) => { const pollingState = { ...DEFAULT_POLLING_STATE, currentResponse: { @@ -82,13 +158,10 @@ describe('handlePollingResponse', () => { }, } - // CHALLENGED state is an error state, but has specific logic - if (status !== ReadableStatuses.CHALLENGED) { - const [stopPolling, message] = handlePollingResponse(pollingState) + const [stopPolling, message] = handlePollingResponse(pollingState) - expect(stopPolling).toEqual(false) - expect(message).toEqual(CONNECTING_MESSAGES.VERIFYING) - } + expect(stopPolling).toEqual(false) + expect(message).toEqual(CONNECTING_MESSAGES.VERIFYING) }) }) @@ -118,9 +191,9 @@ describe('handlePollingResponse', () => { }) }) - describe('OAuth status', () => { + describe('Terminal error code handling', () => { it('should keep polling and show the OAuth message if in error, but not finished agging', () => { - ErrorStatuses.forEach((status) => { + nonChallengedErrorStatuses.forEach((status) => { const pollingState = { ...DEFAULT_POLLING_STATE, currentResponse: { @@ -132,17 +205,15 @@ describe('handlePollingResponse', () => { }, } - if (status !== ReadableStatuses.CHALLENGED) { - const [stopPolling, message] = handlePollingResponse(pollingState) + const [stopPolling, message] = handlePollingResponse(pollingState) - expect(message).toEqual(CONNECTING_MESSAGES.OAUTH) - expect(stopPolling).toEqual(false) - } + expect(message).toEqual(CONNECTING_MESSAGES.OAUTH) + expect(stopPolling).toEqual(false) }) }) it('should go to error view if we are done aggregating', () => { - ErrorStatuses.forEach((status) => { + nonChallengedErrorStatuses.forEach((status) => { const pollingState = { ...DEFAULT_POLLING_STATE, currentResponse: { @@ -161,12 +232,31 @@ describe('handlePollingResponse', () => { }, } - if (status !== ReadableStatuses.CHALLENGED) { - const [stopPolling, message] = handlePollingResponse(pollingState) + const [stopPolling, message] = handlePollingResponse(pollingState) - expect(message).toEqual(CONNECTING_MESSAGES.ERROR) - expect(stopPolling).toEqual(true) + expect(message).toEqual(CONNECTING_MESSAGES.ERROR) + expect(stopPolling).toEqual(true) + }) + }) + + it('should stop polling when a terminal error code is present, oauth is true, and there is no previous response', () => { + nonChallengedErrorStatuses.forEach((status) => { + const pollingState = { + ...DEFAULT_POLLING_STATE, + currentResponse: { + member: { + connection_status: status, + is_being_aggregated: false, + is_oauth: true, + error: { error_code: 'ANY_ERROR_CODE' }, + }, + }, } + + const [stopPolling, message] = handlePollingResponse(pollingState) + + expect(message).toEqual(CONNECTING_MESSAGES.ERROR) + expect(stopPolling).toEqual(true) }) }) }) diff --git a/src/utilities/pollers.js b/src/utilities/pollers.js index 87889e1803..9aa67f145a 100644 --- a/src/utilities/pollers.js +++ b/src/utilities/pollers.js @@ -2,6 +2,7 @@ import { defer, interval, of } from 'rxjs' import { catchError, scan, filter, exhaustMap } from 'rxjs/operators' import { ErrorStatuses, ProcessingStatuses, ReadableStatuses } from 'src/const/Statuses' +import { AGG_MODE, VERIFY_MODE } from 'src/const/Connect' import { __ } from 'src/utilities/Intl' import { OauthState } from 'src/const/consts' @@ -18,7 +19,6 @@ export const CONNECTING_MESSAGES = { export const DEFAULT_POLLING_STATE = { isError: false, // whether or not the last poll was an error - pollingCount: 0, // used to count how many times we have polled previousResponse: {}, // previous response from last poll currentResponse: {}, // current response pollingIsDone: false, // whether or not we should stop polling @@ -26,7 +26,7 @@ export const DEFAULT_POLLING_STATE = { initialDataReady: false, // whether the initial data ready event has been sent } -export function handlePollingResponse(pollingState) { +export function handlePollingResponse(pollingState, mode = AGG_MODE) { const polledMember = pollingState.currentResponse?.member || {} const previousMember = pollingState.previousResponse?.member || {} const initialDataReady = pollingState.initialDataReady @@ -56,6 +56,22 @@ export function handlePollingResponse(pollingState) { return [false, CONNECTING_MESSAGES.SYNCING] } + /** + * Second CONNECTED message reasoning + * -------------------------- + * The gist of the problem, is that an asynchronous edge case exists... + * What sometimes happens behind the scenes of a member record that hits the No DDA flow, as revealed by websocket member update messages. + * The member goes to CONNECTED (no errors) - yay success! + * The member then goes to IMPEDED (with errors) - oh no, error... + * In some rare instances, 1 in 6, or sometimes 1 in 20 attempts, the widget would catch the CONNECTED status and show the success screen. + * ... But, it should actually be showing the IMPAIRED screen... + * + * With a second confirmed CONNECTED message we're more confident that it should actually be a success instead of an error. + */ + if (mode === VERIFY_MODE && previousMember.connection_status !== ReadableStatuses.CONNECTED) { + return [false, CONNECTING_MESSAGES.VERIFYING] + } + return [true, CONNECTING_MESSAGES.FINISHING] } @@ -65,11 +81,22 @@ export function handlePollingResponse(pollingState) { return [false, CONNECTING_MESSAGES.VERIFYING] } - // if we aren't aggregating whatsoever and in an error state, stop polling + // if we aren't aggregating whatsoever and in an error state, stop polling, + // even if we don't have an explicit error code. if (isNotAggregatingAtAll && ErrorStatuses.includes(polledMember.connection_status)) { return [true, CONNECTING_MESSAGES.ERROR] } + // if we aren't aggregating, are in an error state, and have an explicit error code already, + // stop polling and show the error message. + if ( + polledMember.is_being_aggregated === false && + ErrorStatuses.includes(polledMember.connection_status) && + Boolean(polledMember.error?.error_code) + ) { + return [true, CONNECTING_MESSAGES.ERROR] + } + /** * If this is an OAuth member, we could be stuck 'connecting' forever if the * user bails out of the authentication process, leaving the member in the @@ -112,8 +139,6 @@ export function pollOauthState(oauthStateGuid, api) { return { // only track if the most recent poll was an error isError, - // always increase polling count - pollingCount: acc.pollingCount + 1, // dont update previous response if this is an error previousResponse: isError ? acc.previousResponse : acc.currentResponse, // dont update current response if this is an error diff --git a/src/utilities/transport/MemberUpdateTransport.ts b/src/utilities/transport/MemberUpdateTransport.ts index c4f492ef36..28f7159684 100644 --- a/src/utilities/transport/MemberUpdateTransport.ts +++ b/src/utilities/transport/MemberUpdateTransport.ts @@ -1,14 +1,5 @@ import { Observable, defer, interval, of, merge } from 'rxjs' -import { - catchError, - map, - mergeMap, - exhaustMap, - filter, - distinctUntilChanged, - scan, -} from 'rxjs/operators' -import _isEqual from 'lodash/isEqual' +import { catchError, map, mergeMap, exhaustMap, filter, scan } from 'rxjs/operators' import type { ApiContextTypes } from 'src/context/ApiContext' import { WebSocketConnection } from 'src/context/WebSocketContext' @@ -83,26 +74,5 @@ export function createMemberUpdateTransport( transport$ = merge(polling$, socket$) } - return transport$.pipe( - distinctUntilChanged((prev, curr) => { - // Don't deduplicate errors - if (prev instanceof Error || curr instanceof Error) return false - - const prevMember = prev.member - const currMember = curr.member - - // Compare the relevant fields to determine if we should emit an update - // Return true to *prevent* emitting the event - // Return false to emit the event - return ( - prevMember?.connection_status === currMember?.connection_status && - _isEqual(prevMember?.mfa, currMember?.mfa) && - prev.job?.guid === curr.job?.guid && - prev.job?.async_account_data_ready === curr.job?.async_account_data_ready && - prevMember?.is_being_aggregated === currMember?.is_being_aggregated && - prevMember?.most_recent_job_detail_code === currMember?.most_recent_job_detail_code && - prevMember?.error?.error_code === currMember?.error?.error_code - ) - }), - ) + return transport$ } diff --git a/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts b/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts index e65405710f..f8b3beea3d 100644 --- a/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts +++ b/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts @@ -218,14 +218,19 @@ describe('MemberUpdateTransport', () => { subscription.unsubscribe() }) - it('should deduplicate identical updates from polling and WebSockets', async () => { + it('should pass through both polling and WebSocket updates without cross-source deduplication', async () => { + /** + * The transport does NOT deduplicate across sources. Deduplication is the + * responsibility of the consumer (usePollMember) so that the scan accumulator + * always sees every update and can correctly track state transitions. + */ const wsMessages$ = new Subject() const mockWS = { isConnected: vi.fn().mockReturnValue(true), webSocketMessages$: wsMessages$.asObservable(), } - // Configure polling to return same data + // Configure polling to return the same member data as the WebSocket will send const jobWithGuid = { ...mockJob, guid: 'JOB-123', async_account_data_ready: false } mockApi.loadMemberByGuid.mockResolvedValue(mockMember) mockApi.loadJob.mockResolvedValue(jobWithGuid) @@ -246,20 +251,20 @@ describe('MemberUpdateTransport', () => { await vi.advanceTimersByTimeAsync(1000) expect(results).toHaveLength(1) - // 2. Emit identical data from WebSocket + // 2. Emit identical data from WebSocket — transport passes it through (no cross-source dedup) wsMessages$.next({ event: 'members/updated', payload: mockMember }) - expect(results).toHaveLength(1) // Still 1 + expect(results).toHaveLength(2) - // 3. Trigger second poll + // 3. Trigger second poll — also passes through await vi.advanceTimersByTimeAsync(1000) - expect(results).toHaveLength(1) + expect(results).toHaveLength(3) - // 4. Emit a DIFFERENT update from WebSocket + // 4. Emit a DIFFERENT update from WebSocket — still passes through const updatedMember = { ...mockMember, connection_status: 3 } wsMessages$.next({ event: 'members/updated', payload: updatedMember }) - expect(results).toHaveLength(2) - expect((results[1] as MemberUpdate).member?.connection_status).toBe(3) + expect(results).toHaveLength(4) + expect((results[3] as MemberUpdate).member?.connection_status).toBe(3) subscription.unsubscribe() }) diff --git a/src/views/connecting/Connecting.js b/src/views/connecting/Connecting.js index 6c60e7ba8e..202ce8b46e 100644 --- a/src/views/connecting/Connecting.js +++ b/src/views/connecting/Connecting.js @@ -50,6 +50,8 @@ import { PostMessageContext } from 'src/ConnectWidget' import { Stack } from '@mui/material' import { usePollMember } from 'src/hooks/usePollMember' +export const CONNECTING_TIMEOUT_MS = 60000 + export const Connecting = (props) => { const { connectConfig, @@ -81,6 +83,7 @@ export const Connecting = (props) => { const analyticFunctions = useContext(AnalyticContext) const { onPostMessage, postMessageEventOverrides } = useContext(PostMessageContext) const connectingRef = useRef(null) + const pollingStartedAtRef = useRef(null) const { api } = useApi() const [message, setMessage] = useState(CONNECTING_MESSAGES.STARTING) @@ -93,12 +96,12 @@ export const Connecting = (props) => { const needsToInitializeJobSchedule = jobSchedule.isInitialized === false function handleMemberPoll(pollingState) { - // If we have been polling for more than 15 attempts (60 seconds) - // show the timeout message + // If polling has run longer than the timeout threshold, show timeout. // Unless this is a PENDING member, then we don't show the timeout // since PENDING may take much longer to resolve. if ( - pollingState.pollingCount > 15 && + pollingStartedAtRef.current !== null && + Date.now() - pollingStartedAtRef.current > CONNECTING_TIMEOUT_MS && pollingState.currentResponse?.member?.connection_status !== ReadableStatuses.PENDING ) { setTimedOut(true) @@ -255,6 +258,8 @@ export const Connecting = (props) => { // If we still need to initialize the job schedule, do nothing if (needsToInitializeJobSchedule || !activeJob) return () => {} + pollingStartedAtRef.current = Date.now() + const connectMember$ = defer(() => { const needsJobStarted = currentMember.is_being_aggregated === false @@ -315,7 +320,10 @@ export const Connecting = (props) => { } }) - return () => connectMember$.unsubscribe() + return () => { + pollingStartedAtRef.current = null + connectMember$.unsubscribe() + } }, [needsToInitializeJobSchedule, activeJob]) /** diff --git a/src/views/connecting/__tests__/Connecting-test.tsx b/src/views/connecting/__tests__/Connecting-test.tsx index 64c0a164b0..7aa9f5a91a 100644 --- a/src/views/connecting/__tests__/Connecting-test.tsx +++ b/src/views/connecting/__tests__/Connecting-test.tsx @@ -1,11 +1,142 @@ import React from 'react' -import { createTestReduxStore, render, screen, waitFor, within } from 'src/utilities/testingLibrary' -import { Connecting } from '../Connecting' +import { interval, map } from 'rxjs' +import { + act, + createTestReduxStore, + render, + screen, + waitFor, + within, +} from 'src/utilities/testingLibrary' +import { CONNECTING_TIMEOUT_MS, Connecting } from '../Connecting' import { PostMessageContext } from 'src/ConnectWidget' import { ApiContextTypes, ApiProvider } from 'src/context/ApiContext' import { POST_MESSAGES } from 'src/const/postMessages' +import { ReadableStatuses } from 'src/const/Statuses' +import { STEPS } from 'src/const/Connect' +import type { PollingState } from 'src/hooks/usePollMember' +import type { MemberUpdate } from 'src/utilities/transport/MemberUpdateTransport' +import * as usePollMemberHook from 'src/hooks/usePollMember' + +const createTimeoutStore = () => + createTestReduxStore({ + connect: { + location: [], + jobSchedule: { + isInitialized: true, + jobs: [ + { + type: 'aggregate', + status: 'active', + guid: 'job-1', + }, + ], + }, + }, + }) + +const createPollingState = (connectionStatus: number): PollingState => { + const memberUpdate = { + member: { + guid: 'member-guid', + connection_status: connectionStatus, + is_being_aggregated: true, + error: undefined, + }, + } as unknown as MemberUpdate + + return { + isError: false, + previousResponse: memberUpdate, + currentResponse: memberUpdate, + pollingIsDone: false, + userMessage: 'syncing', + initialDataReady: false, + } +} + +const ONE_SECOND_BEFORE_TIMEOUT_MS = CONNECTING_TIMEOUT_MS - 1000 +const TWO_SECONDS_AFTER_TIMEOUT_MS = CONNECTING_TIMEOUT_MS + 2000 describe('', () => { + afterEach(() => { + vi.restoreAllMocks() + vi.useRealTimers() + }) + + describe('timeout', () => { + it('does not emit timeout before the expected time, and then emits timeout after the expected time', async () => { + vi.useFakeTimers() + vi.setSystemTime(new Date('2026-01-01T00:00:00Z')) + + const onPostMessage = vi.fn() + + const store = createTimeoutStore() + + vi.spyOn(usePollMemberHook, 'usePollMember').mockReturnValue(() => + interval(1000).pipe(map(() => createPollingState(ReadableStatuses.CONNECTED))), + ) + + const { unmount } = render( + + + , + { store }, + ) + + await act(async () => { + await vi.advanceTimersByTimeAsync(ONE_SECOND_BEFORE_TIMEOUT_MS) + }) + + expect( + onPostMessage.mock.calls.filter((args) => args[0] === 'connect/stepChange').length, + ).toBe(0) + + await act(async () => { + await vi.advanceTimersByTimeAsync( + TWO_SECONDS_AFTER_TIMEOUT_MS - ONE_SECOND_BEFORE_TIMEOUT_MS, + ) + }) + + expect(onPostMessage).toHaveBeenCalledWith('connect/stepChange', { + previous: STEPS.CONNECTING, + current: 'timeOut', + }) + + unmount() + }) + + it('does not emit timeout for pending members even after the expected time', async () => { + vi.useFakeTimers() + vi.setSystemTime(new Date('2026-01-01T00:00:00Z')) + + const onPostMessage = vi.fn() + + const store = createTimeoutStore() + + vi.spyOn(usePollMemberHook, 'usePollMember').mockReturnValue(() => + interval(1000).pipe(map(() => createPollingState(ReadableStatuses.PENDING))), + ) + + const { unmount } = render( + + + , + { store }, + ) + + await act(async () => { + await vi.advanceTimersByTimeAsync(TWO_SECONDS_AFTER_TIMEOUT_MS) + }) + + expect( + onPostMessage.mock.calls.filter((args) => args[0] === 'connect/stepChange').length, + ).toBe(0) + + unmount() + }) + }) + describe('memberStatusUpdate', () => { it('fires the override memberStatusUpdated event if it is provided', async () => { const onPostMessage = vi.fn()