Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 46 additions & 50 deletions apps/server/src/preview/PortScanner.test.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
import * as net from "node:net";

import { it as effectIt } from "@effect/vitest";
import { assert, describe, it } from "@effect/vitest";
import { ThreadId } from "@t3tools/contracts";
import * as Net from "@t3tools/shared/Net";
import { Effect, Layer } from "effect";
import { describe, expect, it } from "vite-plus/test";

import { ProcessRunner } from "../processRunner.ts";
import { ProcessRunner, ProcessSpawnError } from "../processRunner.ts";
import * as PortScanner from "./PortScanner.ts";

const { parseLsofOutput, parsePortFromLsofName, parseWindowsListenerOutput, serversEqual } =
PortScanner.__testing;
const TestProcessRunner = Layer.succeed(ProcessRunner, {
run: () => Effect.die("ProcessRunner should not be used by Windows TCP probe tests"),
run: (input) =>
Effect.fail(
new ProcessSpawnError({
command: input.command,
args: input.args,
cwd: input.cwd,
cause: "process command unavailable in fallback test",
}),
),
});
const TestPortDiscoveryLive = PortScanner.layer.pipe(
Layer.provide(Layer.mergeAll(TestProcessRunner, Net.layer)),
Layer.provide(
Layer.mergeAll(
TestProcessRunner,
Net.layer,
Layer.succeed(PortScanner.CurrentPlatform, "win32"),
),
),
);

const openServer = (port: number): Effect.Effect<net.Server | null> =>
Expand All @@ -37,21 +50,6 @@ const closeServer = (server: net.Server): Effect.Effect<void> =>
server.close(() => resume(Effect.void));
});

const windowsPlatform = Effect.acquireRelease(
Effect.sync(() => {
const originalPlatform = process.platform;
Object.defineProperty(process, "platform", { value: "win32", configurable: true });
return originalPlatform;
}),
(originalPlatform) =>
Effect.sync(() => {
Object.defineProperty(process, "platform", {
value: originalPlatform,
configurable: true,
});
}),
);

