Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/harperLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -741,15 +741,21 @@ export async function teardownHarper(ctx: StartedHarperTestContext): Promise<voi
// address is recycled regardless.
if (ctx.harper.hostname) {
const portsFreed = await waitForPortsFree(ctx.harper.hostname, ALL_HARPER_PORTS, DEFAULT_PORT_RELEASE_TIMEOUT_MS);
if (!portsFreed) {
if (portsFreed) {
await releaseLoopbackAddress(ctx.harper.hostname);
} else {
// A Harper child escaped the tree kill and still holds this address's ports. Do NOT
// recycle the address — under SO_REUSEPORT a later suite could silently co-bind it
// (the exact failure the loopback pool exists to prevent). Leave the slot parked under
// this process's PID; it is reclaimed when this (per-file) process exits and its PID
// goes dead. The conflict canary in getNextAvailableLoopbackAddress is the backstop if
// the address is somehow handed out before then.
console.warn(
`Harper ports on ${ctx.harper.hostname} still in use after teardown (${DEFAULT_PORT_RELEASE_TIMEOUT_MS}ms); recycling the address anyway. This usually means a Harper child process outlived the kill.`
`Harper ports on ${ctx.harper.hostname} still in use after teardown (${DEFAULT_PORT_RELEASE_TIMEOUT_MS}ms); NOT recycling the address (a Harper child outlived the kill). The slot will be reclaimed when this process exits.`
);
}
}

await releaseLoopbackAddress(ctx.harper.hostname);

// a few retries are typically necessary, might take a sec for a process to finish, especially since rocksdb may be flushing
try {
await rm(ctx.harper.dataRootDir, { recursive: true, force: true, maxRetries: 10 });
Expand Down
120 changes: 99 additions & 21 deletions src/loopbackAddressPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ const HARPER_LOOPBACK_POOL_LOCK_PATH = join(tmpdir(), 'harper-integration-test-l
const LOCK_STALE_TIMEOUT_MS = 10000;
const RETRY_DELAY_MS = 1000;

// Port used as a conflict canary when allocating an address. This MUST be a port that
// Harper binds WITHOUT SO_REUSEPORT (the operations API — `OPERATIONS_API_PORT` in
// harperLifecycle.ts — does exactly that, main-thread-only). Because that port is
// exclusive, a stale/overlapping Harper node still holding the address makes a plain
// (non-reusePort) bind here fail with EADDRINUSE — which is precisely the signal we
// need. Without this check, SO_REUSEPORT on the shared HTTP/replication ports would let
// a freshly-assigned node silently co-bind an address another node still owns, corrupting
// both suites. Override via HARPER_INTEGRATION_TEST_CONFLICT_PROBE_PORT if the operations
// port ever changes. (Hardcoded rather than imported from harperLifecycle to avoid a
// circular import between these two modules.)
const CONFLICT_PROBE_PORT = (() => {
const parsed = parseInt(process.env.HARPER_INTEGRATION_TEST_CONFLICT_PROBE_PORT || '', 10);
// Fall back to the default if unset/non-numeric/out-of-range, so an invalid override
// can't turn into a `server.listen(NaN)` crash during allocation.
return Number.isNaN(parsed) || parsed < 0 || parsed > 65535 ? 9925 : parsed;
})();

// Type definitions
type LoopbackPool = (number | null)[];

Expand Down Expand Up @@ -158,22 +175,6 @@ async function writePoolFile(pool: LoopbackPool): Promise<void> {
await writeFile(HARPER_LOOPBACK_POOL_PATH, JSON.stringify(pool));
}

/**
* Finds the first available (null) index in the pool. This implements a simple
* first-available allocation strategy.
*
* @param pool The loopback pool array
* @returns The first available index, or null if the pool is full
*/
function findAvailableIndex(pool: LoopbackPool): number | null {
for (let i = 0; i < pool.length; i++) {
if (pool[i] === null) {
return i;
}
}
return null;
}

/**
* Validates the format of a loopback address and extracts the pool index.
*
Expand Down Expand Up @@ -227,6 +228,42 @@ function validateLoopbackAddress(loopbackAddress: string): Promise<string> {
});
}

/**
* Conflict canary: detects whether a Harper node is already bound to `loopbackAddress`
* by attempting an exclusive (non-SO_REUSEPORT) bind on the operations-API port. Node's
* `net` server does not set SO_REUSEPORT, so if a stale/overlapping Harper node still
* owns the address's operations port, this bind fails with EADDRINUSE.
*
* Returns `true` if the address appears in use (EADDRINUSE), `false` if it is free.
* Re-throws any other bind error (e.g. EADDRNOTAVAIL) — that's a real configuration
* problem the caller should surface, not a transient conflict to skip.
*
* @param loopbackAddress The loopback IP address to probe (e.g., "127.0.0.2")
*/
function isLoopbackAddressInUse(loopbackAddress: string): Promise<boolean> {
return new Promise((resolve, reject) => {
const server = createServer();
const onError = (error: NodeJS.ErrnoException) => {
if (error.code === 'EADDRINUSE') {
resolve(true);
return;
}
const enhancedError = error as LoopbackAddressError;
enhancedError.loopbackAddress = loopbackAddress;
reject(enhancedError);
};
server.once('error', onError);
server.listen(CONFLICT_PROBE_PORT, loopbackAddress, () => {
// Bind succeeded — the address is free. Drop the error listener so a stray
// post-bind socket error during close() can't flip the resolved verdict.
server.off('error', onError);
server.close(() => {
resolve(false);
});
});
});
}

/**
* This method attempts to validate all loopback addresses in the pool by trying to
* bind to each one. It returns an object containing arrays of successfully bound
Expand Down Expand Up @@ -296,16 +333,31 @@ export async function getNextAvailableLoopbackAddress(): Promise<string> {
// This continues until all loopback addresses are used, at which point new processes will wait until an address becomes available

// Since multiple processes may be trying to get a loopback address at the same time, we need to implement a simple file-based locking mechanism to prevent race conditions

// Indices found in-use by the conflict canary during THIS call. Tracked locally (not by
// parking them in the shared pool) so we never deadlock ourselves: if we parked every
// poisoned slot under our own live PID, the pool could fill with our PID, the free-slot search
// would find nothing, removeDeadProcessesFromPool couldn't reclaim them (we're alive), and we'd
// spin forever. Instead we release poisoned slots back to the pool and just avoid re-trying
// them within this attempt; the set is cleared before each wait so they can be re-probed once
// the lingering node has had time to exit.
const triedIndices = new Set<number>();
while (true) {
const assignedIndex = await withLock(async () => {
// Read the pool file
const loopbackPool = await readPoolFile();

// Find the first available index
const index = findAvailableIndex(loopbackPool);
// Find the first available index we haven't already found in-use this attempt
let index: number | null = null;
for (let i = 0; i < loopbackPool.length; i++) {
if (loopbackPool[i] === null && !triedIndices.has(i)) {
index = i;
break;
}
}

if (index === null) {
// No available addresses - remove any dead processes from the pool and wait for one to become available
// Nothing available - remove any dead processes from the pool and wait for one to become available
removeDeadProcessesFromPool(loopbackPool);
} else {
// Assign the process PID to that index to mark it as used
Expand All @@ -322,14 +374,40 @@ export async function getNextAvailableLoopbackAddress(): Promise<string> {
const loopbackAddress = `127.0.0.${assignedIndex + HARPER_LOOPBACK_POOL_START}`;
try {
await validateLoopbackAddress(loopbackAddress);
return loopbackAddress;
} catch (error) {
// Validation failed - throw a proper error instead of breaking
throw new LoopbackAddressValidationError(loopbackAddress, error as Error);
}

// Conflict canary: a stale or still-running Harper node from an overlapping
// suite may already hold this address (e.g. the pool slot was freed while its
// node lingered). SO_REUSEPORT on Harper's shared HTTP/replication ports would
// otherwise let our node silently co-bind it, splitting connections between two
// nodes and corrupting both suites. Detect that here via the exclusive
// operations port and skip the poisoned address.
let inUse: boolean;
try {
inUse = await isLoopbackAddressInUse(loopbackAddress);
} catch (error) {
throw new LoopbackAddressValidationError(loopbackAddress, error as Error);
}
if (!inUse) {
return loopbackAddress;
}

// Release the slot back to the pool (so it isn't leaked under our PID) and remember
// not to re-try it this attempt; loop to claim the next available one.
await releaseLoopbackAddress(loopbackAddress);
triedIndices.add(assignedIndex);
console.warn(
`[loopback-pool] ${loopbackAddress} is still in use by another Harper node (operations port ${CONFLICT_PROBE_PORT} bound); skipping to avoid an SO_REUSEPORT co-bind.`
);
continue;
}

// No available addresses; wait and retry
// No available addresses; clear the per-attempt skip set so poisoned addresses can be
// re-probed (their lingering node may have exited during the wait), then wait and retry.
triedIndices.clear();
await sleep(RETRY_DELAY_MS);
}
}
Expand Down