Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 134 additions & 2 deletions app/api/session/stop/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ const AGENT_WORKER_READINESS_TIMEOUT_MS = readPositiveIntEnv(
10_000
);
const AGENT_WORKER_LOG_TAIL_BYTES = readPositiveIntEnv('AGENT_WORKER_LOG_TAIL_BYTES', 256 * 1024);
const ROOM_INPUT_STOP_TIMEOUT_MS = readPositiveIntEnv('ROOM_INPUT_STOP_TIMEOUT_MS', 3_000);

type StopResult = {
target: string;
ok: boolean;
skipped?: boolean;
fatal?: boolean;
status?: number;
error?: string;
dispatch_ids?: string[];
Expand Down Expand Up @@ -134,6 +136,77 @@ function shouldWaitForLocalAgentWorkerReadiness(): boolean {
return inputSource !== 'browser' && !(inputSource === 'mixed' && usesBrowserOnlyMixedInput());
}

function shouldStopRoomInput(): boolean {
return shouldWaitForLocalAgentWorkerReadiness();
}

function normalizeRoomInputControlUrl(rawUrl: string, action: 'start' | 'stop'): string {
const value = rawUrl.trim();
if (!value) {
return '';
}

try {
const url = new URL(value);
const pathname = url.pathname.replace(/\/+$/, '');
const otherAction = action === 'stop' ? 'start' : 'stop';
if (pathname.endsWith(`/${otherAction}`)) {
url.pathname = `${pathname.slice(0, -1 * (otherAction.length + 1))}/${action}`;
} else if (!pathname.endsWith(`/${action}`)) {
url.pathname = `${pathname}/${action}`;
}
url.search = '';
url.hash = '';
return url.toString();
} catch {
const withoutTrailingSlash = value.replace(/\/+$/, '');
if (withoutTrailingSlash.endsWith(`/${action}`)) {
return withoutTrailingSlash;
}

const otherAction = action === 'stop' ? 'start' : 'stop';
if (withoutTrailingSlash.endsWith(`/${otherAction}`)) {
return `${withoutTrailingSlash.slice(0, -1 * (otherAction.length + 1))}/${action}`;
}
return `${withoutTrailingSlash}/${action}`;
}
}

function resolveRoomInputStopUrls(): string[] {
if (!shouldStopRoomInput()) {
return [];
}

const inputSource = readStopInputSource();
const candidateUrls = [
readStopEnv('ROOM_AUDIO_INPUT_URL'),
readStopEnv('ROOM_VISION_INPUT_URL'),
readStopEnv('ROOM_INPUT_URL'),
];

if (inputSource === 'xunfei' || inputSource === 'mixed') {
candidateUrls.push(
readStopEnv('FRONTDESK_INPUT_PARTICIPANT_URL'),
readStopEnv('FACE_SERVICE_URL')
);
}
if (inputSource === 'generic' || inputSource === 'mixed') {
candidateUrls.push(readStopEnv('GENERIC_CAMERA_PARTICIPANT_URL'));
}
if (inputSource === 'primebot' || inputSource === 'mixed') {
candidateUrls.push(readStopEnv('PRIMEBOT_INPUT_PARTICIPANT_URL'));
}

const urls = new Set<string>();
for (const candidate of candidateUrls) {
const stopUrl = normalizeRoomInputControlUrl(candidate, 'stop');
if (stopUrl) {
urls.add(stopUrl);
}
}
return [...urls];
}

function resolveLocalLiveKitServerLogPath(): string {
const runLogDir = process.env.LEXVOICE_RUN_LOG_DIR?.trim();
return runLogDir ? path.join(runLogDir, 'server.log') : '';
Expand Down Expand Up @@ -272,16 +345,69 @@ async function waitForPendingDispatches(roomName: string, sessionId: string): Pr
return { target: 'agent_dispatch_barrier', ok: true };
}

async function postRoomInputStop(
stopUrl: string,
roomName: string,
sessionId: string
): Promise<StopResult> {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), ROOM_INPUT_STOP_TIMEOUT_MS);
try {
const response = await fetch(stopUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ room_name: roomName, session_id: sessionId }),
signal: controller.signal,
});

if (response.ok) {
return { target: 'room_input', ok: true, status: response.status };
}

return {
target: 'room_input',
ok: false,
fatal: false,
status: response.status,
error: `room-input stop returned HTTP ${response.status}`,
};
} catch (error) {
return {
target: 'room_input',
ok: false,
fatal: false,
error: error instanceof Error ? error.message : String(error),
};
} finally {
clearTimeout(timeout);
}
}

async function stopRoomInput(roomName: string, sessionId: string): Promise<StopResult[]> {
const stopUrls = resolveRoomInputStopUrls();
if (stopUrls.length === 0) {
return [{ target: 'room_input', ok: true, skipped: true }];
}

return Promise.all(stopUrls.map((stopUrl) => postRoomInputStop(stopUrl, roomName, sessionId)));
}

