Skip to content
6 changes: 6 additions & 0 deletions .server-changes/mollifier-drainer-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Mollifier drainer replay: the drainer now replays buffered entries into `engine.trigger`. Also adds a stale-entry sweep, a drainer-health gauge, and run-engine APIs for cancelled/failed runs.
2 changes: 2 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
Expand Down Expand Up @@ -228,6 +229,7 @@ Worker.init().catch((error) => {
});

initMollifierDrainerWorker();
initMollifierStaleSweepWorker();

bootstrap().catch((error) => {
logError(error);
Expand Down
37 changes: 30 additions & 7 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1063,13 +1063,16 @@ const EnvironmentSchema = z
// Separate switch for the drainer (consumer side) so it can be split
// off onto a dedicated worker service. Unset → inherits
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
// flip two switches. In multi-replica deployments, set this to "0"
// explicitly on every replica except the one dedicated drainer
// service — otherwise every replica's polling loop races for the
// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a
// buffer when the system is off.
// flip two switches. Multi-replica drainers are correct — `popAndMarkDraining`
// is an atomic ZPOPMIN + status flip in one Lua call, so only one replica
// can win any given entry — but inefficient: polling load (SMEMBERS +
// per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY`
// is per-process so engine load also multiplies. Splitting the drainer
// onto a dedicated worker keeps that traffic off the request-serving
// replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch;
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a buffer
// when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
Expand Down Expand Up @@ -1098,6 +1101,26 @@ const EnvironmentSchema = z
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
// Periodic sweep that scans buffer queue ZSETs for entries whose
// dwell exceeds the stale threshold. Independent of the drainer —
// its job is exactly to make a stuck/offline drainer visible to
// ops. Defaults: enabled when the mollifier is enabled, run every
// 5 minutes, alert on anything that's been dwelling for 5+ minutes
// (matches the sweep interval — "anything still here when we
// check" is the simplest threshold that converges).
TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z
.string()
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce
.number()
.int()
.positive()
.default(5 * 60_000),
TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce
.number()
.int()
.positive()
.default(5 * 60_000),

BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
Expand Down
27 changes: 27 additions & 0 deletions apps/webapp/app/runEngine/services/triggerFailedTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { getEventRepository } from "~/v3/eventRepository/index.server";
import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server";
import { DefaultQueueManager } from "../concerns/queues.server";
import type { TriggerTaskRequest } from "../types";

Expand Down Expand Up @@ -176,6 +177,14 @@ export class TriggerFailedTaskService {
event.setAttribute("runId", failedRunFriendlyId);
event.failWithError(taskRunError);

// `emitRunFailedEvent: false` because this call site owns the
// trace-event lifecycle via the outer `traceEvent({
// incomplete: false, isError: true })`. Letting the engine
// emit `runFailed` here would race the
// `completeFailedRunEvent` listener against the outer trace
// event's own completion write for the same (traceId, spanId).
// We enqueue the alerts side ourselves after the trace event
// closes, below.
return await this.engine.createFailedTaskRun({
friendlyId: failedRunFriendlyId,
environment: {
Expand All @@ -200,12 +209,30 @@ export class TriggerFailedTaskService {
spanId: event.spanId,
traceContext: traceContext as Record<string, unknown>,
taskEventStore: store,
emitRunFailedEvent: false,
...(queueName !== undefined && { queue: queueName }),
...(lockedQueueId !== undefined && { lockedQueueId }),
});
}
);

// Alerts side of `runFailed` — the engine emit was suppressed
// above so the trace-event completion isn't double-written; we
// still need the alert pipeline to fire so customers' ERROR
// channels see the failure. Best-effort: a failed enqueue is logged
// but doesn't block returning the friendlyId, mirroring the
// engine handler's behaviour at runEngineHandlers.server.ts:81.
try {
await PerformTaskRunAlertsService.enqueue(failedRun.id);
} catch (alertsError) {
logger.warn("TriggerFailedTaskService: alert enqueue failed", {
taskId: request.taskId,
friendlyId: failedRun.friendlyId,
error:
alertsError instanceof Error ? alertsError.message : String(alertsError),
});
}

return failedRun.friendlyId;
} catch (createError) {
const createErrorMsg =
Expand Down
48 changes: 13 additions & 35 deletions apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import { createHash } from "node:crypto";
import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker";
import { MollifierDrainer } from "@trigger.dev/redis-worker";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { engine as runEngine } from "~/v3/runEngine.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { getMollifierBuffer } from "./mollifierBuffer.server";
import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server";
import {
createDrainerHandler,
isRetryablePgError,
} from "./mollifierDrainerHandler.server";
import type { MollifierSnapshot } from "./mollifierSnapshot.server";

// Distinct error class for the deterministic "fail loud at boot" throws
// below. The bootstrap in `mollifierDrainerWorker.server.ts` catches
Expand All @@ -25,7 +30,7 @@ export class MollifierConfigurationError extends Error {
}
}

function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> {
function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
const buffer = getMollifierBuffer();
if (!buffer) {
// Unreachable in normal config: getMollifierDrainer() gates on the
Expand Down Expand Up @@ -68,40 +73,13 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
});

// No-op ack handler: the trigger has ALREADY been written to Postgres
// via engine.trigger (dual-write at the call site). Popping + acking
// here proves the dequeue mechanism works end-to-end without duplicating
// the work. A later change replaces this with an engine.trigger replay
// that performs the actual Postgres write.
const drainer = new MollifierDrainer<BufferedTriggerPayload>({
const drainer = new MollifierDrainer<MollifierSnapshot>({
buffer,
handler: async (input) => {
// Hash the (re-serialised, canonical) payload on the drain side rather
// than on the trigger hot path. Burst-time CPU stays with engine.trigger;
// the drainer is the natural place for the audit-equivalence checksum.
// Re-serialisation is identity for the BufferedTriggerPayload shape
// (only strings/numbers/plain objects), so this hash matches what the
// call site wrote into Redis.
const reserialised = serialiseSnapshot(input.payload);
const payloadHash = createHash("sha256").update(reserialised).digest("hex");
logger.info("mollifier.drained", {
runId: input.runId,
envId: input.envId,
orgId: input.orgId,
taskId: input.payload.taskId,
attempts: input.attempts,
ageMs: Date.now() - input.createdAt.getTime(),
payloadBytes: reserialised.length,
payloadHash,
});
},
handler: createDrainerHandler({ engine: runEngine, prisma }),
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
// A no-op handler shouldn't throw, but if something does (e.g. an
// unexpected deserialise failure), don't loop — let it FAIL terminally
// so the entry is observable in metrics.
isRetryable: () => false,
isRetryable: isRetryablePgError,
});

return drainer;
Expand All @@ -114,7 +92,7 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
// handler registration, leaving a narrow window where a SIGTERM landing
// between `start()` and `process.once("SIGTERM", ...)` would skip the
// graceful stop. The split is intentional.
export function getMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> | null {
export function getMollifierDrainer(): MollifierDrainer<MollifierSnapshot> | null {
if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null;
return singleton("mollifierDrainer", initializeMollifierDrainer);
}
Loading