const openCommonDevServer = Effect.fn("PortScannerTest.openCommonDevServer")(function* (
ports: ReadonlyArray<number>,
) {
Expand All @@ -71,34 +69,34 @@ const commonDevServer = Effect.acquireRelease(

describe("parsePortFromLsofName", () => {
it("parses *:port", () => {
expect(parsePortFromLsofName("*:5173")).toBe(5173);
assert.equal(parsePortFromLsofName("*:5173"), 5173);
});

it("parses 127.0.0.1:port", () => {
expect(parsePortFromLsofName("127.0.0.1:5173")).toBe(5173);
assert.equal(parsePortFromLsofName("127.0.0.1:5173"), 5173);
});

it("parses localhost:port", () => {
expect(parsePortFromLsofName("localhost:5173")).toBe(5173);
assert.equal(parsePortFromLsofName("localhost:5173"), 5173);
});

it("parses [::1]:port", () => {
expect(parsePortFromLsofName("[::1]:5173")).toBe(5173);
assert.equal(parsePortFromLsofName("[::1]:5173"), 5173);
});

it("ignores non-local hosts", () => {
expect(parsePortFromLsofName("192.168.1.10:5173")).toBeNull();
assert.equal(parsePortFromLsofName("192.168.1.10:5173"), null);
});

it("strips trailing description", () => {
expect(parsePortFromLsofName("*:5173 (LISTEN)")).toBe(5173);
assert.equal(parsePortFromLsofName("*:5173 (LISTEN)"), 5173);
});

it("rejects garbage", () => {
expect(parsePortFromLsofName("")).toBeNull();
expect(parsePortFromLsofName("not-a-port")).toBeNull();
expect(parsePortFromLsofName("*:0")).toBeNull();
expect(parsePortFromLsofName("*:99999")).toBeNull();
assert.equal(parsePortFromLsofName(""), null);
assert.equal(parsePortFromLsofName("not-a-port"), null);
assert.equal(parsePortFromLsofName("*:0"), null);
assert.equal(parsePortFromLsofName("*:99999"), null);
});
});

Expand All @@ -118,7 +116,7 @@ describe("parseLsofOutput", () => {
].join("\n");

const servers = parseLsofOutput(sample);
expect(servers).toEqual([
assert.deepStrictEqual(servers, [
{
host: "localhost",
port: 3000,
Expand Down Expand Up @@ -147,14 +145,14 @@ describe("parseLsofOutput", () => {
});

it("handles empty input", () => {
expect(parseLsofOutput("")).toEqual([]);
assert.deepStrictEqual(parseLsofOutput(""), []);
});

it("dedupes by host:port", () => {
const sample = ["p1", "cnode", "n*:5173", "n127.0.0.1:5173"].join("\n");
const servers = parseLsofOutput(sample);
expect(servers).toHaveLength(1);
expect(servers[0]?.port).toBe(5173);
assert.equal(servers.length, 1);
assert.equal(servers[0]?.port, 5173);
});

it("attributes listeners to a registered terminal process", () => {
Expand All @@ -171,8 +169,8 @@ describe("parseLsofOutput", () => {
]),
);

expect(servers[0]?.terminal).toEqual({
threadId: "thread-1",
assert.deepStrictEqual(servers[0]?.terminal, {
threadId: ThreadId.make("thread-1"),
terminalId: "terminal-1",
});
});
Expand All @@ -188,13 +186,13 @@ describe("serversEqual", () => {
terminal: null,
};
it("returns true for identical lists", () => {
expect(serversEqual([a], [{ ...a }])).toBe(true);
assert.equal(serversEqual([a], [{ ...a }]), true);
});
it("returns false for different lengths", () => {
expect(serversEqual([a], [])).toBe(false);
assert.equal(serversEqual([a], []), false);
});
it("returns false for different processName", () => {
expect(serversEqual([a], [{ ...a, processName: "other" }])).toBe(false);
assert.equal(serversEqual([a], [{ ...a, processName: "other" }]), false);
});
});

Expand All @@ -213,15 +211,15 @@ describe("parseWindowsListenerOutput", () => {
]),
);

expect(servers).toEqual([
assert.deepStrictEqual(servers, [
{
host: "localhost",
port: 5173,
url: "http://localhost:5173",
processName: "node",
pid: 12345,
terminal: {
threadId: "thread-1",
threadId: ThreadId.make("thread-1"),
terminalId: "terminal-1",
},
},
Expand All @@ -230,28 +228,26 @@ describe("parseWindowsListenerOutput", () => {
});

/**
* Integration tests against a real TCP listener. We force the Windows code
* path (TCP-probe fallback) by monkey-patching `process.platform` for the
* duration of the test so we don't depend on `lsof` being installed.
* Integration tests against a real TCP listener. The test layer forces the
* Windows code path (TCP-probe fallback) so we don't depend on `lsof` being
* installed.
*/
effectIt.layer(TestPortDiscoveryLive)("PortDiscovery integration (TCP probe fallback)", (it) => {
it.layer(TestPortDiscoveryLive)("PortDiscovery integration (TCP probe fallback)", (it) => {
it.effect(
"scan() returns a server we just opened on a curated dev port",
Effect.fn("PortScannerTest.scanFindsCommonDevServer")(function* () {
yield* windowsPlatform;
const { port } = yield* commonDevServer;
const scanner = yield* PortScanner.PortDiscovery;
const result = yield* scanner.scan();
const found = result.find((server) => server.port === port);
expect(found).toBeDefined();
expect(found?.host).toBe("localhost");
assert.notEqual(found, undefined);
assert.equal(found?.host, "localhost");
}),
);

it.effect(
"retain drives an immediate broadcast to subscribers",
Effect.fn("PortScannerTest.retainBroadcastsImmediately")(function* () {
yield* windowsPlatform;
const { port } = yield* commonDevServer;
const received: number[] = [];
const scanner = yield* PortScanner.PortDiscovery;
Expand All @@ -261,7 +257,7 @@ effectIt.layer(TestPortDiscoveryLive)("PortDiscovery integration (TCP probe fall
}),
);
yield* scanner.retain;
expect(received).toContain(port);
assert.isTrue(received.includes(port));
}),
);
});
32 changes: 20 additions & 12 deletions apps/server/src/preview/PortScanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import { ThreadId, type DiscoveredLocalServer } from "@t3tools/contracts";
import * as Net from "@t3tools/shared/Net";
import { LSOF_LOCAL_HOST_TOKENS } from "@t3tools/shared/preview";
import { Cause, Context, Duration, Effect, Layer, Ref, Schedule, Scope } from "effect";
import { Cause, Context, Duration, Effect, Layer, Option, Ref, Schedule, Scope } from "effect";

import { ProcessRunner } from "../processRunner.ts";

Expand All @@ -39,13 +39,20 @@ export class PortDiscovery extends Context.Service<PortDiscovery, PortDiscoveryS
"t3/preview/PortScanner/PortDiscovery",
) {}

export const CurrentPlatform = Context.Reference<NodeJS.Platform>(
"t3/preview/PortScanner/CurrentPlatform",
{
defaultValue: () => process.platform,
},
);

export const COMMON_DEV_PORTS: ReadonlyArray<number> = Object.freeze([
3000, 3001, 3333, 4173, 4200, 4321, 5000, 5173, 5174, 5175, 5500, 8000, 8080, 8081, 8888, 9000,
]);

const POLL_INTERVAL = Duration.seconds(3);
const LSOF_TIMEOUT_MS = 5_000;
const WINDOWS_LISTENER_TIMEOUT_MS = 5_000;
const LSOF_TIMEOUT = Duration.seconds(5);
const WINDOWS_LISTENER_TIMEOUT = Duration.seconds(5);

type Listener = (servers: ReadonlyArray<DiscoveredLocalServer>) => Effect.Effect<void>;

Expand Down Expand Up @@ -182,6 +189,7 @@ const serversEqual = (
const make = Effect.gen(function* PortDiscoveryMake() {
const net = yield* Net.NetService;
const processRunner = yield* ProcessRunner;
const platform = yield* CurrentPlatform;
const stateRef = yield* Ref.make<ScannerState>({
lastSnapshot: [],
listeners: new Set(),
Expand Down Expand Up @@ -221,37 +229,37 @@ const make = Effect.gen(function* PortDiscoveryMake() {
terminalByProcessId.set(processId, registration.owner);
}
}
if (process.platform === "win32") {
if (platform === "win32") {
const command =
'Get-NetTCPConnection -State Listen -ErrorAction Stop | ForEach-Object { $processName = (Get-Process -Id $_.OwningProcess -ErrorAction SilentlyContinue).ProcessName; Write-Output "$($_.LocalAddress)|$($_.LocalPort)|$($_.OwningProcess)|$processName" }';
const listeners = yield* processRunner
.run({
command: "powershell.exe",
args: ["-NoProfile", "-NonInteractive", "-Command", command],
timeout: Duration.millis(WINDOWS_LISTENER_TIMEOUT_MS),
timeout: WINDOWS_LISTENER_TIMEOUT,
maxOutputBytes: 1024 * 1024,
outputMode: "truncate",
})
.pipe(
Effect.map((result) => parseWindowsListenerOutput(result.stdout, terminalByProcessId)),
Effect.catchCause(() => Effect.succeed(null)),
Effect.option,
);
if (listeners !== null) return listeners;
if (Option.isSome(listeners)) return listeners.value;
return yield* probeCommonPorts();
}
const lsofResult = yield* processRunner
.run({
command: "lsof",
args: ["-iTCP", "-sTCP:LISTEN", "-P", "-n", "-F", "pcn"],
timeout: Duration.millis(LSOF_TIMEOUT_MS),
timeout: LSOF_TIMEOUT,
maxOutputBytes: 1024 * 1024,
outputMode: "truncate",
})
.pipe(
Effect.map((result) => parseLsofOutput(result.stdout, terminalByProcessId)),
Effect.catchCause(() => Effect.succeed(null)),
Effect.option,
);
if (lsofResult !== null) return lsofResult;
if (Option.isSome(lsofResult)) return lsofResult.value;
return yield* probeCommonPorts();
});

Expand Down Expand Up @@ -349,13 +357,13 @@ const make = Effect.gen(function* PortDiscoveryMake() {
});
});

return {
return PortDiscovery.of({
scan: scanOnce,
subscribe,
retain,
registerTerminalProcesses,
unregisterTerminal,
} satisfies PortDiscoveryShape;
});
}).pipe(Effect.withSpan("PortDiscovery.make"));

export const layer = Layer.effect(PortDiscovery, make);
Expand Down
Loading