Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions apps/web/src/rpc/wsConnectionState.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import { useAtomValue } from "@effect/atom-react";
import { DEFAULT_RECONNECT_BACKOFF, getReconnectDelayMs } from "@t3tools/client-runtime";
import * as Duration from "effect/Duration";
import * as Option from "effect/Option";
import { Atom } from "effect/unstable/reactivity";

import { appAtomRegistry } from "./atomRegistry";

export type WsConnectionUiState = "connected" | "connecting" | "error" | "offline" | "reconnecting";
export type WsReconnectPhase = "attempting" | "exhausted" | "idle" | "waiting";

export const WS_RECONNECT_INITIAL_DELAY_MS = DEFAULT_RECONNECT_BACKOFF.initialDelayMs;
export const WS_RECONNECT_INITIAL_DELAY_MS = Duration.toMillis(
Duration.fromInputUnsafe(DEFAULT_RECONNECT_BACKOFF.initialDelay),
);
export const WS_RECONNECT_BACKOFF_FACTOR = DEFAULT_RECONNECT_BACKOFF.backoffFactor;
export const WS_RECONNECT_MAX_DELAY_MS = DEFAULT_RECONNECT_BACKOFF.maxDelayMs;
export const WS_RECONNECT_MAX_RETRIES = DEFAULT_RECONNECT_BACKOFF.maxRetries!;
export const WS_RECONNECT_MAX_DELAY_MS = Duration.toMillis(
Duration.fromInputUnsafe(DEFAULT_RECONNECT_BACKOFF.maxDelay),
);
export const WS_RECONNECT_MAX_RETRIES = Option.getOrThrow(DEFAULT_RECONNECT_BACKOFF.maxRetries);
export const WS_RECONNECT_MAX_ATTEMPTS = WS_RECONNECT_MAX_RETRIES + 1;