async function runRemoteSessionCleanup(
roomName: string,
sessionId: string,
dispatchResult: StopResult,
dispatchIds: string[]
): Promise<{ results: StopResult[]; failures: StopResult[] }> {
const dispatchBarrierResult = await waitForPendingDispatches(roomName, sessionId);
const roomInputResults = await stopRoomInput(roomName, sessionId);
const liveKitRoomResult = await deleteLiveKitRoom(roomName);
const agentWorkerReadinessResult = await waitForLocalAgentWorkerReadiness();
const cleanupResults = [dispatchBarrierResult, liveKitRoomResult, agentWorkerReadinessResult];
const cleanupResults = [
dispatchBarrierResult,
...roomInputResults,
liveKitRoomResult,
agentWorkerReadinessResult,
];
const results = [
{
target: 'session_registry',
Expand All @@ -291,12 +417,18 @@ async function runRemoteSessionCleanup(
dispatchResult,
...cleanupResults,
];
const failures = results.filter((result) => !result.ok && !result.skipped);
const failures = results.filter(
(result) => !result.ok && !result.skipped && result.fatal !== false
);
const bestEffortFailures = results.filter(
(result) => !result.ok && !result.skipped && result.fatal === false
);
console.info('agent session remote cleanup completed', {
roomName,
sessionId,
results,
failures,
bestEffortFailures,
});
markRoomSessionStopped(roomName, sessionId);
return { results, failures };
Expand Down
23 changes: 17 additions & 6 deletions tests/session-stop.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,25 @@ test('maps livekit websocket URLs to server API URLs', () => {
assert.equal(resolveLiveKitHttpUrl('https://livekit.example'), 'https://livekit.example');
});

test('session stop route does not call the room-input control endpoint', async () => {
test('session stop route can call the room-input control endpoint before deleting the room', async () => {
const routeSource = await readFile(
new URL('../app/api/session/stop/route.ts', import.meta.url),
'utf8'
);

assert.doesNotMatch(routeSource, /process\.env\.ROOM_INPUT_URL/);
assert.doesNotMatch(routeSource, /resolveRoomInputStopUrl/);
assert.doesNotMatch(routeSource, /stopRoomInput/);
assert.doesNotMatch(routeSource, /GENERIC_CAMERA_PARTICIPANT_URL/);
const cleanupSource = routeSource.match(/async function runRemoteSessionCleanup[\s\S]*?\n}/)?.[0];

assert.ok(cleanupSource, 'runRemoteSessionCleanup should be defined');
assert.match(routeSource, /readStopEnv\('ROOM_INPUT_URL'\)/);
assert.match(routeSource, /resolveRoomInputStopUrls/);
assert.match(routeSource, /stopRoomInput/);
assert.match(routeSource, /FRONTDESK_INPUT_PARTICIPANT_URL/);
assert.match(routeSource, /FACE_SERVICE_URL/);
assert.match(routeSource, /GENERIC_CAMERA_PARTICIPANT_URL/);
assert.match(
cleanupSource,
/const roomInputResults = await stopRoomInput\(roomName, sessionId\);[\s\S]*const liveKitRoomResult = await deleteLiveKitRoom\(roomName\);/
);
});

test('session stop route cancels room session before remote cleanup', async () => {
Expand Down Expand Up @@ -84,6 +93,7 @@ test('session stop route deletes the LiveKit room after the dispatch barrier', a
);

assert.match(routeSource, /await waitForPendingDispatches\(roomName, sessionId\)/);
assert.match(routeSource, /await stopRoomInput\(roomName, sessionId\)/);
assert.match(routeSource, /deleteLiveKitRoom\(roomName\)/);
});

Expand All @@ -105,7 +115,7 @@ test('session stop route waits for local agent worker readiness before finishing
assert.match(cleanupSource, /await waitForLocalAgentWorkerReadiness\(\)/);
assert.match(
cleanupSource,
/const cleanupResults = \[dispatchBarrierResult, liveKitRoomResult, agentWorkerReadinessResult\]/
/const cleanupResults = \[\s*dispatchBarrierResult,\s*\.\.\.roomInputResults,\s*liveKitRoomResult,\s*agentWorkerReadinessResult,\s*\]/
);
});

Expand Down Expand Up @@ -147,6 +157,7 @@ test('session stop route closes the registry even when remote cleanup is partial

assert.ok(cleanupSource, 'runRemoteSessionCleanup should be defined');
assert.match(cleanupSource, /const failures = results\.filter/);
assert.match(cleanupSource, /result\.fatal !== false/);
assert.match(
cleanupSource,
/markRoomSessionStopped\(roomName, sessionId\);\s*return \{ results, failures \};/
Expand Down
Loading