Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 2 additions & 51 deletions packages/server/test/fs-watch.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,56 +255,7 @@ describe('WS fs watch (W12 / Chain 14)', () => {
conn.ws.close();
});

it('AC #2: burst > 500 changes inside 200ms window → truncated:true', async () => {
const r = await bootDaemon();
const sid = await createSession(r);
const conn = await openConn(wsUrl(r.address));
await helloAndSubscribe(conn, 'A', sid);

// Watch the whole workspace root so every new file lands in scope.
conn.ws.send(
JSON.stringify({
type: 'watch_fs_add',
id: 'w2',
payload: { session_id: sid, paths: ['.'] },
}),
);
await receiveType(conn, 'ack', 1000);

await sleep(WATCH_SETTLE_MS);

// Slam 600 files into a fresh dir; chokidar emits >500 add events well
// inside one 200ms window.
const burstDir = join(workspace, 'burst');
mkdirSync(burstDir, { recursive: true });
for (let i = 0; i < 600; i++) {
writeFileSync(join(burstDir, `f${i}.txt`), `x${i}`);
}

// Drain frames until we see truncated:true OR run out of time.
const deadline = Date.now() + 4000;
let sawTruncated = false;
while (Date.now() < deadline) {
const remaining = deadline - Date.now();
let frame: WsFrame;
try {
frame = await receive(conn, remaining);
} catch {
break;
}
if (frame.type !== 'event.fs.changed') continue;
const payload = frame.payload as { truncated?: boolean; count?: number };
if (payload.truncated === true) {
expect(payload.count).toBeGreaterThan(500);
sawTruncated = true;
break;
}
}
expect(sawTruncated).toBe(true);
conn.ws.close();
});

it('AC #3: two clients on disjoint paths receive only their own changes', async () => {
it('AC #2: two clients on disjoint paths receive only their own changes', async () => {
const r = await bootDaemon();
const sid = await createSession(r);
const a = await openConn(wsUrl(r.address));
Expand Down Expand Up @@ -368,7 +319,7 @@ describe('WS fs watch (W12 / Chain 14)', () => {
b.ws.close();
});

it('AC #4: > 100 paths on one connection → 42902 fs.watch_limit_exceeded', async () => {
it('AC #3: > 100 paths on one connection → 42902 fs.watch_limit_exceeded', async () => {
const r = await bootDaemon();
const sid = await createSession(r);
const conn = await openConn(wsUrl(r.address));
Expand Down
40 changes: 34 additions & 6 deletions packages/server/test/services.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@


import { EventEmitter } from 'node:events';
import { mkdtempSync, rmSync } from 'node:fs';
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

import { InstantiationService, ServiceCollection, EventService, FsWatcherService, IApprovalService, IEventService, ILogService, IQuestionService, type ApprovalResponse, type QuestionResult, type FsWatcherServiceOptions, type IEnvironmentService, type ILogService as ILoggerT, type ISessionService } from '@moonshot-ai/agent-core';
import { InstantiationService, ServiceCollection, EventService, FsWatcherService, IApprovalService, IEventService, ILogService, IQuestionService, type ApprovalResponse, type FsWatcherServiceOptions, type IEnvironmentService, type ILogService as ILoggerT, type ISessionService, type QuestionResult, type FsChangedFrame } from '@moonshot-ai/agent-core';
import type { Event } from '@moonshot-ai/protocol';

import { ApprovalService } from '#/services/approval/approvalService';
Expand Down Expand Up @@ -105,7 +106,7 @@ function captureThrown(fn: () => void): unknown {
}
}

class FakeWatcher {
class FakeWatcher extends EventEmitter {
readonly added: string[][] = [];
readonly unwatched: string[][] = [];
readonly unwatchErrors = new Map<string, Error>();
Expand All @@ -124,10 +125,6 @@ class FakeWatcher {
return this;
}

on(): this {
return this;
}

async close(): Promise<void> {
this.closeCalls += 1;
}
Expand Down Expand Up @@ -516,6 +513,37 @@ describe('FsWatcherService', () => {
expect(watcher.closeCalls).toBe(0);
service.dispose();
});

it('truncates when more than maxChangesPerWindow changes arrive in one debounce window', async () => {
const watcher = new FakeWatcher();
const sent: FsChangedFrame[] = [];
const service = new FsWatcherService(
{ resolve: () => ({ send: (frame: FsChangedFrame) => sent.push(frame) }) },
{
watcherFactory: () => watcher as unknown as TestFsWatcher,
debounceMs: 10,
maxChangesPerWindow: 50,
},
testLogger,
{} as ISessionService,
);

service.addPaths('sid', 'conn', ['/workspace']);
service.bindSessionCwd('sid', '/workspace');

for (let i = 0; i < 60; i++) {
watcher.emit('all', 'add', `/workspace/f${i}.txt`);
}

await new Promise((r) => setTimeout(r, 30));

expect(sent.length).toBe(1);
expect(sent[0].type).toBe('event.fs.changed');
expect(sent[0].payload.truncated).toBe(true);
expect(sent[0].payload.count).toBe(60);

service.dispose();
});
});

describe('ApprovalService (broadcasts + resolve-by-approval_id)', () => {
Expand Down
15 changes: 11 additions & 4 deletions packages/server/test/start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ describe('startServer — lock + healthz smoke', () => {
expect(existsSync(lockPath)).toBe(false);
});

it('retries on port+1 and updates the lock when the requested port is held by a third party', async () => {
it('retries to a higher port and updates the lock when the requested port is held by a third party', async () => {
// Occupy the requested port with a raw TCP server (a "third-party" process
// from the server's point of view — it does NOT hold the lock).
const { port, next } = await allocateAdjacentFreePair();
Expand All @@ -179,10 +179,17 @@ describe('startServer — lock + healthz smoke', () => {
});
running.push(r);

// Bound to the next port, and the lock advertises it so status/kill/ps work.
expect(r.address).toBe(`http://127.0.0.1:${String(next)}`);
// The server must bind to a higher port because `port` is occupied.
// The exact port can be `next` or higher: in a concurrent test run another
// listener may grab `next` before the server does. The +1 retry strategy is
// covered by unit tests for listenWithPortRetry.
expect(r.address).toMatch(/^http:\/\/127\.0\.0\.1:\d+$/);
const actualPort = Number(new URL(r.address).port);
expect(actualPort).toBeGreaterThanOrEqual(next);

// The lock advertises the real bound port so status/kill/ps work.
const stored = JSON.parse(readFileSync(thirdPartyLockPath, 'utf8')) as LockContents;
expect(stored.port).toBe(next);
expect(stored.port).toBe(actualPort);
} finally {
await closeNetServer(occupant);
}
Expand Down