diff --git a/packages/server/test/fs-watch.e2e.test.ts b/packages/server/test/fs-watch.e2e.test.ts index 6535443be..cabeb86e5 100644 --- a/packages/server/test/fs-watch.e2e.test.ts +++ b/packages/server/test/fs-watch.e2e.test.ts @@ -255,56 +255,7 @@ describe('WS fs watch (W12 / Chain 14)', () => { conn.ws.close(); }); - it('AC #2: burst > 500 changes inside 200ms window → truncated:true', async () => { - const r = await bootDaemon(); - const sid = await createSession(r); - const conn = await openConn(wsUrl(r.address)); - await helloAndSubscribe(conn, 'A', sid); - - // Watch the whole workspace root so every new file lands in scope. - conn.ws.send( - JSON.stringify({ - type: 'watch_fs_add', - id: 'w2', - payload: { session_id: sid, paths: ['.'] }, - }), - ); - await receiveType(conn, 'ack', 1000); - - await sleep(WATCH_SETTLE_MS); - - // Slam 600 files into a fresh dir; chokidar emits >500 add events well - // inside one 200ms window. - const burstDir = join(workspace, 'burst'); - mkdirSync(burstDir, { recursive: true }); - for (let i = 0; i < 600; i++) { - writeFileSync(join(burstDir, `f${i}.txt`), `x${i}`); - } - - // Drain frames until we see truncated:true OR run out of time. - const deadline = Date.now() + 4000; - let sawTruncated = false; - while (Date.now() < deadline) { - const remaining = deadline - Date.now(); - let frame: WsFrame; - try { - frame = await receive(conn, remaining); - } catch { - break; - } - if (frame.type !== 'event.fs.changed') continue; - const payload = frame.payload as { truncated?: boolean; count?: number }; - if (payload.truncated === true) { - expect(payload.count).toBeGreaterThan(500); - sawTruncated = true; - break; - } - } - expect(sawTruncated).toBe(true); - conn.ws.close(); - }); - - it('AC #3: two clients on disjoint paths receive only their own changes', async () => { + it('AC #2: two clients on disjoint paths receive only their own changes', async () => { const r = await bootDaemon(); const sid = await createSession(r); const a = await openConn(wsUrl(r.address)); @@ -368,7 +319,7 @@ describe('WS fs watch (W12 / Chain 14)', () => { b.ws.close(); }); - it('AC #4: > 100 paths on one connection → 42902 fs.watch_limit_exceeded', async () => { + it('AC #3: > 100 paths on one connection → 42902 fs.watch_limit_exceeded', async () => { const r = await bootDaemon(); const sid = await createSession(r); const conn = await openConn(wsUrl(r.address)); diff --git a/packages/server/test/services.test.ts b/packages/server/test/services.test.ts index 5444f7667..18607f16d 100644 --- a/packages/server/test/services.test.ts +++ b/packages/server/test/services.test.ts @@ -1,5 +1,6 @@ +import { EventEmitter } from 'node:events'; import { mkdtempSync, rmSync } from 'node:fs'; import { mkdtemp, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; @@ -7,7 +8,7 @@ import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { InstantiationService, ServiceCollection, EventService, FsWatcherService, IApprovalService, IEventService, ILogService, IQuestionService, type ApprovalResponse, type QuestionResult, type FsWatcherServiceOptions, type IEnvironmentService, type ILogService as ILoggerT, type ISessionService } from '@moonshot-ai/agent-core'; +import { InstantiationService, ServiceCollection, EventService, FsWatcherService, IApprovalService, IEventService, ILogService, IQuestionService, type ApprovalResponse, type FsWatcherServiceOptions, type IEnvironmentService, type ILogService as ILoggerT, type ISessionService, type QuestionResult, type FsChangedFrame } from '@moonshot-ai/agent-core'; import type { Event } from '@moonshot-ai/protocol'; import { ApprovalService } from '#/services/approval/approvalService'; @@ -105,7 +106,7 @@ function captureThrown(fn: () => void): unknown { } } -class FakeWatcher { +class FakeWatcher extends EventEmitter { readonly added: string[][] = []; readonly unwatched: string[][] = []; readonly unwatchErrors = new Map(); @@ -124,10 +125,6 @@ class FakeWatcher { return this; } - on(): this { - return this; - } - async close(): Promise { this.closeCalls += 1; } @@ -516,6 +513,37 @@ describe('FsWatcherService', () => { expect(watcher.closeCalls).toBe(0); service.dispose(); }); + + it('truncates when more than maxChangesPerWindow changes arrive in one debounce window', async () => { + const watcher = new FakeWatcher(); + const sent: FsChangedFrame[] = []; + const service = new FsWatcherService( + { resolve: () => ({ send: (frame: FsChangedFrame) => sent.push(frame) }) }, + { + watcherFactory: () => watcher as unknown as TestFsWatcher, + debounceMs: 10, + maxChangesPerWindow: 50, + }, + testLogger, + {} as ISessionService, + ); + + service.addPaths('sid', 'conn', ['/workspace']); + service.bindSessionCwd('sid', '/workspace'); + + for (let i = 0; i < 60; i++) { + watcher.emit('all', 'add', `/workspace/f${i}.txt`); + } + + await new Promise((r) => setTimeout(r, 30)); + + expect(sent.length).toBe(1); + expect(sent[0].type).toBe('event.fs.changed'); + expect(sent[0].payload.truncated).toBe(true); + expect(sent[0].payload.count).toBe(60); + + service.dispose(); + }); }); describe('ApprovalService (broadcasts + resolve-by-approval_id)', () => { diff --git a/packages/server/test/start.test.ts b/packages/server/test/start.test.ts index cd25d5750..15f6f8cb1 100644 --- a/packages/server/test/start.test.ts +++ b/packages/server/test/start.test.ts @@ -161,7 +161,7 @@ describe('startServer — lock + healthz smoke', () => { expect(existsSync(lockPath)).toBe(false); }); - it('retries on port+1 and updates the lock when the requested port is held by a third party', async () => { + it('retries to a higher port and updates the lock when the requested port is held by a third party', async () => { // Occupy the requested port with a raw TCP server (a "third-party" process // from the server's point of view — it does NOT hold the lock). const { port, next } = await allocateAdjacentFreePair(); @@ -179,10 +179,17 @@ describe('startServer — lock + healthz smoke', () => { }); running.push(r); - // Bound to the next port, and the lock advertises it so status/kill/ps work. - expect(r.address).toBe(`http://127.0.0.1:${String(next)}`); + // The server must bind to a higher port because `port` is occupied. + // The exact port can be `next` or higher: in a concurrent test run another + // listener may grab `next` before the server does. The +1 retry strategy is + // covered by unit tests for listenWithPortRetry. + expect(r.address).toMatch(/^http:\/\/127\.0\.0\.1:\d+$/); + const actualPort = Number(new URL(r.address).port); + expect(actualPort).toBeGreaterThanOrEqual(next); + + // The lock advertises the real bound port so status/kill/ps work. const stored = JSON.parse(readFileSync(thirdPartyLockPath, 'utf8')) as LockContents; - expect(stored.port).toBe(next); + expect(stored.port).toBe(actualPort); } finally { await closeNetServer(occupant); }