export interface WsConnectionStatus {
Expand Down
88 changes: 56 additions & 32 deletions packages/client-runtime/src/reconnectBackoff.test.ts
Original file line number Diff line number Diff line change
@@ -1,65 +1,89 @@
import { describe, expect, it } from "vite-plus/test";
import { assert, describe, it } from "@effect/vitest";
import * as Duration from "effect/Duration";
import * as Option from "effect/Option";

import {
DEFAULT_RECONNECT_BACKOFF,
getReconnectDelay,
getReconnectDelayMs,
type ReconnectBackoffConfig,
} from "./reconnectBackoff.ts";

describe("getReconnectDelayMs", () => {
function assertDelayMs(delay: Option.Option<Duration.Duration>, expectedMs: number) {
if (Option.isNone(delay)) {
assert.fail("Expected reconnect delay to be present");
}
assert.strictEqual(Duration.toMillis(delay.value), expectedMs);
}

describe("getReconnectDelay", () => {
it("returns exponential delays with default config", () => {
expect(getReconnectDelayMs(0)).toBe(1_000);
expect(getReconnectDelayMs(1)).toBe(2_000);
expect(getReconnectDelayMs(2)).toBe(4_000);
expect(getReconnectDelayMs(3)).toBe(8_000);
expect(getReconnectDelayMs(4)).toBe(16_000);
expect(getReconnectDelayMs(5)).toBe(32_000);
expect(getReconnectDelayMs(6)).toBe(64_000);
assertDelayMs(getReconnectDelay(0), 1_000);
assertDelayMs(getReconnectDelay(1), 2_000);
assertDelayMs(getReconnectDelay(2), 4_000);
assertDelayMs(getReconnectDelay(3), 8_000);
assertDelayMs(getReconnectDelay(4), 16_000);
assertDelayMs(getReconnectDelay(5), 32_000);
assertDelayMs(getReconnectDelay(6), 64_000);
});

it("returns null when retry index exceeds maxRetries", () => {
expect(getReconnectDelayMs(7)).toBeNull();
expect(getReconnectDelayMs(100)).toBeNull();
it("returns none when retry index exceeds maxRetries", () => {
assert.strictEqual(Option.isNone(getReconnectDelay(7)), true);
assert.strictEqual(Option.isNone(getReconnectDelay(100)), true);
});

it("returns null for negative indices", () => {
expect(getReconnectDelayMs(-1)).toBeNull();
it("returns none for negative indices", () => {
assert.strictEqual(Option.isNone(getReconnectDelay(-1)), true);
});

it("returns null for non-integer indices", () => {
expect(getReconnectDelayMs(1.5)).toBeNull();
it("returns none for non-integer indices", () => {
assert.strictEqual(Option.isNone(getReconnectDelay(1.5)), true);
});

it("caps delay at maxDelayMs", () => {
it("caps delay at maxDelay", () => {
const config: ReconnectBackoffConfig = {
initialDelayMs: 10_000,
initialDelay: Duration.seconds(10),
backoffFactor: 10,
maxDelayMs: 30_000,
maxRetries: 5,
maxDelay: Duration.seconds(30),
maxRetries: Option.some(5),
};

expect(getReconnectDelayMs(0, config)).toBe(10_000);
expect(getReconnectDelayMs(1, config)).toBe(30_000); // 100_000 capped to 30_000
expect(getReconnectDelayMs(2, config)).toBe(30_000); // 1_000_000 capped to 30_000
assertDelayMs(getReconnectDelay(0, config), 10_000);
assertDelayMs(getReconnectDelay(1, config), 30_000);
assertDelayMs(getReconnectDelay(2, config), 30_000);
});

it("supports unlimited retries when maxRetries is null", () => {
it("supports unlimited retries when maxRetries is none", () => {
const config: ReconnectBackoffConfig = {
...DEFAULT_RECONNECT_BACKOFF,
maxRetries: null,
maxRetries: Option.none(),
};

expect(getReconnectDelayMs(0, config)).toBe(1_000);
expect(getReconnectDelayMs(50, config)).toBe(64_000); // capped at maxDelayMs
expect(getReconnectDelayMs(100, config)).toBe(64_000);
assertDelayMs(getReconnectDelay(0, config), 1_000);
assertDelayMs(getReconnectDelay(50, config), 64_000);
assertDelayMs(getReconnectDelay(100, config), 64_000);
});
});

describe("getReconnectDelayMs", () => {
it("returns millisecond values for compatibility", () => {
assert.strictEqual(getReconnectDelayMs(0), 1_000);
assert.strictEqual(getReconnectDelayMs(1), 2_000);
assert.strictEqual(getReconnectDelayMs(7), null);
});
});

describe("DEFAULT_RECONNECT_BACKOFF", () => {
it("has sensible defaults", () => {
expect(DEFAULT_RECONNECT_BACKOFF.initialDelayMs).toBe(1_000);
expect(DEFAULT_RECONNECT_BACKOFF.backoffFactor).toBe(2);
expect(DEFAULT_RECONNECT_BACKOFF.maxDelayMs).toBe(64_000);
expect(DEFAULT_RECONNECT_BACKOFF.maxRetries).toBe(7);
assert.strictEqual(
Duration.toMillis(Duration.fromInputUnsafe(DEFAULT_RECONNECT_BACKOFF.initialDelay)),
1_000,
);
assert.strictEqual(DEFAULT_RECONNECT_BACKOFF.backoffFactor, 2);
assert.strictEqual(
Duration.toMillis(Duration.fromInputUnsafe(DEFAULT_RECONNECT_BACKOFF.maxDelay)),
64_000,
);
assert.deepStrictEqual(DEFAULT_RECONNECT_BACKOFF.maxRetries, Option.some(7));
});
});
58 changes: 40 additions & 18 deletions packages/client-runtime/src/reconnectBackoff.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import * as Duration from "effect/Duration";
import * as Option from "effect/Option";

/**
* Configuration for exponential reconnect backoff.
*/
export interface ReconnectBackoffConfig {
/** Base delay in milliseconds before the first retry. */
readonly initialDelayMs: number;
/** Base delay before the first retry. */
readonly initialDelay: Duration.Input;
/** Multiplier applied per retry (exponential factor). */
readonly backoffFactor: number;
/** Hard upper bound on delay in milliseconds. */
readonly maxDelayMs: number;
/** Maximum number of retries (0-based). `null` means unlimited. */
readonly maxRetries: number | null;
/** Hard upper bound on delay. */
readonly maxDelay: Duration.Input;
/** Maximum number of retries (0-based). `Option.none()` means unlimited. */
readonly maxRetries: Option.Option<number>;
}

/**
Expand All @@ -18,30 +21,49 @@ export interface ReconnectBackoffConfig {
* - 1 s initial delay, doubling each retry, capped at 64 s, up to 7 retries.
*/
export const DEFAULT_RECONNECT_BACKOFF: ReconnectBackoffConfig = {
initialDelayMs: 1_000,
initialDelay: Duration.seconds(1),
backoffFactor: 2,
maxDelayMs: 64_000,
maxRetries: 7,
maxDelay: Duration.seconds(64),
maxRetries: Option.some(7),
};

/**
* Calculate the reconnect delay for a given retry index using exponential
* backoff. Returns `null` when `retryIndex` exceeds the configured maximum.
* backoff. Returns `Option.none()` when `retryIndex` exceeds the configured
* maximum.
*/
export function getReconnectDelayMs(
export function getReconnectDelay(
retryIndex: number,
config: ReconnectBackoffConfig = DEFAULT_RECONNECT_BACKOFF,
): number | null {
): Option.Option<Duration.Duration> {
if (!Number.isInteger(retryIndex) || retryIndex < 0) {
return null;
return Option.none();
}

if (config.maxRetries !== null && retryIndex >= config.maxRetries) {
return null;
if (Option.isSome(config.maxRetries) && retryIndex >= config.maxRetries.value) {
return Option.none();
}

return Math.min(
Math.round(config.initialDelayMs * config.backoffFactor ** retryIndex),
config.maxDelayMs,
const initialDelayMs = Duration.toMillis(Duration.fromInputUnsafe(config.initialDelay));
const maxDelayMs = Duration.toMillis(Duration.fromInputUnsafe(config.maxDelay));

return Option.some(
Duration.millis(
Math.min(Math.round(initialDelayMs * config.backoffFactor ** retryIndex), maxDelayMs),
),
);
}

/**
* Compatibility wrapper for UI surfaces that still display reconnect delays in
* milliseconds.
*/
export function getReconnectDelayMs(
retryIndex: number,
config: ReconnectBackoffConfig = DEFAULT_RECONNECT_BACKOFF,
): number | null {
return Option.match(getReconnectDelay(retryIndex, config), {
onNone: () => null,
onSome: Duration.toMillis,
});
}
30 changes: 19 additions & 11 deletions packages/client-runtime/src/wsRpcProtocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ import { WsRpcGroup } from "@t3tools/contracts";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Schedule from "effect/Schedule";
import * as Schema from "effect/Schema";
import { RpcClient, RpcSerialization } from "effect/unstable/rpc";
import * as Socket from "effect/unstable/socket/Socket";

import {
DEFAULT_RECONNECT_BACKOFF,
getReconnectDelayMs,
getReconnectDelay,
type ReconnectBackoffConfig,
} from "./reconnectBackoff.ts";

Expand Down Expand Up @@ -69,6 +71,10 @@ export type WsRpcProtocolClient =
RpcClientFactory extends Effect.Effect<infer Client, any, any> ? Client : never;
export type WsRpcProtocolSocketUrlProvider = string | (() => Promise<string>);

const decodeRpcPongMessage = Schema.decodeUnknownOption(
Schema.fromJsonString(Schema.TaggedStruct("Pong", {})),
);

function formatSocketErrorMessage(error: unknown): string {
if (error instanceof Error && error.message.trim().length > 0) {
return error.message;
Expand Down Expand Up @@ -234,13 +240,8 @@ export function createWsRpcProtocolLayer(
{ once: true },
);
socket.addEventListener("message", (event) => {
try {
const message = JSON.parse(String(event.data)) as { readonly _tag?: string };
if (message._tag === "Pong") {
lifecycle.onHeartbeatPong();
}
} catch {
// Ignore malformed messages here; the Effect RPC parser still owns protocol errors.
if (Option.isSome(decodeRpcPongMessage(String(event.data)))) {
lifecycle.onHeartbeatPong();
}
});
socket.addEventListener(
Expand All @@ -266,10 +267,17 @@ export function createWsRpcProtocolLayer(
Layer.provide(trackingWebSocketConstructorLayer),
);

const baseSchedule =
backoff.maxRetries === null ? Schedule.forever : Schedule.recurs(backoff.maxRetries);
const baseSchedule = Option.match(backoff.maxRetries, {
onNone: () => Schedule.forever,
onSome: Schedule.recurs,
});
const retryPolicy = Schedule.addDelay(baseSchedule, (retryCount) =>
Effect.succeed(Duration.millis(getReconnectDelayMs(retryCount, backoff) ?? 0)),
Effect.succeed(
Option.match(getReconnectDelay(retryCount, backoff), {
onNone: () => Duration.zero,
onSome: (delay) => delay,
}),
),
);
const protocolLayer = Layer.effect(
RpcClient.Protocol,
Expand Down
Loading