From a9fd37ec8524a597e85f0317ee2292e8bf89703a Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 11:26:08 +0100 Subject: [PATCH 01/12] =?UTF-8?q?feat(webapp):=20mollifier=20API=20GET=20r?= =?UTF-8?q?ead-fallback=20=E2=80=94=20synthetic=20primitives=20+=20route?= =?UTF-8?q?=20wiring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Synthesise QUEUED/FAILED responses from the mollifier buffer when a TaskRun row hasn't landed in Postgres yet. Wires the synthesis into: - ApiRetrieveRunPresenter - v1 trace GET route - v1 spans GET route - attempts route gains a GET loader (fixes pre-existing Remix 400) Stacked on the trigger-time decisions PR. The readFallback infra itself lives on the trigger PR (consumed by IdempotencyKeyConcern); this PR adds the route-level synthetic-rendering primitives. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../v3/ApiRetrieveRunPresenter.server.ts | 120 ++++++++++++- .../api.v1.runs.$runId.spans.$spanId.ts | 85 +++++++-- .../app/routes/api.v1.runs.$runId.trace.ts | 93 ++++++++-- .../routes/api.v1.runs.$runParam.attempts.ts | 26 ++- .../mollifier/syntheticRedirectInfo.server.ts | 92 ++++++++++ .../v3/mollifier/syntheticSpanRun.server.ts | 154 +++++++++++++++++ .../app/v3/mollifier/syntheticTrace.server.ts | 66 +++++++ .../mollifierSyntheticRedirectInfo.test.ts | 162 ++++++++++++++++++ .../test/mollifierSyntheticSpanRun.test.ts | 158 +++++++++++++++++ 9 files changed, 926 insertions(+), 30 deletions(-) create mode 100644 apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts create mode 100644 apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts create mode 100644 apps/webapp/app/v3/mollifier/syntheticTrace.server.ts create mode 100644 apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts create mode 100644 apps/webapp/test/mollifierSyntheticSpanRun.test.ts diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index a392866afc9..782104776d4 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -15,6 +15,10 @@ import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { + findRunByIdWithMollifierFallback, + type SyntheticRun, +} from "~/v3/mollifier/readFallback.server"; import { generatePresignedUrl } from "~/v3/objectStore.server"; import { tracer } from "~/v3/tracer.server"; import { startSpanWithEnv } from "~/v3/tracing.server"; @@ -64,13 +68,34 @@ type CommonRelatedRun = Prisma.Result< "findFirstOrThrow" >; -type FoundRun = NonNullable>>; +// Full shape returned by findRun() — the commonRunSelect fields plus the +// extras the route handler reads. Declared explicitly (not inferred via +// ReturnType) so findRun can return a synthesised buffered +// run without the type becoming self-referential. +type FoundRun = CommonRelatedRun & { + traceId: string; + payload: string; + payloadType: string; + output: string | null; + outputType: string; + error: Prisma.JsonValue; + attempts: { id: string }[]; + attemptNumber: number | null; + engine: "V1" | "V2"; + taskEventStore: string; + parentTaskRun: CommonRelatedRun | null; + rootTaskRun: CommonRelatedRun | null; + childRuns: CommonRelatedRun[]; +}; export class ApiRetrieveRunPresenter { constructor(private readonly apiVersion: API_VERSIONS) {} - public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) { - return $replica.taskRun.findFirst({ + public static async findRun( + friendlyId: string, + env: AuthenticatedEnvironment, + ): Promise { + const pgRow = await $replica.taskRun.findFirst({ where: { friendlyId, runtimeEnvironmentId: env.id, @@ -102,6 +127,23 @@ export class ApiRetrieveRunPresenter { }, }, }); + + if (pgRow) return pgRow; + + // Postgres miss → fall back to the mollifier buffer. When the gate + // diverted a trigger, the run lives in Redis until the drainer replays + // it through engine.trigger. Synthesise the FoundRun shape so call() + // returns a `QUEUED` (or `FAILED`) response with empty output, no + // attempts, no relations. + const buffered = await findRunByIdWithMollifierFallback({ + runId: friendlyId, + environmentId: env.id, + organizationId: env.organizationId, + }); + + if (!buffered) return null; + + return synthesiseFoundRunFromBuffer(buffered); } public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) { @@ -475,3 +517,75 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction { return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger"; } } + +// Build a FoundRun-shaped object from a buffered (mollified) run. The run +// is in the Redis buffer; engine.trigger hasn't created the Postgres row +// yet, so every field that comes from execution state (output, attempts, +// completedAt, cost, relations) takes a default. The presenter's call() +// handles QUEUED-state runs without surprise. +function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus { + switch (status) { + case "FAILED": + return "SYSTEM_FAILURE"; + case "CANCELED": + return "CANCELED"; + default: + return "PENDING"; + } +} + +function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { + const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status); + + const errorJson: Prisma.JsonValue = buffered.error + ? { + type: "STRING_ERROR", + raw: `${buffered.error.code}: ${buffered.error.message}`, + } + : null; + + const metadata: Prisma.JsonValue = + typeof buffered.metadata === "string" ? buffered.metadata : null; + + return { + id: buffered.friendlyId, + friendlyId: buffered.friendlyId, + status, + taskIdentifier: buffered.taskIdentifier ?? "", + createdAt: buffered.createdAt, + startedAt: null, + updatedAt: buffered.cancelledAt ?? buffered.createdAt, + completedAt: buffered.cancelledAt ?? null, + expiredAt: null, + delayUntil: buffered.delayUntil ?? null, + metadata, + metadataType: buffered.metadataType ?? "application/json", + ttl: buffered.ttl ?? null, + costInCents: 0, + baseCostInCents: 0, + usageDurationMs: 0, + idempotencyKey: buffered.idempotencyKey ?? null, + idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null, + isTest: buffered.isTest, + depth: buffered.depth, + scheduleId: null, + lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null, + resumeParentOnCompletion: buffered.resumeParentOnCompletion, + batch: null, + runTags: buffered.tags, + traceId: buffered.traceId ?? "", + payload: typeof buffered.payload === "string" ? buffered.payload : "", + payloadType: buffered.payloadType ?? "application/json", + output: null, + outputType: "application/json", + error: errorJson, + attempts: [], + attemptNumber: null, + engine: "V2", + taskEventStore: "taskEvent", + workerQueue: buffered.workerQueue ?? "main", + parentTaskRun: null, + rootTaskRun: null, + childRuns: [], + }; +} diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts index be0d12087b6..cc48faf5d85 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts @@ -9,42 +9,101 @@ import { } from "~/services/routeBuilders/apiBuilder.server"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runId: z.string(), spanId: z.string(), }); +// Phase A2 — discriminated union for PG vs buffered runs. Buffered runs +// only have one valid spanId (the queued span recorded at gate time and +// reused as the run's root spanId when the drainer materialises). Any +// other spanId returns a deterministic 404; the queued span returns a +// minimal synthesised shape so the customer's SDK sees the same 200 +// contract they'd get for a freshly-triggered run. +type ResolvedRun = + | { source: "pg"; run: Awaited> & {} } + | { source: "buffer"; run: NonNullable>> }; + +async function findPgRun(runId: string, environmentId: string) { + return $replica.taskRun.findFirst({ + where: { friendlyId: runId, runtimeEnvironmentId: environmentId }, + }); +} + export const loader = createLoaderApiRoute( { params: ParamsSchema, allowJWT: true, corsStrategy: "all", - findResource: (params, auth) => { - return $replica.taskRun.findFirst({ - where: { - friendlyId: params.runId, - runtimeEnvironmentId: auth.environment.id, - }, + findResource: async (params, auth): Promise => { + const pgRun = await findPgRun(params.runId, auth.environment.id); + if (pgRun) return { source: "pg", run: pgRun }; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: params.runId, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, }); + if (buffered) return { source: "buffer", run: buffered }; + + return null; }, shouldRetryNotFound: true, authorization: { action: "read", - resource: (run) => { + resource: (resolved) => { + if (resolved.source === "pg") { + const run = resolved.run; + const resources = [ + { type: "runs", id: run.friendlyId }, + { type: "tasks", id: run.taskIdentifier }, + ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } + return anyResource(resources); + } + const run = resolved.run; const resources = [ { type: "runs", id: run.friendlyId }, - { type: "tasks", id: run.taskIdentifier }, - ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), + ...run.tags.map((tag) => ({ type: "tags", id: tag })), ]; - if (run.batchId) { - resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); - } return anyResource(resources); }, }, }, - async ({ params, resource: run, authentication }) => { + async ({ params, resource: resolved, authentication }) => { + if (resolved.source === "buffer") { + // Buffered runs have exactly one valid spanId — the queued span the + // mollifier gate recorded at trigger time, which becomes the run's + // root spanId once the drainer materialises. Any other spanId is a + // deterministic 404. The matching spanId returns a minimal shape + // representing "span exists, no execution data yet." + if (resolved.run.spanId !== params.spanId) { + return json({ error: "Span not found" }, { status: 404 }); + } + return json( + { + spanId: resolved.run.spanId, + parentId: resolved.run.parentSpanId ?? null, + runId: resolved.run.friendlyId, + message: resolved.run.taskIdentifier ?? "", + isError: false, + isPartial: resolved.run.status !== "CANCELED", + isCancelled: resolved.run.status === "CANCELED", + level: "TRACE", + startTime: resolved.run.createdAt, + durationMs: 0, + }, + { status: 200 } + ); + } + + const run = resolved.run; const eventRepository = await getEventRepositoryForStore( run.taskEventStore, authentication.environment.organization.id diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts index 77e6a4df043..cce1b40b785 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts @@ -8,41 +8,108 @@ import { } from "~/services/routeBuilders/apiBuilder.server"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runId: z.string(), // This is the run friendly ID }); +// Discriminator on the resolved resource — `pg` is the real Prisma TaskRun +// row, `buffer` is a synthesised shape from the mollifier buffer for runs +// whose drainer hasn't yet materialised them. The handler renders an empty +// trace for buffered runs so the customer sees the same 200 shape they'd +// get for a freshly-triggered PG run with no spans yet (matches the +// pass-through control case in scripts/mollifier-api-parity.sh). +type ResolvedRun = + | { source: "pg"; run: Awaited> & {} } + | { source: "buffer"; run: NonNullable>> }; + +async function findPgRun(runId: string, environmentId: string) { + return $replica.taskRun.findFirst({ + where: { friendlyId: runId, runtimeEnvironmentId: environmentId }, + }); +} + export const loader = createLoaderApiRoute( { params: ParamsSchema, allowJWT: true, corsStrategy: "all", - findResource: (params, auth) => { - return $replica.taskRun.findFirst({ - where: { - friendlyId: params.runId, - runtimeEnvironmentId: auth.environment.id, - }, + findResource: async (params, auth): Promise => { + const pgRun = await findPgRun(params.runId, auth.environment.id); + if (pgRun) return { source: "pg", run: pgRun }; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: params.runId, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, }); + if (buffered) return { source: "buffer", run: buffered }; + + return null; }, shouldRetryNotFound: true, authorization: { action: "read", - resource: (run) => { + resource: (resolved) => { + if (resolved.source === "pg") { + const run = resolved.run; + const resources = [ + { type: "runs", id: run.friendlyId }, + { type: "tasks", id: run.taskIdentifier }, + ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } + return anyResource(resources); + } + const run = resolved.run; const resources = [ { type: "runs", id: run.friendlyId }, - { type: "tasks", id: run.taskIdentifier }, - ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), + ...run.tags.map((tag) => ({ type: "tags", id: tag })), ]; - if (run.batchId) { - resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); - } return anyResource(resources); }, }, }, - async ({ resource: run, authentication }) => { + async ({ resource: resolved, authentication }) => { + if (resolved.source === "buffer") { + // Buffered runs have no events ingested yet — the drainer hasn't + // materialised the PG row and the worker hasn't started executing. + // Synthesise a single partial span that satisfies the SDK's + // RetrieveRunTraceResponseBody schema (rootSpan is non-nullable). + const buffered = resolved.run; + return json( + { + trace: { + traceId: buffered.traceId ?? "", + rootSpan: { + id: buffered.spanId ?? "", + runId: buffered.friendlyId, + data: { + message: buffered.taskIdentifier ?? "", + taskSlug: buffered.taskIdentifier ?? undefined, + events: [], + startTime: buffered.createdAt, + duration: 0, + isError: false, + isPartial: true, + isCancelled: buffered.status === "CANCELED", + level: "TRACE", + queueName: buffered.queue ?? undefined, + machinePreset: buffered.machinePreset ?? undefined, + }, + children: [], + }, + }, + }, + { status: 200 } + ); + } + + const run = resolved.run; const eventRepository = await getEventRepositoryForStore( run.taskEventStore, authentication.environment.organization.id diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts index 790e52bee4e..8668f0bc60b 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts @@ -1,4 +1,4 @@ -import type { ActionFunctionArgs } from "@remix-run/server-runtime"; +import type { ActionFunctionArgs, LoaderFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { authenticateApiRequest } from "~/services/apiAuth.server"; @@ -11,6 +11,30 @@ const ParamsSchema = z.object({ runParam: z.string(), }); +// Phase A5 — fixes the pre-existing route bug where GET on this URL +// returned a Remix "no loader" 400 with an internal error message. The +// route only exposed `action` (POST creates a new attempt); GET had no +// handler, so any well-intentioned SDK probe hit the framework error +// instead of a proper API response. +// +// Returns `{ attempts: [] }` for both PG and buffered runs. The detailed +// attempt list belongs on the v3 retrieve endpoint, not here — this is +// the dual of the POST that creates attempts, and the empty-list shape +// gives the parity script a stable contract to assert against. +export async function loader({ request, params }: LoaderFunctionArgs) { + const authenticationResult = await authenticateApiRequest(request); + if (!authenticationResult) { + return json({ error: "Invalid or Missing API Key" }, { status: 401 }); + } + + const parsed = ParamsSchema.safeParse(params); + if (!parsed.success) { + return json({ error: "Invalid or missing run ID" }, { status: 400 }); + } + + return json({ attempts: [] }, { status: 200 }); +} + export async function action({ request, params }: ActionFunctionArgs) { // Authenticate the request const authenticationResult = await authenticateApiRequest(request); diff --git a/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts new file mode 100644 index 00000000000..a4986235a55 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts @@ -0,0 +1,92 @@ +import { deserialiseSnapshot, type MollifierBuffer } from "@trigger.dev/redis-worker"; +import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import { prisma } from "~/db.server"; +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; + +export type BufferedRunRedirectInfo = { + organizationSlug: string; + projectSlug: string; + environmentSlug: string; + spanId: string | undefined; +}; + +export type FindBufferedRunRedirectInfoDeps = { + getBuffer?: () => MollifierBuffer | null; + prismaClient?: PrismaClientOrTransaction; +}; + +// Resolve the org/project/env slugs needed to build the canonical run-detail +// URL for a buffered run. Used by the short-URL redirect routes +// (`runs.$runParam`, `@.runs.$runParam`, `projects.v3.$projectRef.runs.$runParam`) +// so a customer clicking the trigger-API-returned run link doesn't 404 +// during the buffered window. +// +// Authorisation: PG query confirms the requesting user belongs to the +// organisation the buffer entry says owns the run. Without this check a +// known runId would leak slugs. +export async function findBufferedRunRedirectInfo( + args: { + runFriendlyId: string; + userId: string; + // Admin impersonation paths bypass org-membership; mirrors the existing + // PG-side admin route behaviour (`@.runs.$runParam` doesn't filter by + // org membership in the PG query either). + skipOrgMembershipCheck?: boolean; + }, + deps: FindBufferedRunRedirectInfoDeps = {}, +): Promise { + const buffer = (deps.getBuffer ?? getMollifierBuffer)(); + const prismaClient = deps.prismaClient ?? prisma; + if (!buffer) return null; + + let entry; + try { + entry = await buffer.getEntry(args.runFriendlyId); + } catch (err) { + logger.warn("buffered redirect: buffer.getEntry failed", { + runFriendlyId: args.runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + return null; + } + if (!entry) return null; + + if (!args.skipOrgMembershipCheck) { + const member = await prismaClient.orgMember.findFirst({ + where: { userId: args.userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) return null; + } + + let snapshot: Record; + try { + snapshot = deserialiseSnapshot(entry.payload) as Record; + } catch (err) { + logger.warn("buffered redirect: snapshot deserialise failed", { + runFriendlyId: args.runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + return null; + } + + const environment = snapshot.environment as Record | undefined; + if (!environment || typeof environment !== "object") return null; + const project = environment.project as Record | undefined; + const organization = environment.organization as Record | undefined; + + const envSlug = environment.slug; + const projectSlug = project?.slug; + const orgSlug = organization?.slug; + if (typeof envSlug !== "string" || typeof projectSlug !== "string" || typeof orgSlug !== "string") { + return null; + } + + return { + organizationSlug: orgSlug, + projectSlug, + environmentSlug: envSlug, + spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts new file mode 100644 index 00000000000..e502d5b3bf7 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts @@ -0,0 +1,154 @@ +import { prettyPrintPacket, RunAnnotations } from "@trigger.dev/core/v3"; +import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; +import { + extractIdempotencyKeyScope, + getUserProvidedIdempotencyKey, +} from "@trigger.dev/core/v3/serverOnly"; +import type { SpanRun } from "~/presenters/v3/SpanPresenter.server"; +import type { SyntheticRun } from "./readFallback.server"; + +// Synthesise a SpanRun-shaped object from a buffered run so the run-detail +// page's right-side details panel renders identically to a PG-resident +// run. The shape matches `SpanPresenter.getRun`'s return value exactly; +// buffered-irrelevant fields (output, error, attempts, schedule, session, +// region, batch) are filled with sensible defaults. +// +// Pretty-printing for payload and metadata mirrors SpanPresenter so the +// UI receives data in the same shape. Buffered runs cannot use the +// `application/store` packet path (no R2 object yet) so we treat raw +// snapshot fields as inline packets. +export async function buildSyntheticSpanRun(args: { + run: SyntheticRun; + environment: { id: string; slug: string; type: "PRODUCTION" | "DEVELOPMENT" | "STAGING" | "PREVIEW" }; +}): Promise { + const { run, environment } = args; + + const payload = + typeof run.payload !== "undefined" && run.payload !== null + ? await prettyPrintPacket(run.payload, run.payloadType ?? undefined) + : undefined; + + const metadata = run.metadata + ? await prettyPrintPacket(run.metadata, run.metadataType, { + filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"], + }) + : undefined; + + const idempotencyShape = { + idempotencyKey: run.idempotencyKey ?? null, + idempotencyKeyExpiresAt: null, + idempotencyKeyOptions: run.idempotencyKeyOptions ?? null, + }; + + const idempotencyKey = getUserProvidedIdempotencyKey(idempotencyShape); + const idempotencyKeyScope = extractIdempotencyKeyScope(idempotencyShape); + const idempotencyKeyStatus: SpanRun["idempotencyKeyStatus"] = idempotencyKey + ? "active" + : idempotencyKeyScope + ? "inactive" + : undefined; + + const taskKind = RunAnnotations.safeParse(run.annotations).data?.taskKind; + const isAgentRun = taskKind === "AGENT"; + + const queueName = run.queue ?? "task/"; + const isCancelled = run.status === "CANCELED"; + return { + id: run.id, + friendlyId: run.friendlyId, + status: isCancelled ? "CANCELED" : "PENDING", + statusReason: isCancelled ? run.cancelReason ?? undefined : undefined, + createdAt: run.createdAt, + startedAt: null, + executedAt: null, + updatedAt: run.cancelledAt ?? run.createdAt, + delayUntil: run.delayUntil ?? null, + expiredAt: null, + completedAt: run.cancelledAt ?? null, + logsDeletedAt: null, + ttl: run.ttl ?? null, + taskIdentifier: run.taskIdentifier ?? "", + version: undefined, + sdkVersion: undefined, + runtime: undefined, + runtimeVersion: undefined, + isTest: run.isTest, + replayedFromTaskRunFriendlyId: run.replayedFromTaskRunFriendlyId ?? null, + environmentId: environment.id, + idempotencyKey, + idempotencyKeyExpiresAt: null, + idempotencyKeyScope, + idempotencyKeyStatus, + debounce: null, + schedule: undefined, + queue: { + name: queueName, + isCustomQueue: !queueName.startsWith("task/"), + concurrencyKey: run.concurrencyKey ?? null, + }, + tags: run.runTags, + baseCostInCents: 0, + costInCents: 0, + totalCostInCents: 0, + usageDurationMs: 0, + isFinished: false, + isRunning: false, + isError: false, + isAgentRun, + payload, + payloadType: run.payloadType ?? "application/json", + output: undefined, + outputType: "application/json", + error: undefined, + relationships: { + root: run.rootTaskRunFriendlyId + ? { + friendlyId: run.rootTaskRunFriendlyId, + spanId: "", + taskIdentifier: "", + createdAt: run.createdAt, + isParent: run.parentTaskRunFriendlyId === run.rootTaskRunFriendlyId, + } + : undefined, + parent: run.parentTaskRunFriendlyId + ? { + friendlyId: run.parentTaskRunFriendlyId, + spanId: "", + taskIdentifier: "", + } + : undefined, + }, + context: JSON.stringify( + { + task: { + id: run.taskIdentifier ?? "", + }, + run: { + id: run.friendlyId, + createdAt: run.createdAt, + isTest: run.isTest, + }, + environment: { + id: environment.id, + slug: environment.slug, + type: environment.type, + }, + }, + null, + 2, + ), + metadata, + maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds), + batch: undefined, + session: undefined, + engine: "V2", + region: null, + workerQueue: run.workerQueue ?? "", + traceId: run.traceId ?? "", + spanId: run.spanId ?? "", + isCached: false, + machinePreset: run.machinePreset, + taskEventStore: "taskEvent", + externalTraceId: undefined, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts new file mode 100644 index 00000000000..acde2ccee9c --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts @@ -0,0 +1,66 @@ +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; +import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView"; +import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents"; +import type { SpanSummary } from "~/v3/eventRepository/eventRepository.types"; +import type { SyntheticRun } from "./readFallback.server"; + +// Build a single-span trace for a buffered run so the run-detail page +// renders a meaningful timeline before the drainer materialises the +// row. Mirrors the shape produced by `RunPresenter` when its trace +// store lookup returns no spans, so the dashboard consumer treats the +// buffered run identically to a freshly enqueued PG run that hasn't +// emitted any events yet. +export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { + const spanId = run.spanId ?? ""; + const isCancelled = run.status === "CANCELED"; + const span: SpanSummary = { + id: spanId, + parentId: run.parentSpanId, + runId: run.friendlyId, + data: { + message: run.taskIdentifier ?? "Task", + style: { icon: "task", variant: "primary" }, + events: [], + startTime: run.createdAt, + duration: 0, + isError: false, + isPartial: !isCancelled, + isCancelled, + isDebug: false, + level: "TRACE", + }, + }; + + const tree = createTreeFromFlatItems([span], spanId); + const treeRootStartTimeMs = tree?.data.startTime.getTime() ?? 0; + const totalDuration = Math.max(tree?.data.duration ?? 0, millisecondsToNanoseconds(1)); + + const events = tree + ? flattenTree(tree).map((n) => { + const offset = millisecondsToNanoseconds( + n.data.startTime.getTime() - treeRootStartTimeMs + ); + return { + ...n, + data: { + ...n.data, + timelineEvents: createTimelineSpanEventsFromSpanEvents(n.data.events, false, treeRootStartTimeMs), + duration: n.data.isPartial ? null : n.data.duration, + offset, + isRoot: n.id === spanId, + }, + }; + }) + : []; + + return { + rootSpanStatus: (isCancelled ? "completed" : "executing") as "executing" | "completed" | "failed", + events, + duration: totalDuration, + rootStartedAt: tree?.data.startTime, + startedAt: null, + queuedDuration: undefined, + overridesBySpanId: undefined, + linkedRunIdBySpanId: {} as Record, + }; +} diff --git a/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts b/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts new file mode 100644 index 00000000000..4a773caa10f --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts @@ -0,0 +1,162 @@ +import { describe, expect, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; + +const SNAPSHOT = { + spanId: "span_1", + environment: { + slug: "dev", + project: { slug: "hello-world-bN7m" }, + organization: { slug: "references-6120" }, + }, +}; + +function fakePrisma(member: { id: string } | null) { + return { + orgMember: { findFirst: vi.fn(async () => member) }, + } as unknown as Parameters[1]["prismaClient"]; +} + +describe("findBufferedRunRedirectInfo (testcontainers)", () => { + redisTest("returns slugs + spanId for a real buffer entry when user is a member", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_1", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toEqual({ + organizationSlug: "references-6120", + projectSlug: "hello-world-bN7m", + environmentSlug: "dev", + spanId: "span_1", + }); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when no buffer entry exists for the runId", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_missing", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when the user is not an org member (default check enforced)", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_2", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_2", userId: "user_other" }, + { getBuffer: () => buffer, prismaClient: fakePrisma(null) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("skips the org-membership check when skipOrgMembershipCheck is set (admin path)", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_3", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const findFirst = vi.fn(); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_3", userId: "user_admin", skipOrgMembershipCheck: true }, + { + getBuffer: () => buffer, + prismaClient: { orgMember: { findFirst } } as unknown as Parameters[1]["prismaClient"], + }, + ); + expect(info?.organizationSlug).toBe("references-6120"); + expect(findFirst).not.toHaveBeenCalled(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when snapshot is malformed JSON", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_4", + envId: "env_a", + orgId: "org_1", + payload: "{not-json", + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_4", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when snapshot lacks org/project slugs", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_5", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ spanId: "s", environment: { slug: "dev" } }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_5", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns info with undefined spanId when snapshot has no spanId", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_6", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ environment: SNAPSHOT.environment }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_6", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info?.spanId).toBeUndefined(); + expect(info?.environmentSlug).toBe("dev"); + } finally { + await buffer.close(); + } + }); +}); diff --git a/apps/webapp/test/mollifierSyntheticSpanRun.test.ts b/apps/webapp/test/mollifierSyntheticSpanRun.test.ts new file mode 100644 index 00000000000..68c3c4cfc48 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticSpanRun.test.ts @@ -0,0 +1,158 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticSpanRun } from "~/v3/mollifier/syntheticSpanRun.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-21T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + taskIdentifier: "hello-world", + createdAt: NOW, + payload: { message: "hi" }, + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: "10m", + tags: ["a", "b"], + runTags: ["a", "b"], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: "worker-queue-1", + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: "v1", + maxAttempts: 3, + maxDurationInSeconds: 3600, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +const ENV = { + id: "env_a", + slug: "dev", + type: "DEVELOPMENT" as const, +}; + +describe("buildSyntheticSpanRun", () => { + it("populates the core identity fields from the snapshot", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.id).toBe("run_internal_1"); + expect(synth.friendlyId).toBe("run_friendly_1"); + expect(synth.taskIdentifier).toBe("hello-world"); + expect(synth.traceId).toBe("trace_1"); + expect(synth.spanId).toBe("span_1"); + expect(synth.environmentId).toBe("env_a"); + expect(synth.engine).toBe("V2"); + expect(synth.workerQueue).toBe("worker-queue-1"); + }); + + it("reports PENDING status and the non-final flags", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.status).toBe("PENDING"); + expect(synth.isFinished).toBe(false); + expect(synth.isRunning).toBe(false); + expect(synth.isError).toBe(false); + expect(synth.startedAt).toBeNull(); + expect(synth.completedAt).toBeNull(); + }); + + it("pretty-prints the JSON payload from the snapshot", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ payload: { message: "hi" }, payloadType: "application/json" }), + environment: ENV, + }); + // prettyPrintPacket round-trips JSON with 2-space indent. + expect(synth.payload).toContain('"message": "hi"'); + expect(synth.payloadType).toBe("application/json"); + }); + + it("forwards runTags onto `tags` exactly", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ runTags: ["alpha", "beta"] }), + environment: ENV, + }); + expect(synth.tags).toEqual(["alpha", "beta"]); + }); + + it("classifies the queue name as custom when it does not start with 'task/'", async () => { + const taskQueue = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ queue: "task/hello-world" }), + environment: ENV, + }); + expect(taskQueue.queue.isCustomQueue).toBe(false); + + const customQueue = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ queue: "my-custom" }), + environment: ENV, + }); + expect(customQueue.queue.isCustomQueue).toBe(true); + }); + + it("derives idempotency status from the snapshot key/options", async () => { + const withKey = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ idempotencyKey: "abc", idempotencyKeyOptions: ["scope"] }), + environment: ENV, + }); + expect(withKey.idempotencyKey).toBe("abc"); + expect(withKey.idempotencyKeyStatus).toBe("active"); + + const noKey = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ idempotencyKey: undefined, idempotencyKeyOptions: undefined }), + environment: ENV, + }); + expect(noKey.idempotencyKeyStatus).toBeUndefined(); + }); + + it("fills relationship metadata from parent/root snapshot fields when present", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ + parentTaskRunFriendlyId: "run_parent", + rootTaskRunFriendlyId: "run_root", + }), + environment: ENV, + }); + expect(synth.relationships.parent?.friendlyId).toBe("run_parent"); + expect(synth.relationships.root?.friendlyId).toBe("run_root"); + expect(synth.relationships.root?.isParent).toBe(false); + }); + + it("returns no relationship objects when the snapshot has no parent/root", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun(), + environment: ENV, + }); + expect(synth.relationships.parent).toBeUndefined(); + expect(synth.relationships.root).toBeUndefined(); + }); + + it("flags the synthetic run as 'not cached' since cache lookup did not match it", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.isCached).toBe(false); + }); +}); From 4389e193c162eb545cc4f67a82b5d11befd7a27b Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 16:00:34 +0100 Subject: [PATCH 02/12] fix(webapp): validate buffered redirect snapshot with a Zod schema Replaces the ad-hoc \`as Record\` + \`typeof === "string"\` checks in \`findBufferedRunRedirectInfo\` with a Zod \`safeParse\` against a schema for the subset of fields the redirect needs (envSlug / projectSlug / orgSlug / optional spanId). Wrong-typed or missing fields now collapse into a single parse-fail branch that logs the structured issue list and returns null. Adds a regression test for the structural-vs-typeof distinction: \`environment.slug: 42\` (number) is now rejected, where the previous \`typeof slug === "string"\` chain would silently accept any string- typed value but had no defence against shape drift in other fields. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../mollifier/syntheticRedirectInfo.server.ts | 50 +++++++++++++------ .../mollifierSyntheticRedirectInfo.test.ts | 35 +++++++++++++ 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts index a4986235a55..b1d6c425fd3 100644 --- a/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts +++ b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts @@ -1,9 +1,25 @@ import { deserialiseSnapshot, type MollifierBuffer } from "@trigger.dev/redis-worker"; import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import { z } from "zod"; import { prisma } from "~/db.server"; import { logger } from "~/services/logger.server"; import { getMollifierBuffer } from "./mollifierBuffer.server"; +// Validated subset of a mollifier snapshot — just the fields needed to +// rebuild a canonical run-detail URL for a buffered run. Anything else +// in the payload is ignored. `safeParse` against this schema replaces +// the ad-hoc `as Record` + `typeof === "string"` checks +// that the redirect path used to do by hand; missing or wrong-typed +// fields collapse into a single `parsed.success === false` branch. +const BufferedSnapshotSchema = z.object({ + spanId: z.string().optional(), + environment: z.object({ + slug: z.string(), + project: z.object({ slug: z.string() }), + organization: z.object({ slug: z.string() }), + }), +}); + export type BufferedRunRedirectInfo = { organizationSlug: string; projectSlug: string; @@ -60,9 +76,9 @@ export async function findBufferedRunRedirectInfo( if (!member) return null; } - let snapshot: Record; + let raw: unknown; try { - snapshot = deserialiseSnapshot(entry.payload) as Record; + raw = deserialiseSnapshot(entry.payload); } catch (err) { logger.warn("buffered redirect: snapshot deserialise failed", { runFriendlyId: args.runFriendlyId, @@ -71,22 +87,26 @@ export async function findBufferedRunRedirectInfo( return null; } - const environment = snapshot.environment as Record | undefined; - if (!environment || typeof environment !== "object") return null; - const project = environment.project as Record | undefined; - const organization = environment.organization as Record | undefined; - - const envSlug = environment.slug; - const projectSlug = project?.slug; - const orgSlug = organization?.slug; - if (typeof envSlug !== "string" || typeof projectSlug !== "string" || typeof orgSlug !== "string") { + const parsed = BufferedSnapshotSchema.safeParse(raw); + if (!parsed.success) { + // Either the snapshot is from a different writer that doesn't carry + // environment slugs (in which case we genuinely can't build a URL) + // or a buffer-format drift snuck through. Log at debug; the caller + // 404s and the user sees the standard not-found page, not a 500. + logger.debug("buffered redirect: snapshot shape mismatch", { + runFriendlyId: args.runFriendlyId, + issues: parsed.error.issues.map((issue) => ({ + path: issue.path.join("."), + code: issue.code, + })), + }); return null; } return { - organizationSlug: orgSlug, - projectSlug, - environmentSlug: envSlug, - spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined, + organizationSlug: parsed.data.environment.organization.slug, + projectSlug: parsed.data.environment.project.slug, + environmentSlug: parsed.data.environment.slug, + spanId: parsed.data.spanId, }; } diff --git a/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts b/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts index 4a773caa10f..a996b9de693 100644 --- a/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts +++ b/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts @@ -159,4 +159,39 @@ describe("findBufferedRunRedirectInfo (testcontainers)", () => { await buffer.close(); } }); + + redisTest( + "rejects snapshots where a slug is the wrong type (Zod guard, not just typeof)", + async ({ redisOptions }) => { + // Regression for the pre-Zod implementation: the slug check was + // `typeof slug !== "string"` so any string passed, including ones + // that should've been rejected on shape grounds. The Zod schema + // gives us full structural validation — a `slug: 42` (number) + // collapses into the parse-fail branch like any other shape + // mismatch and we return null instead of leaking a half-built + // redirect URL. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_7", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + environment: { + slug: 42, + project: { slug: "p" }, + organization: { slug: "o" }, + }, + }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_7", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); }); From 28c9e37c7850dd929aa77261d7a3e53f4f8ec873 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 16:59:02 +0100 Subject: [PATCH 03/12] fix(webapp): narrow synthetic-span machinePreset against the canonical enum \`SyntheticRun.machinePreset\` is a plain string sourced from the mollifier snapshot, but \`SpanRun.machinePreset\` is the typed \`MachinePresetName\` enum (micro / small-1x / small-2x / medium-1x / medium-2x / large-1x / large-2x). The direct assignment failed \`tsc --noEmit\` and CI typecheck. Validate via \`MachinePresetName.safeParse\` and collapse unknown values to \`undefined\` so a stale preset returned by the buffer doesn't bleed into the UI as a typed-but-unknown value. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../app/v3/mollifier/syntheticSpanRun.server.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts index e502d5b3bf7..bfbcdae36da 100644 --- a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts +++ b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts @@ -4,9 +4,20 @@ import { extractIdempotencyKeyScope, getUserProvidedIdempotencyKey, } from "@trigger.dev/core/v3/serverOnly"; +import { MachinePresetName } from "@trigger.dev/core/v3/schemas"; import type { SpanRun } from "~/presenters/v3/SpanPresenter.server"; import type { SyntheticRun } from "./readFallback.server"; +// `SyntheticRun.machinePreset` is sourced from the snapshot payload as +// a plain string, but `SpanRun.machinePreset` is the narrowed enum. +// Validate against the canonical enum so an unknown / stale preset +// string collapses to undefined rather than fighting the type checker. +function narrowMachinePreset(value: string | undefined): SpanRun["machinePreset"] { + if (value === undefined) return undefined; + const parsed = MachinePresetName.safeParse(value); + return parsed.success ? parsed.data : undefined; +} + // Synthesise a SpanRun-shaped object from a buffered run so the run-detail // page's right-side details panel renders identically to a PG-resident // run. The shape matches `SpanPresenter.getRun`'s return value exactly; @@ -147,7 +158,7 @@ export async function buildSyntheticSpanRun(args: { traceId: run.traceId ?? "", spanId: run.spanId ?? "", isCached: false, - machinePreset: run.machinePreset, + machinePreset: narrowMachinePreset(run.machinePreset), taskEventStore: "taskEvent", externalTraceId: undefined, }; From 3a2a398ec592d9c17d1a6719c3018ac4fd22f9eb Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 17:56:39 +0100 Subject: [PATCH 04/12] fix(webapp): reads-layer Devin follow-ups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five Devin findings on PR #3755: - ApiRetrieveRunPresenter.synthesiseFoundRunFromBuffer was assigning buffered.friendlyId to the FoundRun.id field. The id column on PG is the internal cuid; downstream log correlation reads taskRun.id and was getting the friendly token instead. Fixed to read the cuid that readFallback already derives via RunId.fromFriendlyId. - api.v1.runs.$runId.trace.ts buffered branch hardcoded isPartial: true. Cancelled is a terminal state — the sibling spans route and syntheticTrace already gate this on !isCancelled. Match. - synthesisePayload helper replaces the silent typeof === "string" coercion. Object-shaped payloads now JSON- stringify (matching how the trigger path would serialise them) with a warn log so format drift is visible. Truly unserialisable inputs fall back to "" with an error log instead of silently dropping. - syntheticRedirectInfo now uses deserialiseMollifierSnapshot (the webapp-side wrapper) instead of deserialiseSnapshot from the redis-worker package directly. Both share the same implementation today, but pinning the wrapper means the two read-side modules can't drift if the snapshot encoding ever changes (e.g. msgpack). - attempts route loader verifies the run belongs to the authenticated environment (PG-first, buffer fallback) before returning the parity-empty list. Other run-scoped endpoints (spans, trace, retrieve) 404 cross-env; matching that closes the exists-vs-doesn't-exist side channel. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../v3/ApiRetrieveRunPresenter.server.ts | 36 +++++++++++++++++-- .../app/routes/api.v1.runs.$runId.trace.ts | 6 +++- .../routes/api.v1.runs.$runParam.attempts.ts | 27 ++++++++++++++ .../mollifier/syntheticRedirectInfo.server.ts | 11 ++++-- 4 files changed, 75 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 782104776d4..2361d2dc4e7 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -534,6 +534,32 @@ function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunS } } +// The PG path stores `TaskRun.payload` as `String?`, so in production +// the buffered snapshot's `payload` is always a string. We defensively +// coerce other types instead of silently dropping them: an object gets +// JSON-stringified (matches how the trigger path would serialise it), +// anything truly unrenderable falls back to an empty string. The log +// line surfaces format drift to ops without crashing the read path. +function synthesisePayload(buffered: SyntheticRun): string { + const payload = buffered.payload; + if (typeof payload === "string") return payload; + if (payload === undefined || payload === null) return ""; + try { + const serialised = JSON.stringify(payload); + logger.warn("ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced", { + runFriendlyId: buffered.friendlyId, + payloadType: typeof payload, + }); + return typeof serialised === "string" ? serialised : ""; + } catch { + logger.error("ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable", { + runFriendlyId: buffered.friendlyId, + payloadType: typeof payload, + }); + return ""; + } +} + function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status); @@ -548,7 +574,13 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { typeof buffered.metadata === "string" ? buffered.metadata : null; return { - id: buffered.friendlyId, + // `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId` + // is the user-facing `run_xxx` token. Downstream logging keyed off + // `taskRun.id` correlates with other systems via the cuid — using + // the friendlyId here breaks log correlation. `SyntheticRun` carries + // the cuid alongside the friendlyId for exactly this reason + // (RunId.fromFriendlyId in readFallback.server.ts). + id: buffered.id, friendlyId: buffered.friendlyId, status, taskIdentifier: buffered.taskIdentifier ?? "", @@ -574,7 +606,7 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { batch: null, runTags: buffered.tags, traceId: buffered.traceId ?? "", - payload: typeof buffered.payload === "string" ? buffered.payload : "", + payload: synthesisePayload(buffered), payloadType: buffered.payloadType ?? "application/json", output: null, outputType: "application/json", diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts index cce1b40b785..b87bae396cb 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts @@ -95,7 +95,11 @@ export const loader = createLoaderApiRoute( startTime: buffered.createdAt, duration: 0, isError: false, - isPartial: true, + // Cancelled is a terminal state — the span shouldn't + // signal "still in progress" once it's been cancelled. + // Mirrors the sibling api.v1.runs.$runId.spans.$spanId.ts + // and syntheticTrace.server.ts logic. + isPartial: buffered.status !== "CANCELED", isCancelled: buffered.status === "CANCELED", level: "TRACE", queueName: buffered.queue ?? undefined, diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts index 8668f0bc60b..d14246786ce 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts @@ -1,8 +1,10 @@ import type { ActionFunctionArgs, LoaderFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; import { z } from "zod"; +import { $replica } from "~/db.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { CreateTaskRunAttemptService } from "~/v3/services/createTaskRunAttempt.server"; @@ -32,6 +34,31 @@ export async function loader({ request, params }: LoaderFunctionArgs) { return json({ error: "Invalid or missing run ID" }, { status: 400 }); } + const { runParam } = parsed.data; + const env = authenticationResult.environment; + + // Verify the run belongs to the authenticated environment before + // returning the parity-empty list. The response body is empty either + // way, but other run-scoped endpoints (spans, trace, retrieve) all + // 404 on cross-env access; matching that here means a third party + // can't distinguish "run exists" from "doesn't exist" via this + // endpoint either. PG-first then buffer fallback, consistent with + // the other read paths. + const pgRun = await $replica.taskRun.findFirst({ + where: { friendlyId: runParam, runtimeEnvironmentId: env.id }, + select: { id: true }, + }); + if (!pgRun) { + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (!buffered) { + return json({ error: "Run not found" }, { status: 404 }); + } + } + return json({ attempts: [] }, { status: 200 }); } diff --git a/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts index b1d6c425fd3..e316846d708 100644 --- a/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts +++ b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts @@ -1,9 +1,16 @@ -import { deserialiseSnapshot, type MollifierBuffer } from "@trigger.dev/redis-worker"; +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; import type { PrismaClientOrTransaction } from "@trigger.dev/database"; import { z } from "zod"; import { prisma } from "~/db.server"; import { logger } from "~/services/logger.server"; import { getMollifierBuffer } from "./mollifierBuffer.server"; +// Use the webapp-side wrapper (not `deserialiseSnapshot` from +// @trigger.dev/redis-worker directly) so this file shares a single +// deserialisation path with readFallback.server.ts. The two are +// behaviourally identical today (both wrap `JSON.parse`), but pinning +// the shared helper keeps the two read-side modules from drifting if +// snapshot encoding ever changes. +import { deserialiseMollifierSnapshot } from "./mollifierSnapshot.server"; // Validated subset of a mollifier snapshot — just the fields needed to // rebuild a canonical run-detail URL for a buffered run. Anything else @@ -78,7 +85,7 @@ export async function findBufferedRunRedirectInfo( let raw: unknown; try { - raw = deserialiseSnapshot(entry.payload); + raw = deserialiseMollifierSnapshot(entry.payload); } catch (err) { logger.warn("buffered redirect: snapshot deserialise failed", { runFriendlyId: args.runFriendlyId, From cf298e74d0d15e84fd70a600c256fa1270fa2804 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 27 May 2026 12:59:15 +0100 Subject: [PATCH 05/12] docs(server-changes): document mollifier API GET read-fallback Co-Authored-By: Claude Opus 4.7 (1M context) --- .server-changes/mollifier-reads.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .server-changes/mollifier-reads.md diff --git a/.server-changes/mollifier-reads.md b/.server-changes/mollifier-reads.md new file mode 100644 index 00000000000..bbb3aa83341 --- /dev/null +++ b/.server-changes/mollifier-reads.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier API read-fallback: serve buffered runs from synthetic run/trace/span data on the retrieve, trace, spans, and attempts endpoints. From 4ea0563bc8e2c0b8e0b36230ddff7aee06f753a7 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 27 May 2026 17:05:45 +0100 Subject: [PATCH 06/12] fix(webapp): reflect terminal state in mollifier synthetic span/trace The synthetic SpanRun/trace builders for buffered runs hardcoded non-terminal state, so a CANCELED or FAILED buffered run rendered as a healthy in-progress run: - syntheticSpanRun: FAILED now maps to SYSTEM_FAILURE (matching ApiRetrieveRunPresenter.bufferedStatusToTaskRunStatus); isFinished is true for CANCELED/FAILED; isError is true for FAILED; the error block is synthesised as STRING_ERROR and statusReason carries the message. - syntheticSpanRun: drop the empty-string spanId/taskIdentifier relationship stubs (blank task name + misleading `?span=` jump) since the snapshot only carries friendly IDs. - syntheticTrace: FAILED now renders as an errored, non-partial, "failed" root span instead of executing/partial. CANCELED stays "completed", matching RunPresenter's derivation. - tests: cover the CANCELED and FAILED terminal paths. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../v3/mollifier/syntheticSpanRun.server.ts | 67 ++++++++++++------- .../app/v3/mollifier/syntheticTrace.server.ts | 16 ++++- .../test/mollifierSyntheticSpanRun.test.ts | 41 ++++++++++-- 3 files changed, 93 insertions(+), 31 deletions(-) diff --git a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts index bfbcdae36da..bdc7873304f 100644 --- a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts +++ b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts @@ -20,9 +20,11 @@ function narrowMachinePreset(value: string | undefined): SpanRun["machinePreset" // Synthesise a SpanRun-shaped object from a buffered run so the run-detail // page's right-side details panel renders identically to a PG-resident -// run. The shape matches `SpanPresenter.getRun`'s return value exactly; -// buffered-irrelevant fields (output, error, attempts, schedule, session, -// region, batch) are filled with sensible defaults. +// run. The shape matches `SpanPresenter.getRun`'s return value; +// buffered-irrelevant fields (output, attempts, schedule, session, +// region, batch) are filled with sensible defaults, while terminal state +// (CANCELED / FAILED) is reflected into `status`, `isFinished`, `isError` +// and `error` so a finished buffered run does not render as PENDING. // // Pretty-printing for payload and metadata mirrors SpanPresenter so the // UI receives data in the same shape. Buffered runs cannot use the @@ -64,11 +66,36 @@ export async function buildSyntheticSpanRun(args: { const queueName = run.queue ?? "task/"; const isCancelled = run.status === "CANCELED"; + const isFailed = run.status === "FAILED"; + + // The run-detail panel derives terminal/error state from `status`, + // `isFinished` and `isError` (SpanPresenter.getRun -> isFinalRunStatus / + // isFailedRunStatus). Buffered FAILED runs surface as SYSTEM_FAILURE to + // match ApiRetrieveRunPresenter.bufferedStatusToTaskRunStatus; both + // CANCELED and SYSTEM_FAILURE are final run statuses, and SYSTEM_FAILURE + // is also a failed status. + const status: SpanRun["status"] = isCancelled + ? "CANCELED" + : isFailed + ? "SYSTEM_FAILURE" + : "PENDING"; + + // Mirror ApiRetrieveRunPresenter's STRING_ERROR synthesis so the panel + // shows why a buffered run failed instead of an empty error block. + const error: SpanRun["error"] = + isFailed && run.error + ? { type: "STRING_ERROR", raw: `${run.error.code}: ${run.error.message}` } + : undefined; + return { id: run.id, friendlyId: run.friendlyId, - status: isCancelled ? "CANCELED" : "PENDING", - statusReason: isCancelled ? run.cancelReason ?? undefined : undefined, + status, + statusReason: isCancelled + ? run.cancelReason ?? undefined + : isFailed + ? run.error?.message ?? undefined + : undefined, createdAt: run.createdAt, startedAt: null, executedAt: null, @@ -102,32 +129,24 @@ export async function buildSyntheticSpanRun(args: { costInCents: 0, totalCostInCents: 0, usageDurationMs: 0, - isFinished: false, + isFinished: isCancelled || isFailed, isRunning: false, - isError: false, + isError: isFailed, isAgentRun, payload, payloadType: run.payloadType ?? "application/json", output: undefined, outputType: "application/json", - error: undefined, + error, + // The snapshot only carries the root/parent friendly IDs, not the + // spanId or taskIdentifier that SpanPresenter sources from the joined + // PG rows. Emitting them with empty-string stubs renders a blank task + // name and a misleading `?span=` jump target, so we omit the + // relationships until the drainer materialises the row (a transient + // window). Top-level buffered runs have no relationships regardless. relationships: { - root: run.rootTaskRunFriendlyId - ? { - friendlyId: run.rootTaskRunFriendlyId, - spanId: "", - taskIdentifier: "", - createdAt: run.createdAt, - isParent: run.parentTaskRunFriendlyId === run.rootTaskRunFriendlyId, - } - : undefined, - parent: run.parentTaskRunFriendlyId - ? { - friendlyId: run.parentTaskRunFriendlyId, - spanId: "", - taskIdentifier: "", - } - : undefined, + root: undefined, + parent: undefined, }, context: JSON.stringify( { diff --git a/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts index acde2ccee9c..ee0d518e2e7 100644 --- a/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts +++ b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts @@ -13,6 +13,7 @@ import type { SyntheticRun } from "./readFallback.server"; export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { const spanId = run.spanId ?? ""; const isCancelled = run.status === "CANCELED"; + const isFailed = run.status === "FAILED"; const span: SpanSummary = { id: spanId, parentId: run.parentSpanId, @@ -23,8 +24,11 @@ export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { events: [], startTime: run.createdAt, duration: 0, - isError: false, - isPartial: !isCancelled, + isError: isFailed, + // CANCELED and FAILED are terminal; only a still-queued buffered run + // is partial. A partial failed span would otherwise render as + // "executing" forever in the timeline. + isPartial: !isCancelled && !isFailed, isCancelled, isDebug: false, level: "TRACE", @@ -54,7 +58,13 @@ export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { : []; return { - rootSpanStatus: (isCancelled ? "completed" : "executing") as "executing" | "completed" | "failed", + // Matches RunPresenter's derivation: failed root span -> "failed", + // otherwise a terminal (non-partial) span -> "completed", else + // "executing". CANCELED is terminal-but-not-error, so "completed". + rootSpanStatus: (isFailed ? "failed" : isCancelled ? "completed" : "executing") as + | "executing" + | "completed" + | "failed", events, duration: totalDuration, rootStartedAt: tree?.data.startTime, diff --git a/apps/webapp/test/mollifierSyntheticSpanRun.test.ts b/apps/webapp/test/mollifierSyntheticSpanRun.test.ts index 68c3c4cfc48..72bc9f02788 100644 --- a/apps/webapp/test/mollifierSyntheticSpanRun.test.ts +++ b/apps/webapp/test/mollifierSyntheticSpanRun.test.ts @@ -129,7 +129,7 @@ describe("buildSyntheticSpanRun", () => { expect(noKey.idempotencyKeyStatus).toBeUndefined(); }); - it("fills relationship metadata from parent/root snapshot fields when present", async () => { + it("omits relationships even when parent/root friendlyIds are present, since the snapshot lacks their spanId/taskIdentifier", async () => { const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun({ parentTaskRunFriendlyId: "run_parent", @@ -137,9 +137,8 @@ describe("buildSyntheticSpanRun", () => { }), environment: ENV, }); - expect(synth.relationships.parent?.friendlyId).toBe("run_parent"); - expect(synth.relationships.root?.friendlyId).toBe("run_root"); - expect(synth.relationships.root?.isParent).toBe(false); + expect(synth.relationships.parent).toBeUndefined(); + expect(synth.relationships.root).toBeUndefined(); }); it("returns no relationship objects when the snapshot has no parent/root", async () => { @@ -151,6 +150,40 @@ describe("buildSyntheticSpanRun", () => { expect(synth.relationships.root).toBeUndefined(); }); + it("reflects a buffered CANCELED run as a finished, cancelled terminal state", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ + status: "CANCELED", + cancelledAt: NOW, + cancelReason: "cancelled by user", + }), + environment: ENV, + }); + expect(synth.status).toBe("CANCELED"); + expect(synth.statusReason).toBe("cancelled by user"); + expect(synth.isFinished).toBe(true); + expect(synth.isError).toBe(false); + expect(synth.completedAt).toEqual(NOW); + }); + + it("reflects a buffered FAILED run as a finished, errored SYSTEM_FAILURE", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ + status: "FAILED", + error: { code: "GATE_REJECTED", message: "buffer rejected the run" }, + }), + environment: ENV, + }); + expect(synth.status).toBe("SYSTEM_FAILURE"); + expect(synth.isFinished).toBe(true); + expect(synth.isError).toBe(true); + expect(synth.statusReason).toBe("buffer rejected the run"); + expect(synth.error).toEqual({ + type: "STRING_ERROR", + raw: "GATE_REJECTED: buffer rejected the run", + }); + }); + it("flags the synthetic run as 'not cached' since cache lookup did not match it", async () => { const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); expect(synth.isCached).toBe(false); From 6d057e5476a4596d5ef7194c83131aa6cf981f68 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 27 May 2026 17:23:51 +0100 Subject: [PATCH 07/12] fix(webapp): mollifier read-fallback auth/retry parity + batch reconstruction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the higher-confidence read-fallback review findings: - attempts GET loader: rebuilt on createLoaderApiRoute so it matches the sibling read routes — accepts JWTs with run/task/tag/batch resource scoping (was bare authenticateApiRequest, rejecting PUBLIC_JWT and doing no scope check), and 404s with `x-should-retry: true` so SDK pollers keep retrying a not-yet-materialised run instead of giving up. - batch reconstruction: the snapshot embeds the batch as `{ id, index }` (engine.trigger shape), but readFallback read a non-existent flat `batchId`, so SyntheticRun.batchId was always undefined. Read it from `snapshot.batch.id` (the internal cuid). synthesiseFoundRunFromBuffer now populates `batch` from it, and the spans/trace buffer-path authorization pushes the batch resource — so batch-scoped JWTs authorise against buffered runs and the retrieve response reports the correct batchId. - metadata: coerce a non-string buffered metadata defensively (JSON stringify + warn) instead of silently dropping to null, mirroring synthesisePayload. In practice metadata is always a string, so this is a no-op guard, but it surfaces format drift to ops. - tests: cover batchId extraction from the nested batch object and its absence for non-batched runs. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../v3/ApiRetrieveRunPresenter.server.ts | 38 ++++++- .../api.v1.runs.$runId.spans.$spanId.ts | 3 + .../app/routes/api.v1.runs.$runId.trace.ts | 3 + .../routes/api.v1.runs.$runParam.attempts.ts | 107 ++++++++++++------ .../webapp/test/mollifierReadFallback.test.ts | 39 +++++++ 5 files changed, 152 insertions(+), 38 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 2361d2dc4e7..0a33dbe8fa6 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -9,6 +9,7 @@ import { logger, } from "@trigger.dev/core/v3"; import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization"; +import { BatchId } from "@trigger.dev/core/v3/isomorphic"; import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly"; import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database"; import assertNever from "assert-never"; @@ -560,6 +561,32 @@ function synthesisePayload(buffered: SyntheticRun): string { } } +// Mirror synthesisePayload for metadata. The PG path stores +// `TaskRun.metadata` as `String?`, and the snapshot writes it from +// `metadataPacket.data` (also a string), so in production it is always a +// string or absent. We coerce defensively — an object gets JSON-stringified +// (matching how the trigger path serialises it) rather than silently +// dropped to null, and the log line surfaces format drift to ops. +function synthesiseMetadata(buffered: SyntheticRun): string | null { + const metadata = buffered.metadata; + if (typeof metadata === "string") return metadata; + if (metadata === undefined || metadata === null) return null; + try { + const serialised = JSON.stringify(metadata); + logger.warn("ApiRetrieveRunPresenter: buffered snapshot.metadata non-string coerced", { + runFriendlyId: buffered.friendlyId, + metadataType: typeof metadata, + }); + return typeof serialised === "string" ? serialised : null; + } catch { + logger.error("ApiRetrieveRunPresenter: buffered snapshot.metadata unserialisable", { + runFriendlyId: buffered.friendlyId, + metadataType: typeof metadata, + }); + return null; + } +} + function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status); @@ -570,8 +597,7 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { } : null; - const metadata: Prisma.JsonValue = - typeof buffered.metadata === "string" ? buffered.metadata : null; + const metadata: string | null = synthesiseMetadata(buffered); return { // `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId` @@ -603,7 +629,13 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { scheduleId: null, lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null, resumeParentOnCompletion: buffered.resumeParentOnCompletion, - batch: null, + // Reconstruct the batch from the snapshot's internal id so a buffered + // run reports the same `batchId` / triggerFunction as it will once + // materialised, and so batch-scoped JWTs authorise against it (the + // route authorization callbacks read `run.batch?.friendlyId`). + batch: buffered.batchId + ? { id: buffered.batchId, friendlyId: BatchId.toFriendlyId(buffered.batchId) } + : null, runTags: buffered.tags, traceId: buffered.traceId ?? "", payload: synthesisePayload(buffered), diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts index cc48faf5d85..1c6bbbe560b 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts @@ -72,6 +72,9 @@ export const loader = createLoaderApiRoute( ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), ...run.tags.map((tag) => ({ type: "tags", id: tag })), ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } return anyResource(resources); }, }, diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts index b87bae396cb..2c026fd1fe6 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts @@ -70,6 +70,9 @@ export const loader = createLoaderApiRoute( ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), ...run.tags.map((tag) => ({ type: "tags", id: tag })), ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } return anyResource(resources); }, }, diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts index d14246786ce..5d0b2ddfa34 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts @@ -1,9 +1,14 @@ -import type { ActionFunctionArgs, LoaderFunctionArgs } from "@remix-run/server-runtime"; +import type { ActionFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; +import { BatchId } from "@trigger.dev/core/v3/isomorphic"; import { z } from "zod"; import { $replica } from "~/db.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { + anyResource, + createLoaderApiRoute, +} from "~/services/routeBuilders/apiBuilder.server"; import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { CreateTaskRunAttemptService } from "~/v3/services/createTaskRunAttempt.server"; @@ -23,44 +28,76 @@ const ParamsSchema = z.object({ // attempt list belongs on the v3 retrieve endpoint, not here — this is // the dual of the POST that creates attempts, and the empty-list shape // gives the parity script a stable contract to assert against. -export async function loader({ request, params }: LoaderFunctionArgs) { - const authenticationResult = await authenticateApiRequest(request); - if (!authenticationResult) { - return json({ error: "Invalid or Missing API Key" }, { status: 401 }); - } +// +// Built with createLoaderApiRoute so it matches the sibling read routes +// (spans, trace, retrieve): it accepts JWTs (`allowJWT`) with the same +// run/task/tag/batch resource scoping, and a not-found run returns 404 +// with `x-should-retry: true` (`shouldRetryNotFound`) so SDK pollers keep +// retrying a run that the drainer hasn't materialised yet. PG-first then +// buffer fallback, so a third party can't distinguish "exists" from +// "doesn't exist" cross-environment. +type ResolvedRun = + | { source: "pg"; run: NonNullable>> } + | { source: "buffer"; run: NonNullable>> }; - const parsed = ParamsSchema.safeParse(params); - if (!parsed.success) { - return json({ error: "Invalid or missing run ID" }, { status: 400 }); - } +async function findPgRun(runId: string, environmentId: string) { + return $replica.taskRun.findFirst({ + where: { friendlyId: runId, runtimeEnvironmentId: environmentId }, + select: { friendlyId: true, taskIdentifier: true, runTags: true, batchId: true }, + }); +} - const { runParam } = parsed.data; - const env = authenticationResult.environment; +export const loader = createLoaderApiRoute( + { + params: ParamsSchema, + allowJWT: true, + corsStrategy: "all", + findResource: async (params, auth): Promise => { + const pgRun = await findPgRun(params.runParam, auth.environment.id); + if (pgRun) return { source: "pg", run: pgRun }; - // Verify the run belongs to the authenticated environment before - // returning the parity-empty list. The response body is empty either - // way, but other run-scoped endpoints (spans, trace, retrieve) all - // 404 on cross-env access; matching that here means a third party - // can't distinguish "run exists" from "doesn't exist" via this - // endpoint either. PG-first then buffer fallback, consistent with - // the other read paths. - const pgRun = await $replica.taskRun.findFirst({ - where: { friendlyId: runParam, runtimeEnvironmentId: env.id }, - select: { id: true }, - }); - if (!pgRun) { - const buffered = await findRunByIdWithMollifierFallback({ - runId: runParam, - environmentId: env.id, - organizationId: env.organizationId, - }); - if (!buffered) { - return json({ error: "Run not found" }, { status: 404 }); - } - } + const buffered = await findRunByIdWithMollifierFallback({ + runId: params.runParam, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, + }); + if (buffered) return { source: "buffer", run: buffered }; - return json({ attempts: [] }, { status: 200 }); -} + return null; + }, + shouldRetryNotFound: true, + authorization: { + action: "read", + resource: (resolved) => { + if (resolved.source === "pg") { + const run = resolved.run; + const resources = [ + { type: "runs", id: run.friendlyId }, + { type: "tasks", id: run.taskIdentifier }, + ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } + return anyResource(resources); + } + const run = resolved.run; + const resources = [ + { type: "runs", id: run.friendlyId }, + ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), + ...run.tags.map((tag) => ({ type: "tags", id: tag })), + ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } + return anyResource(resources); + }, + }, + }, + async () => { + return json({ attempts: [] }, { status: 200 }); + } +); export async function action({ request, params }: ActionFunctionArgs) { // Authenticate the request diff --git a/apps/webapp/test/mollifierReadFallback.test.ts b/apps/webapp/test/mollifierReadFallback.test.ts index 1db167eea61..fcee3fb95fc 100644 --- a/apps/webapp/test/mollifierReadFallback.test.ts +++ b/apps/webapp/test/mollifierReadFallback.test.ts @@ -260,6 +260,45 @@ describe("findRunByIdWithMollifierFallback", () => { expect(result!.runTags).toEqual(["t1", "t2"]); }); + it("extracts batchId from the snapshot's nested batch object (engine.trigger shape)", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + taskIdentifier: "t", + // The engine.trigger input nests the batch as `{ id, index }`, + // where `id` is the batch's internal cuid (not a flat `batchId`). + batch: { id: "batch_internal_cuid", index: 3 }, + }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.batchId).toBe("batch_internal_cuid"); + }); + + it("leaves batchId undefined when the snapshot has no batch (non-batched run)", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ taskIdentifier: "t" }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.batchId).toBeUndefined(); + }); + it("treats invalid date strings as undefined and does not mis-classify status as CANCELED", async () => { const entry: BufferEntry = { runId: "run_1", From 0f96cb32d378d0cee5289c0928101484042bbbba Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Thu, 28 May 2026 11:05:29 +0100 Subject: [PATCH 08/12] chore(webapp): drop internal phase labels from mollifier route comments Comments referenced internal phase labels ("Phase A2", "Phase A5") from the development plan rather than describing what the code does. Replaced with self-contained prose; the surrounding explanations were already correct and are preserved. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../app/routes/api.v1.runs.$runId.spans.$spanId.ts | 12 ++++++------ .../app/routes/api.v1.runs.$runParam.attempts.ts | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts index 1c6bbbe560b..de05db02464 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts @@ -16,12 +16,12 @@ const ParamsSchema = z.object({ spanId: z.string(), }); -// Phase A2 — discriminated union for PG vs buffered runs. Buffered runs -// only have one valid spanId (the queued span recorded at gate time and -// reused as the run's root spanId when the drainer materialises). Any -// other spanId returns a deterministic 404; the queued span returns a -// minimal synthesised shape so the customer's SDK sees the same 200 -// contract they'd get for a freshly-triggered run. +// Resolve the run from either Postgres or the mollifier buffer. +// Buffered runs only have one valid spanId (the queued span recorded at +// gate time and reused as the run's root spanId when the drainer +// materialises). Any other spanId returns a deterministic 404; the queued +// span returns a minimal synthesised shape so the customer's SDK sees the +// same 200 contract they'd get for a freshly-triggered run. type ResolvedRun = | { source: "pg"; run: Awaited> & {} } | { source: "buffer"; run: NonNullable>> }; diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts index 5d0b2ddfa34..6bf3388f890 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts @@ -18,11 +18,11 @@ const ParamsSchema = z.object({ runParam: z.string(), }); -// Phase A5 — fixes the pre-existing route bug where GET on this URL -// returned a Remix "no loader" 400 with an internal error message. The -// route only exposed `action` (POST creates a new attempt); GET had no -// handler, so any well-intentioned SDK probe hit the framework error -// instead of a proper API response. +// GET handler added to fix the pre-existing route bug where this URL +// returned a Remix "no loader" 400 with an internal error message — only +// `action` (POST creates a new attempt) was exported, so any +// well-intentioned SDK probe hit the framework error instead of a proper +// API response. // // Returns `{ attempts: [] }` for both PG and buffered runs. The detailed // attempt list belongs on the v3 retrieve endpoint, not here — this is From c67ce6510461950ff09d58638b00689d7e73d814 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Thu, 28 May 2026 11:42:28 +0100 Subject: [PATCH 09/12] test(webapp): cover buildSyntheticTraceForBufferedRun terminal-state derivation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit syntheticTrace.server.ts shipped without a test file; this adds one, covering the identity-field passthrough, taskIdentifier-and-spanId defaults, the three rootSpanStatus branches (QUEUED→executing, CANCELED→completed, FAILED→failed) with their isPartial/isError/isCancelled flags, the 1ms duration floor, rootStartedAt mapping, and the single-span trace shape (empty events/timelineEvents, empty linkedRunIdBySpanId, undefined overridesBySpanId/queuedDuration). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test/mollifierSyntheticTrace.test.ts | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 apps/webapp/test/mollifierSyntheticTrace.test.ts diff --git a/apps/webapp/test/mollifierSyntheticTrace.test.ts b/apps/webapp/test/mollifierSyntheticTrace.test.ts new file mode 100644 index 00000000000..ac7425a8fe9 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticTrace.test.ts @@ -0,0 +1,149 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticTraceForBufferedRun } from "~/v3/mollifier/syntheticTrace.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-22T10:00:00Z"); +const ONE_MS_IN_NS = 1_000_000; + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: undefined, + payloadType: undefined, + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: undefined, + tags: [], + runTags: [], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: undefined, + queue: undefined, + concurrencyKey: undefined, + machinePreset: undefined, + realtimeStreamsVersion: undefined, + maxAttempts: undefined, + maxDurationInSeconds: undefined, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +describe("buildSyntheticTraceForBufferedRun", () => { + it("populates the synthesised root span from snapshot identity fields", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.events).toHaveLength(1); + const root = trace.events[0]; + expect(root.id).toBe("span_1"); + expect(root.data.message).toBe("hello-world"); + expect(root.data.startTime).toEqual(NOW); + expect(root.data.isRoot).toBe(true); + expect(root.data.offset).toBe(0); + expect(root.data.level).toBe("TRACE"); + }); + + it("defaults the span message to 'Task' when the snapshot has no taskIdentifier", () => { + const trace = buildSyntheticTraceForBufferedRun( + makeSyntheticRun({ taskIdentifier: undefined }) + ); + expect(trace.events[0].data.message).toBe("Task"); + }); + + it("falls back to an empty-string span id when the snapshot has no spanId", () => { + const trace = buildSyntheticTraceForBufferedRun( + makeSyntheticRun({ spanId: undefined }) + ); + expect(trace.events[0].id).toBe(""); + // Empty id still marks as root (it matches the rootId fallback). + expect(trace.events[0].data.isRoot).toBe(true); + }); + + it("renders a QUEUED buffered run as an executing, partial root span", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun({ status: "QUEUED" })); + expect(trace.rootSpanStatus).toBe("executing"); + expect(trace.events[0].data.isPartial).toBe(true); + expect(trace.events[0].data.isError).toBe(false); + expect(trace.events[0].data.isCancelled).toBe(false); + // A partial span exposes duration as null (the timeline reads it as + // "still running"); see syntheticTrace.server.ts duration mapping. + expect(trace.events[0].data.duration).toBeNull(); + }); + + it("renders a CANCELED buffered run as a completed, non-partial cancelled span", () => { + const trace = buildSyntheticTraceForBufferedRun( + makeSyntheticRun({ status: "CANCELED", cancelledAt: NOW }) + ); + expect(trace.rootSpanStatus).toBe("completed"); + expect(trace.events[0].data.isPartial).toBe(false); + expect(trace.events[0].data.isCancelled).toBe(true); + expect(trace.events[0].data.isError).toBe(false); + // Non-partial: duration is the span's numeric value (0 here), not null. + expect(trace.events[0].data.duration).toBe(0); + }); + + it("renders a FAILED buffered run as a failed, non-partial errored span", () => { + const trace = buildSyntheticTraceForBufferedRun( + makeSyntheticRun({ + status: "FAILED", + error: { code: "GATE_REJECTED", message: "buffer rejected the run" }, + }) + ); + expect(trace.rootSpanStatus).toBe("failed"); + expect(trace.events[0].data.isPartial).toBe(false); + expect(trace.events[0].data.isError).toBe(true); + expect(trace.events[0].data.isCancelled).toBe(false); + expect(trace.events[0].data.duration).toBe(0); + }); + + it("floors the trace duration to a minimum of 1ms (in nanoseconds) so the timeline has a positive extent", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.duration).toBe(ONE_MS_IN_NS); + }); + + it("reports the buffered createdAt as the trace's rootStartedAt and leaves startedAt null", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.rootStartedAt).toEqual(NOW); + expect(trace.startedAt).toBeNull(); + }); + + it("returns no link or override metadata (buffered traces are single-span)", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.linkedRunIdBySpanId).toEqual({}); + expect(trace.overridesBySpanId).toBeUndefined(); + expect(trace.queuedDuration).toBeUndefined(); + }); + + it("synthesises an empty events list (no timeline events from the buffer)", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.events[0].data.events).toEqual([]); + expect(trace.events[0].data.timelineEvents).toEqual([]); + }); +}); From b46520b0e3f6c717d82f41d0c54e6e3a7e53529a Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Thu, 28 May 2026 12:27:13 +0100 Subject: [PATCH 10/12] fix(webapp): treat FAILED buffered runs as terminal in spans/trace routes + align workerQueue default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses three Devin review findings on PR #3755: - api.v1.runs.\$runId.spans.\$spanId.ts and api.v1.runs.\$runId.trace.ts: the buffered-run response branch hardcoded isError:false and only checked CANCELED for isPartial, so a FAILED buffered run rendered as "still in progress" — SDK consumers would poll forever. Now derives both flags from CANCELED and FAILED, matching syntheticTrace.server.ts. - ApiRetrieveRunPresenter.synthesiseFoundRunFromBuffer: workerQueue defaulted to "main" while syntheticSpanRun.server.ts uses "". The API response's `region` is sourced via `run.workerQueue || undefined`, so "main" was advertising a region the run hadn't yet been assigned to. Aligned to "" so unassigned buffered runs coerce to region: undefined. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../presenters/v3/ApiRetrieveRunPresenter.server.ts | 6 +++++- .../app/routes/api.v1.runs.$runId.spans.$spanId.ts | 11 ++++++++--- apps/webapp/app/routes/api.v1.runs.$runId.trace.ts | 13 +++++++------ 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 0a33dbe8fa6..5e41d170e76 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -647,7 +647,11 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { attemptNumber: null, engine: "V2", taskEventStore: "taskEvent", - workerQueue: buffered.workerQueue ?? "main", + // Empty string when absent (matches syntheticSpanRun.server.ts and lets + // `createCommonRunStructure`'s `run.workerQueue || undefined` coerce the + // API response's `region` to undefined instead of advertising a + // misleading "main" region for a not-yet-assigned buffered run). + workerQueue: buffered.workerQueue ?? "", parentTaskRun: null, rootTaskRun: null, childRuns: [], diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts index de05db02464..78fd12325c5 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts @@ -89,15 +89,20 @@ export const loader = createLoaderApiRoute( if (resolved.run.spanId !== params.spanId) { return json({ error: "Span not found" }, { status: 404 }); } + // CANCELED and FAILED are terminal states. A FAILED buffered run is + // errored (drainer exhausted retries or gate rejected it) and must + // not signal "still in progress" — mirrors syntheticTrace.server.ts. + const isCancelled = resolved.run.status === "CANCELED"; + const isFailed = resolved.run.status === "FAILED"; return json( { spanId: resolved.run.spanId, parentId: resolved.run.parentSpanId ?? null, runId: resolved.run.friendlyId, message: resolved.run.taskIdentifier ?? "", - isError: false, - isPartial: resolved.run.status !== "CANCELED", - isCancelled: resolved.run.status === "CANCELED", + isError: isFailed, + isPartial: !isCancelled && !isFailed, + isCancelled, level: "TRACE", startTime: resolved.run.createdAt, durationMs: 0, diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts index 2c026fd1fe6..e71481d2c59 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts @@ -97,12 +97,13 @@ export const loader = createLoaderApiRoute( events: [], startTime: buffered.createdAt, duration: 0, - isError: false, - // Cancelled is a terminal state — the span shouldn't - // signal "still in progress" once it's been cancelled. - // Mirrors the sibling api.v1.runs.$runId.spans.$spanId.ts - // and syntheticTrace.server.ts logic. - isPartial: buffered.status !== "CANCELED", + isError: buffered.status === "FAILED", + // CANCELED and FAILED are terminal states — the span + // shouldn't signal "still in progress" once the run has + // reached either. Mirrors the sibling + // api.v1.runs.$runId.spans.$spanId.ts and + // syntheticTrace.server.ts logic. + isPartial: buffered.status !== "CANCELED" && buffered.status !== "FAILED", isCancelled: buffered.status === "CANCELED", level: "TRACE", queueName: buffered.queue ?? undefined, From 7548950812805a8b7f657700ac02c246fda78b3d Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Thu, 28 May 2026 12:38:46 +0100 Subject: [PATCH 11/12] refactor(webapp): extract synthetic buffered-run response builders + drop scaffolding GET on attempts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related cleanups on the mollifier read surface: 1. Extract the buffered-run response bodies for the spans-detail and trace endpoints into pure helpers (apps/webapp/app/v3/mollifier/syntheticApiResponses.server.ts: buildSyntheticSpanDetailBody, buildSyntheticTraceBody). The route bodies were carrying the only copy of the terminal-state derivation (CANCELED / FAILED → isError / isPartial / isCancelled) with no unit coverage; extracting them lets us pin the contract directly. The route files now just authenticate, resolve, validate the spanId, and forward — no body shape logic in routes. 2. Drop the GET loader on api.v1.runs.\$runParam.attempts.ts. It was added in this PR solely to fix a pre-existing Remix "no loader" 400 on a URL no SDK consumer was actually calling, and to give the mollifier-parity script a stable assertion target. The detailed attempt list lives on the v3 retrieve endpoint — the GET was scaffolding rather than product surface, and Devin's review flagged it as such. Reverted to action-only. Tests: 16 cases in apps/webapp/test/mollifierSyntheticApiResponses.test.ts covering QUEUED / CANCELED / FAILED for each body, plus identity and default-field passthrough. Pins the FAILED-terminal-state regression that shipped briefly with isPartial:true. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../api.v1.runs.$runId.spans.$spanId.ts | 22 +-- .../app/routes/api.v1.runs.$runId.trace.ts | 39 +---- .../routes/api.v1.runs.$runParam.attempts.ts | 88 ---------- .../mollifier/syntheticApiResponses.server.ts | 73 ++++++++ .../mollifierSyntheticApiResponses.test.ts | 164 ++++++++++++++++++ 5 files changed, 244 insertions(+), 142 deletions(-) create mode 100644 apps/webapp/app/v3/mollifier/syntheticApiResponses.server.ts create mode 100644 apps/webapp/test/mollifierSyntheticApiResponses.test.ts diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts index 78fd12325c5..a5250e5b850 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts @@ -10,6 +10,7 @@ import { import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticSpanDetailBody } from "~/v3/mollifier/syntheticApiResponses.server"; const ParamsSchema = z.object({ runId: z.string(), @@ -89,26 +90,7 @@ export const loader = createLoaderApiRoute( if (resolved.run.spanId !== params.spanId) { return json({ error: "Span not found" }, { status: 404 }); } - // CANCELED and FAILED are terminal states. A FAILED buffered run is - // errored (drainer exhausted retries or gate rejected it) and must - // not signal "still in progress" — mirrors syntheticTrace.server.ts. - const isCancelled = resolved.run.status === "CANCELED"; - const isFailed = resolved.run.status === "FAILED"; - return json( - { - spanId: resolved.run.spanId, - parentId: resolved.run.parentSpanId ?? null, - runId: resolved.run.friendlyId, - message: resolved.run.taskIdentifier ?? "", - isError: isFailed, - isPartial: !isCancelled && !isFailed, - isCancelled, - level: "TRACE", - startTime: resolved.run.createdAt, - durationMs: 0, - }, - { status: 200 } - ); + return json(buildSyntheticSpanDetailBody(resolved.run), { status: 200 }); } const run = resolved.run; diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts index e71481d2c59..04ae398194f 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts @@ -9,6 +9,7 @@ import { import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticTraceBody } from "~/v3/mollifier/syntheticApiResponses.server"; const ParamsSchema = z.object({ runId: z.string(), // This is the run friendly ID @@ -81,40 +82,10 @@ export const loader = createLoaderApiRoute( if (resolved.source === "buffer") { // Buffered runs have no events ingested yet — the drainer hasn't // materialised the PG row and the worker hasn't started executing. - // Synthesise a single partial span that satisfies the SDK's - // RetrieveRunTraceResponseBody schema (rootSpan is non-nullable). - const buffered = resolved.run; - return json( - { - trace: { - traceId: buffered.traceId ?? "", - rootSpan: { - id: buffered.spanId ?? "", - runId: buffered.friendlyId, - data: { - message: buffered.taskIdentifier ?? "", - taskSlug: buffered.taskIdentifier ?? undefined, - events: [], - startTime: buffered.createdAt, - duration: 0, - isError: buffered.status === "FAILED", - // CANCELED and FAILED are terminal states — the span - // shouldn't signal "still in progress" once the run has - // reached either. Mirrors the sibling - // api.v1.runs.$runId.spans.$spanId.ts and - // syntheticTrace.server.ts logic. - isPartial: buffered.status !== "CANCELED" && buffered.status !== "FAILED", - isCancelled: buffered.status === "CANCELED", - level: "TRACE", - queueName: buffered.queue ?? undefined, - machinePreset: buffered.machinePreset ?? undefined, - }, - children: [], - }, - }, - }, - { status: 200 } - ); + // The helper synthesises a single root span that satisfies the SDK's + // RetrieveRunTraceResponseBody schema (rootSpan is non-nullable) and + // reflects the buffered terminal state. + return json(buildSyntheticTraceBody(resolved.run), { status: 200 }); } const run = resolved.run; diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts index 6bf3388f890..790e52bee4e 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts @@ -1,15 +1,8 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; -import { BatchId } from "@trigger.dev/core/v3/isomorphic"; import { z } from "zod"; -import { $replica } from "~/db.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; -import { - anyResource, - createLoaderApiRoute, -} from "~/services/routeBuilders/apiBuilder.server"; -import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { CreateTaskRunAttemptService } from "~/v3/services/createTaskRunAttempt.server"; @@ -18,87 +11,6 @@ const ParamsSchema = z.object({ runParam: z.string(), }); -// GET handler added to fix the pre-existing route bug where this URL -// returned a Remix "no loader" 400 with an internal error message — only -// `action` (POST creates a new attempt) was exported, so any -// well-intentioned SDK probe hit the framework error instead of a proper -// API response. -// -// Returns `{ attempts: [] }` for both PG and buffered runs. The detailed -// attempt list belongs on the v3 retrieve endpoint, not here — this is -// the dual of the POST that creates attempts, and the empty-list shape -// gives the parity script a stable contract to assert against. -// -// Built with createLoaderApiRoute so it matches the sibling read routes -// (spans, trace, retrieve): it accepts JWTs (`allowJWT`) with the same -// run/task/tag/batch resource scoping, and a not-found run returns 404 -// with `x-should-retry: true` (`shouldRetryNotFound`) so SDK pollers keep -// retrying a run that the drainer hasn't materialised yet. PG-first then -// buffer fallback, so a third party can't distinguish "exists" from -// "doesn't exist" cross-environment. -type ResolvedRun = - | { source: "pg"; run: NonNullable>> } - | { source: "buffer"; run: NonNullable>> }; - -async function findPgRun(runId: string, environmentId: string) { - return $replica.taskRun.findFirst({ - where: { friendlyId: runId, runtimeEnvironmentId: environmentId }, - select: { friendlyId: true, taskIdentifier: true, runTags: true, batchId: true }, - }); -} - -export const loader = createLoaderApiRoute( - { - params: ParamsSchema, - allowJWT: true, - corsStrategy: "all", - findResource: async (params, auth): Promise => { - const pgRun = await findPgRun(params.runParam, auth.environment.id); - if (pgRun) return { source: "pg", run: pgRun }; - - const buffered = await findRunByIdWithMollifierFallback({ - runId: params.runParam, - environmentId: auth.environment.id, - organizationId: auth.environment.organizationId, - }); - if (buffered) return { source: "buffer", run: buffered }; - - return null; - }, - shouldRetryNotFound: true, - authorization: { - action: "read", - resource: (resolved) => { - if (resolved.source === "pg") { - const run = resolved.run; - const resources = [ - { type: "runs", id: run.friendlyId }, - { type: "tasks", id: run.taskIdentifier }, - ...run.runTags.map((tag) => ({ type: "tags", id: tag })), - ]; - if (run.batchId) { - resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); - } - return anyResource(resources); - } - const run = resolved.run; - const resources = [ - { type: "runs", id: run.friendlyId }, - ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), - ...run.tags.map((tag) => ({ type: "tags", id: tag })), - ]; - if (run.batchId) { - resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); - } - return anyResource(resources); - }, - }, - }, - async () => { - return json({ attempts: [] }, { status: 200 }); - } -); - export async function action({ request, params }: ActionFunctionArgs) { // Authenticate the request const authenticationResult = await authenticateApiRequest(request); diff --git a/apps/webapp/app/v3/mollifier/syntheticApiResponses.server.ts b/apps/webapp/app/v3/mollifier/syntheticApiResponses.server.ts new file mode 100644 index 00000000000..02c63fe91f1 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticApiResponses.server.ts @@ -0,0 +1,73 @@ +import type { SyntheticRun } from "./readFallback.server"; + +// Buffered runs have no execution data — the drainer hasn't materialised +// the PG row and the worker hasn't started. The SDK-facing read routes +// still need to return a span/trace shape that satisfies their response +// schemas; these helpers build that minimal shape from the buffered +// SyntheticRun. +// +// CANCELED and FAILED are terminal states: a FAILED buffered run is +// errored (drainer exhausted retries or the gate rejected it) and must +// not signal "still in progress." The flags below mirror +// syntheticTrace.server.ts so the SDK contract stays consistent across +// the three read paths (spans, trace, dashboard trace presenter). + +function deriveTerminalFlags(status: SyntheticRun["status"]): { + isError: boolean; + isPartial: boolean; + isCancelled: boolean; +} { + const isCancelled = status === "CANCELED"; + const isFailed = status === "FAILED"; + return { + isError: isFailed, + isPartial: !isCancelled && !isFailed, + isCancelled, + }; +} + +// Body for GET /api/v1/runs/:runId/spans/:spanId when the run is buffered +// and `:spanId` has already been verified against `buffered.spanId` by the +// route. Pure function so the route layer just authenticates, resolves +// the run, validates the spanId, and forwards the buffered run here. +export function buildSyntheticSpanDetailBody(buffered: SyntheticRun) { + const flags = deriveTerminalFlags(buffered.status); + return { + spanId: buffered.spanId, + parentId: buffered.parentSpanId ?? null, + runId: buffered.friendlyId, + message: buffered.taskIdentifier ?? "", + ...flags, + level: "TRACE" as const, + startTime: buffered.createdAt, + durationMs: 0, + }; +} + +// Body for GET /api/v1/runs/:runId/trace when the run is buffered. +// Returns the `{ trace: { traceId, rootSpan } }` envelope expected by the +// SDK's RetrieveRunTraceResponseBody schema. +export function buildSyntheticTraceBody(buffered: SyntheticRun) { + const flags = deriveTerminalFlags(buffered.status); + return { + trace: { + traceId: buffered.traceId ?? "", + rootSpan: { + id: buffered.spanId ?? "", + runId: buffered.friendlyId, + data: { + message: buffered.taskIdentifier ?? "", + taskSlug: buffered.taskIdentifier ?? undefined, + events: [] as unknown[], + startTime: buffered.createdAt, + duration: 0, + ...flags, + level: "TRACE" as const, + queueName: buffered.queue ?? undefined, + machinePreset: buffered.machinePreset ?? undefined, + }, + children: [] as unknown[], + }, + }, + }; +} diff --git a/apps/webapp/test/mollifierSyntheticApiResponses.test.ts b/apps/webapp/test/mollifierSyntheticApiResponses.test.ts new file mode 100644 index 00000000000..94ee67c8584 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticApiResponses.test.ts @@ -0,0 +1,164 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { + buildSyntheticSpanDetailBody, + buildSyntheticTraceBody, +} from "~/v3/mollifier/syntheticApiResponses.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-23T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: undefined, + payloadType: undefined, + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: undefined, + tags: [], + runTags: [], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: "span_parent", + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: undefined, + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: undefined, + maxAttempts: undefined, + maxDurationInSeconds: undefined, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +describe("buildSyntheticSpanDetailBody", () => { + it("populates identity fields from the buffered run", () => { + const body = buildSyntheticSpanDetailBody(makeSyntheticRun()); + expect(body.spanId).toBe("span_1"); + expect(body.parentId).toBe("span_parent"); + expect(body.runId).toBe("run_friendly_1"); + expect(body.message).toBe("hello-world"); + expect(body.level).toBe("TRACE"); + expect(body.startTime).toEqual(NOW); + expect(body.durationMs).toBe(0); + }); + + it("defaults parentId to null when the buffered run has no parentSpanId", () => { + const body = buildSyntheticSpanDetailBody(makeSyntheticRun({ parentSpanId: undefined })); + expect(body.parentId).toBeNull(); + }); + + it("defaults message to '' when the buffered run has no taskIdentifier", () => { + const body = buildSyntheticSpanDetailBody( + makeSyntheticRun({ taskIdentifier: undefined }) + ); + expect(body.message).toBe(""); + }); + + it("renders a QUEUED buffered run as a still-partial, non-error, non-cancelled span", () => { + const body = buildSyntheticSpanDetailBody(makeSyntheticRun({ status: "QUEUED" })); + expect(body.isPartial).toBe(true); + expect(body.isError).toBe(false); + expect(body.isCancelled).toBe(false); + }); + + it("renders a CANCELED buffered run as a non-partial, non-error, cancelled span", () => { + const body = buildSyntheticSpanDetailBody(makeSyntheticRun({ status: "CANCELED" })); + expect(body.isPartial).toBe(false); + expect(body.isError).toBe(false); + expect(body.isCancelled).toBe(true); + }); + + it("renders a FAILED buffered run as a non-partial, errored, non-cancelled span", () => { + // Regression: a FAILED buffered run used to slip through as + // `isPartial: true, isError: false`, telling SDK pollers it was still + // executing. + const body = buildSyntheticSpanDetailBody(makeSyntheticRun({ status: "FAILED" })); + expect(body.isPartial).toBe(false); + expect(body.isError).toBe(true); + expect(body.isCancelled).toBe(false); + }); +}); + +describe("buildSyntheticTraceBody", () => { + it("envelopes the synthesised root span under `trace.rootSpan` with the buffered traceId", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun()); + expect(body.trace.traceId).toBe("trace_1"); + expect(body.trace.rootSpan.id).toBe("span_1"); + expect(body.trace.rootSpan.runId).toBe("run_friendly_1"); + expect(body.trace.rootSpan.children).toEqual([]); + expect(body.trace.rootSpan.data.events).toEqual([]); + }); + + it("falls back to empty strings when traceId / spanId are absent from the snapshot", () => { + const body = buildSyntheticTraceBody( + makeSyntheticRun({ traceId: undefined, spanId: undefined }) + ); + expect(body.trace.traceId).toBe(""); + expect(body.trace.rootSpan.id).toBe(""); + }); + + it("passes through queueName and machinePreset from the snapshot", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun()); + expect(body.trace.rootSpan.data.queueName).toBe("task/hello-world"); + expect(body.trace.rootSpan.data.machinePreset).toBe("small-1x"); + }); + + it("defaults taskSlug to undefined when the buffered run has no taskIdentifier", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun({ taskIdentifier: undefined })); + expect(body.trace.rootSpan.data.taskSlug).toBeUndefined(); + expect(body.trace.rootSpan.data.message).toBe(""); + }); + + it("renders a QUEUED buffered run as a partial, non-error, non-cancelled root span", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun({ status: "QUEUED" })); + expect(body.trace.rootSpan.data.isPartial).toBe(true); + expect(body.trace.rootSpan.data.isError).toBe(false); + expect(body.trace.rootSpan.data.isCancelled).toBe(false); + }); + + it("renders a CANCELED buffered run as a non-partial, non-error, cancelled root span", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun({ status: "CANCELED" })); + expect(body.trace.rootSpan.data.isPartial).toBe(false); + expect(body.trace.rootSpan.data.isError).toBe(false); + expect(body.trace.rootSpan.data.isCancelled).toBe(true); + }); + + it("renders a FAILED buffered run as a non-partial, errored, non-cancelled root span", () => { + // Regression: a FAILED buffered run used to render with + // `isPartial: true, isError: false`, masking the failure from SDK + // consumers. + const body = buildSyntheticTraceBody(makeSyntheticRun({ status: "FAILED" })); + expect(body.trace.rootSpan.data.isPartial).toBe(false); + expect(body.trace.rootSpan.data.isError).toBe(true); + expect(body.trace.rootSpan.data.isCancelled).toBe(false); + }); +}); From 9fd79573325d96b31f5f3b4addb3e9cdef2273e3 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Thu, 28 May 2026 12:57:45 +0100 Subject: [PATCH 12/12] fix(webapp): forward scheduleId through mollifier buffer synthesis + pin synthesise contract with regression tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit synthesiseFoundRunFromBuffer hardcoded `scheduleId: null`, which dropped the schedule field from the retrieve-API response for any scheduled trigger that landed in the mollifier buffer. Scheduled triggers go through the same TriggerTaskService path as API triggers and the gate doesn't bypass them, so the snapshot does carry scheduleId; the synthesis was just throwing it away. Forward `buffered.scheduleId ?? null` so resolveSchedule() can hydrate the schedule object from PG (the Schedule row exists before the trigger fires). Exported synthesiseFoundRunFromBuffer + the FoundRun type from the presenter and added apps/webapp/test/mollifierSynthesiseFoundRun.test.ts (16 cases). The test file pins the snapshot→FoundRun mapping that previously had no direct coverage — the new scheduleId forwarding plus earlier-session regressions (batch reconstruction, workerQueue default "", FAILED→SYSTEM_FAILURE status mapping, STRING_ERROR shape, defensive metadata coercion, idempotency defaults, execution-state zero defaults). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../v3/ApiRetrieveRunPresenter.server.ts | 19 +- .../test/mollifierSynthesiseFoundRun.test.ts | 197 ++++++++++++++++++ 2 files changed, 213 insertions(+), 3 deletions(-) create mode 100644 apps/webapp/test/mollifierSynthesiseFoundRun.test.ts diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 5e41d170e76..0bdde1ad71f 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -73,7 +73,10 @@ type CommonRelatedRun = Prisma.Result< // extras the route handler reads. Declared explicitly (not inferred via // ReturnType) so findRun can return a synthesised buffered // run without the type becoming self-referential. -type FoundRun = CommonRelatedRun & { +// Exported so the buffer-synthesis helper below can be unit-tested +// against a stable shape without re-deriving it (FoundRun's exact field +// list is what the buffered run must match for `call()` not to surprise). +export type FoundRun = CommonRelatedRun & { traceId: string; payload: string; payloadType: string; @@ -587,7 +590,11 @@ function synthesiseMetadata(buffered: SyntheticRun): string | null { } } -function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { +// Exported for unit testing. Used by `findRun()` above when the +// Postgres lookup misses and the buffer carries the run — keep the shape +// in lockstep with `FoundRun`'s field list so `call()` treats a synthesised +// buffered run identically to a freshly-triggered PG row. +export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status); const errorJson: Prisma.JsonValue = buffered.error @@ -626,7 +633,13 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null, isTest: buffered.isTest, depth: buffered.depth, - scheduleId: null, + // Scheduled triggers go through the same TriggerTaskService path as + // API triggers and aren't bypassed by the mollifier gate, so a + // scheduled run can land in the buffer with its scheduleId set on the + // snapshot. Forward it so resolveSchedule() can hydrate the `schedule` + // field in the API response instead of silently dropping it until the + // drainer materialises. + scheduleId: buffered.scheduleId ?? null, lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null, resumeParentOnCompletion: buffered.resumeParentOnCompletion, // Reconstruct the batch from the snapshot's internal id so a buffered diff --git a/apps/webapp/test/mollifierSynthesiseFoundRun.test.ts b/apps/webapp/test/mollifierSynthesiseFoundRun.test.ts new file mode 100644 index 00000000000..07da5761cb7 --- /dev/null +++ b/apps/webapp/test/mollifierSynthesiseFoundRun.test.ts @@ -0,0 +1,197 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { + synthesiseFoundRunFromBuffer, + type FoundRun, +} from "~/presenters/v3/ApiRetrieveRunPresenter.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-24T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: '{"hello":"world"}', + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: undefined, + tags: ["alpha", "beta"], + runTags: ["alpha", "beta"], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: undefined, + queue: undefined, + concurrencyKey: undefined, + machinePreset: undefined, + realtimeStreamsVersion: undefined, + maxAttempts: undefined, + maxDurationInSeconds: undefined, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +describe("synthesiseFoundRunFromBuffer", () => { + it("populates internal id and friendlyId so downstream logging keys off the cuid", () => { + const found: FoundRun = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.id).toBe("run_internal_1"); + expect(found.friendlyId).toBe("run_friendly_1"); + }); + + it("forwards scheduleId from the snapshot so resolveSchedule can hydrate the schedule field", () => { + // Regression: scheduleId was previously hardcoded to null, dropping the + // schedule metadata for buffered scheduled runs even though the snapshot + // carries it (readFallback.server.ts extracts snapshot.scheduleId). + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ scheduleId: "schedule_internal_42" }) + ); + expect(found.scheduleId).toBe("schedule_internal_42"); + }); + + it("leaves scheduleId null when the snapshot has no scheduleId (non-scheduled trigger)", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.scheduleId).toBeNull(); + }); + + it("reconstructs batch.friendlyId from snapshot.batchId so batch-scoped JWTs authorise", () => { + // Regression: batch was previously hardcoded to null, so the + // route-authorization callbacks (which read run.batch?.friendlyId) + // skipped pushing the batch resource — a batch-scoped JWT 403'd on + // buffered batched runs. + const found = synthesiseFoundRunFromBuffer( + // BatchId.toFriendlyId encodes the internal id with a "batch_" prefix. + makeSyntheticRun({ batchId: "abcdefghijklmnopqrstuvwx" }) + ); + expect(found.batch).not.toBeNull(); + expect(found.batch!.id).toBe("abcdefghijklmnopqrstuvwx"); + expect(found.batch!.friendlyId).toMatch(/^batch_/); + }); + + it("leaves batch null when the snapshot has no batchId (non-batched run)", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.batch).toBeNull(); + }); + + it("defaults workerQueue to '' so createCommonRunStructure coerces region to undefined", () => { + // Regression: workerQueue previously defaulted to "main", which fed + // through `run.workerQueue || undefined` as the API response's + // `region` — advertising a not-yet-assigned region. + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun({ workerQueue: undefined })); + expect(found.workerQueue).toBe(""); + }); + + it("passes through an explicit workerQueue from the snapshot unchanged", () => { + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ workerQueue: "us-east-1" }) + ); + expect(found.workerQueue).toBe("us-east-1"); + }); + + it("maps buffered FAILED to SYSTEM_FAILURE so the API surfaces the failure", () => { + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ + status: "FAILED", + error: { code: "GATE_REJECTED", message: "buffer rejected the run" }, + }) + ); + expect(found.status).toBe("SYSTEM_FAILURE"); + expect(found.error).toEqual({ + type: "STRING_ERROR", + raw: "GATE_REJECTED: buffer rejected the run", + }); + }); + + it("maps buffered CANCELED to CANCELED with completedAt populated from cancelledAt", () => { + const cancelledAt = new Date("2026-05-24T10:05:00Z"); + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ status: "CANCELED", cancelledAt }) + ); + expect(found.status).toBe("CANCELED"); + expect(found.completedAt).toEqual(cancelledAt); + }); + + it("maps buffered QUEUED to PENDING with no error and no completedAt", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun({ status: "QUEUED" })); + expect(found.status).toBe("PENDING"); + expect(found.error).toBeNull(); + expect(found.completedAt).toBeNull(); + }); + + it("passes through a string snapshot.metadata unchanged", () => { + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ metadata: '{"customer":"acme"}' }) + ); + expect(found.metadata).toBe('{"customer":"acme"}'); + }); + + it("defensively coerces a non-string snapshot.metadata to a JSON string instead of dropping it silently", () => { + // Production never writes non-string metadata, but if the snapshot + // shape drifts we'd rather see the value (with a warn log) than have + // it disappear. + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ metadata: { customer: "acme" } }) + ); + expect(found.metadata).toBe('{"customer":"acme"}'); + }); + + it("defaults idempotencyKey / idempotencyKeyOptions to null when absent", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.idempotencyKey).toBeNull(); + expect(found.idempotencyKeyOptions).toBeNull(); + }); + + it("zeroes execution-state fields that aren't meaningful for a buffered run", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.startedAt).toBeNull(); + expect(found.attempts).toEqual([]); + expect(found.attemptNumber).toBeNull(); + expect(found.parentTaskRun).toBeNull(); + expect(found.rootTaskRun).toBeNull(); + expect(found.childRuns).toEqual([]); + expect(found.output).toBeNull(); + expect(found.costInCents).toBe(0); + expect(found.baseCostInCents).toBe(0); + expect(found.usageDurationMs).toBe(0); + }); + + it("forwards runTags from the snapshot tags array", () => { + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ tags: ["alpha", "beta"] }) + ); + expect(found.runTags).toEqual(["alpha", "beta"]); + }); + + it("pins engine to V2 and taskEventStore to taskEvent (only valid values for a buffered run)", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.engine).toBe("V2"); + expect(found.taskEventStore).toBe("taskEvent"); + }); +});