diff --git a/app/api/session/stop/route.ts b/app/api/session/stop/route.ts index d7bbb43a2..184860d7f 100644 --- a/app/api/session/stop/route.ts +++ b/app/api/session/stop/route.ts @@ -22,11 +22,13 @@ const AGENT_WORKER_READINESS_TIMEOUT_MS = readPositiveIntEnv( 10_000 ); const AGENT_WORKER_LOG_TAIL_BYTES = readPositiveIntEnv('AGENT_WORKER_LOG_TAIL_BYTES', 256 * 1024); +const ROOM_INPUT_STOP_TIMEOUT_MS = readPositiveIntEnv('ROOM_INPUT_STOP_TIMEOUT_MS', 3_000); type StopResult = { target: string; ok: boolean; skipped?: boolean; + fatal?: boolean; status?: number; error?: string; dispatch_ids?: string[]; @@ -134,6 +136,77 @@ function shouldWaitForLocalAgentWorkerReadiness(): boolean { return inputSource !== 'browser' && !(inputSource === 'mixed' && usesBrowserOnlyMixedInput()); } +function shouldStopRoomInput(): boolean { + return shouldWaitForLocalAgentWorkerReadiness(); +} + +function normalizeRoomInputControlUrl(rawUrl: string, action: 'start' | 'stop'): string { + const value = rawUrl.trim(); + if (!value) { + return ''; + } + + try { + const url = new URL(value); + const pathname = url.pathname.replace(/\/+$/, ''); + const otherAction = action === 'stop' ? 'start' : 'stop'; + if (pathname.endsWith(`/${otherAction}`)) { + url.pathname = `${pathname.slice(0, -1 * (otherAction.length + 1))}/${action}`; + } else if (!pathname.endsWith(`/${action}`)) { + url.pathname = `${pathname}/${action}`; + } + url.search = ''; + url.hash = ''; + return url.toString(); + } catch { + const withoutTrailingSlash = value.replace(/\/+$/, ''); + if (withoutTrailingSlash.endsWith(`/${action}`)) { + return withoutTrailingSlash; + } + + const otherAction = action === 'stop' ? 'start' : 'stop'; + if (withoutTrailingSlash.endsWith(`/${otherAction}`)) { + return `${withoutTrailingSlash.slice(0, -1 * (otherAction.length + 1))}/${action}`; + } + return `${withoutTrailingSlash}/${action}`; + } +} + +function resolveRoomInputStopUrls(): string[] { + if (!shouldStopRoomInput()) { + return []; + } + + const inputSource = readStopInputSource(); + const candidateUrls = [ + readStopEnv('ROOM_AUDIO_INPUT_URL'), + readStopEnv('ROOM_VISION_INPUT_URL'), + readStopEnv('ROOM_INPUT_URL'), + ]; + + if (inputSource === 'xunfei' || inputSource === 'mixed') { + candidateUrls.push( + readStopEnv('FRONTDESK_INPUT_PARTICIPANT_URL'), + readStopEnv('FACE_SERVICE_URL') + ); + } + if (inputSource === 'generic' || inputSource === 'mixed') { + candidateUrls.push(readStopEnv('GENERIC_CAMERA_PARTICIPANT_URL')); + } + if (inputSource === 'primebot' || inputSource === 'mixed') { + candidateUrls.push(readStopEnv('PRIMEBOT_INPUT_PARTICIPANT_URL')); + } + + const urls = new Set(); + for (const candidate of candidateUrls) { + const stopUrl = normalizeRoomInputControlUrl(candidate, 'stop'); + if (stopUrl) { + urls.add(stopUrl); + } + } + return [...urls]; +} + function resolveLocalLiveKitServerLogPath(): string { const runLogDir = process.env.LEXVOICE_RUN_LOG_DIR?.trim(); return runLogDir ? path.join(runLogDir, 'server.log') : ''; @@ -272,6 +345,53 @@ async function waitForPendingDispatches(roomName: string, sessionId: string): Pr return { target: 'agent_dispatch_barrier', ok: true }; } +async function postRoomInputStop( + stopUrl: string, + roomName: string, + sessionId: string +): Promise { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), ROOM_INPUT_STOP_TIMEOUT_MS); + try { + const response = await fetch(stopUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ room_name: roomName, session_id: sessionId }), + signal: controller.signal, + }); + + if (response.ok) { + return { target: 'room_input', ok: true, status: response.status }; + } + + return { + target: 'room_input', + ok: false, + fatal: false, + status: response.status, + error: `room-input stop returned HTTP ${response.status}`, + }; + } catch (error) { + return { + target: 'room_input', + ok: false, + fatal: false, + error: error instanceof Error ? error.message : String(error), + }; + } finally { + clearTimeout(timeout); + } +} + +async function stopRoomInput(roomName: string, sessionId: string): Promise { + const stopUrls = resolveRoomInputStopUrls(); + if (stopUrls.length === 0) { + return [{ target: 'room_input', ok: true, skipped: true }]; + } + + return Promise.all(stopUrls.map((stopUrl) => postRoomInputStop(stopUrl, roomName, sessionId))); +} + async function runRemoteSessionCleanup( roomName: string, sessionId: string, @@ -279,9 +399,15 @@ async function runRemoteSessionCleanup( dispatchIds: string[] ): Promise<{ results: StopResult[]; failures: StopResult[] }> { const dispatchBarrierResult = await waitForPendingDispatches(roomName, sessionId); + const roomInputResults = await stopRoomInput(roomName, sessionId); const liveKitRoomResult = await deleteLiveKitRoom(roomName); const agentWorkerReadinessResult = await waitForLocalAgentWorkerReadiness(); - const cleanupResults = [dispatchBarrierResult, liveKitRoomResult, agentWorkerReadinessResult]; + const cleanupResults = [ + dispatchBarrierResult, + ...roomInputResults, + liveKitRoomResult, + agentWorkerReadinessResult, + ]; const results = [ { target: 'session_registry', @@ -291,12 +417,18 @@ async function runRemoteSessionCleanup( dispatchResult, ...cleanupResults, ]; - const failures = results.filter((result) => !result.ok && !result.skipped); + const failures = results.filter( + (result) => !result.ok && !result.skipped && result.fatal !== false + ); + const bestEffortFailures = results.filter( + (result) => !result.ok && !result.skipped && result.fatal === false + ); console.info('agent session remote cleanup completed', { roomName, sessionId, results, failures, + bestEffortFailures, }); markRoomSessionStopped(roomName, sessionId); return { results, failures }; diff --git a/tests/session-stop.test.mjs b/tests/session-stop.test.mjs index dc987072f..91205757b 100644 --- a/tests/session-stop.test.mjs +++ b/tests/session-stop.test.mjs @@ -35,16 +35,25 @@ test('maps livekit websocket URLs to server API URLs', () => { assert.equal(resolveLiveKitHttpUrl('https://livekit.example'), 'https://livekit.example'); }); -test('session stop route does not call the room-input control endpoint', async () => { +test('session stop route can call the room-input control endpoint before deleting the room', async () => { const routeSource = await readFile( new URL('../app/api/session/stop/route.ts', import.meta.url), 'utf8' ); - assert.doesNotMatch(routeSource, /process\.env\.ROOM_INPUT_URL/); - assert.doesNotMatch(routeSource, /resolveRoomInputStopUrl/); - assert.doesNotMatch(routeSource, /stopRoomInput/); - assert.doesNotMatch(routeSource, /GENERIC_CAMERA_PARTICIPANT_URL/); + const cleanupSource = routeSource.match(/async function runRemoteSessionCleanup[\s\S]*?\n}/)?.[0]; + + assert.ok(cleanupSource, 'runRemoteSessionCleanup should be defined'); + assert.match(routeSource, /readStopEnv\('ROOM_INPUT_URL'\)/); + assert.match(routeSource, /resolveRoomInputStopUrls/); + assert.match(routeSource, /stopRoomInput/); + assert.match(routeSource, /FRONTDESK_INPUT_PARTICIPANT_URL/); + assert.match(routeSource, /FACE_SERVICE_URL/); + assert.match(routeSource, /GENERIC_CAMERA_PARTICIPANT_URL/); + assert.match( + cleanupSource, + /const roomInputResults = await stopRoomInput\(roomName, sessionId\);[\s\S]*const liveKitRoomResult = await deleteLiveKitRoom\(roomName\);/ + ); }); test('session stop route cancels room session before remote cleanup', async () => { @@ -84,6 +93,7 @@ test('session stop route deletes the LiveKit room after the dispatch barrier', a ); assert.match(routeSource, /await waitForPendingDispatches\(roomName, sessionId\)/); + assert.match(routeSource, /await stopRoomInput\(roomName, sessionId\)/); assert.match(routeSource, /deleteLiveKitRoom\(roomName\)/); }); @@ -105,7 +115,7 @@ test('session stop route waits for local agent worker readiness before finishing assert.match(cleanupSource, /await waitForLocalAgentWorkerReadiness\(\)/); assert.match( cleanupSource, - /const cleanupResults = \[dispatchBarrierResult, liveKitRoomResult, agentWorkerReadinessResult\]/ + /const cleanupResults = \[\s*dispatchBarrierResult,\s*\.\.\.roomInputResults,\s*liveKitRoomResult,\s*agentWorkerReadinessResult,\s*\]/ ); }); @@ -147,6 +157,7 @@ test('session stop route closes the registry even when remote cleanup is partial assert.ok(cleanupSource, 'runRemoteSessionCleanup should be defined'); assert.match(cleanupSource, /const failures = results\.filter/); + assert.match(cleanupSource, /result\.fatal !== false/); assert.match( cleanupSource, /markRoomSessionStopped\(roomName, sessionId\);\s*return \{ results, failures \};/