diff --git a/.server-changes/mollifier-reads.md b/.server-changes/mollifier-reads.md new file mode 100644 index 0000000000..bbb3aa8334 --- /dev/null +++ b/.server-changes/mollifier-reads.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier API read-fallback: serve buffered runs from synthetic run/trace/span data on the retrieve, trace, spans, and attempts endpoints. diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index a392866afc..0bdde1ad71 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -9,12 +9,17 @@ import { logger, } from "@trigger.dev/core/v3"; import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization"; +import { BatchId } from "@trigger.dev/core/v3/isomorphic"; import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly"; import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database"; import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { + findRunByIdWithMollifierFallback, + type SyntheticRun, +} from "~/v3/mollifier/readFallback.server"; import { generatePresignedUrl } from "~/v3/objectStore.server"; import { tracer } from "~/v3/tracer.server"; import { startSpanWithEnv } from "~/v3/tracing.server"; @@ -64,13 +69,37 @@ type CommonRelatedRun = Prisma.Result< "findFirstOrThrow" >; -type FoundRun = NonNullable>>; +// Full shape returned by findRun() — the commonRunSelect fields plus the +// extras the route handler reads. Declared explicitly (not inferred via +// ReturnType) so findRun can return a synthesised buffered +// run without the type becoming self-referential. +// Exported so the buffer-synthesis helper below can be unit-tested +// against a stable shape without re-deriving it (FoundRun's exact field +// list is what the buffered run must match for `call()` not to surprise). +export type FoundRun = CommonRelatedRun & { + traceId: string; + payload: string; + payloadType: string; + output: string | null; + outputType: string; + error: Prisma.JsonValue; + attempts: { id: string }[]; + attemptNumber: number | null; + engine: "V1" | "V2"; + taskEventStore: string; + parentTaskRun: CommonRelatedRun | null; + rootTaskRun: CommonRelatedRun | null; + childRuns: CommonRelatedRun[]; +}; export class ApiRetrieveRunPresenter { constructor(private readonly apiVersion: API_VERSIONS) {} - public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) { - return $replica.taskRun.findFirst({ + public static async findRun( + friendlyId: string, + env: AuthenticatedEnvironment, + ): Promise { + const pgRow = await $replica.taskRun.findFirst({ where: { friendlyId, runtimeEnvironmentId: env.id, @@ -102,6 +131,23 @@ export class ApiRetrieveRunPresenter { }, }, }); + + if (pgRow) return pgRow; + + // Postgres miss → fall back to the mollifier buffer. When the gate + // diverted a trigger, the run lives in Redis until the drainer replays + // it through engine.trigger. Synthesise the FoundRun shape so call() + // returns a `QUEUED` (or `FAILED`) response with empty output, no + // attempts, no relations. + const buffered = await findRunByIdWithMollifierFallback({ + runId: friendlyId, + environmentId: env.id, + organizationId: env.organizationId, + }); + + if (!buffered) return null; + + return synthesiseFoundRunFromBuffer(buffered); } public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) { @@ -475,3 +521,152 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction { return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger"; } } + +// Build a FoundRun-shaped object from a buffered (mollified) run. The run +// is in the Redis buffer; engine.trigger hasn't created the Postgres row +// yet, so every field that comes from execution state (output, attempts, +// completedAt, cost, relations) takes a default. The presenter's call() +// handles QUEUED-state runs without surprise. +function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus { + switch (status) { + case "FAILED": + return "SYSTEM_FAILURE"; + case "CANCELED": + return "CANCELED"; + default: + return "PENDING"; + } +} + +// The PG path stores `TaskRun.payload` as `String?`, so in production +// the buffered snapshot's `payload` is always a string. We defensively +// coerce other types instead of silently dropping them: an object gets +// JSON-stringified (matches how the trigger path would serialise it), +// anything truly unrenderable falls back to an empty string. The log +// line surfaces format drift to ops without crashing the read path. +function synthesisePayload(buffered: SyntheticRun): string { + const payload = buffered.payload; + if (typeof payload === "string") return payload; + if (payload === undefined || payload === null) return ""; + try { + const serialised = JSON.stringify(payload); + logger.warn("ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced", { + runFriendlyId: buffered.friendlyId, + payloadType: typeof payload, + }); + return typeof serialised === "string" ? serialised : ""; + } catch { + logger.error("ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable", { + runFriendlyId: buffered.friendlyId, + payloadType: typeof payload, + }); + return ""; + } +} + +// Mirror synthesisePayload for metadata. The PG path stores +// `TaskRun.metadata` as `String?`, and the snapshot writes it from +// `metadataPacket.data` (also a string), so in production it is always a +// string or absent. We coerce defensively — an object gets JSON-stringified +// (matching how the trigger path serialises it) rather than silently +// dropped to null, and the log line surfaces format drift to ops. +function synthesiseMetadata(buffered: SyntheticRun): string | null { + const metadata = buffered.metadata; + if (typeof metadata === "string") return metadata; + if (metadata === undefined || metadata === null) return null; + try { + const serialised = JSON.stringify(metadata); + logger.warn("ApiRetrieveRunPresenter: buffered snapshot.metadata non-string coerced", { + runFriendlyId: buffered.friendlyId, + metadataType: typeof metadata, + }); + return typeof serialised === "string" ? serialised : null; + } catch { + logger.error("ApiRetrieveRunPresenter: buffered snapshot.metadata unserialisable", { + runFriendlyId: buffered.friendlyId, + metadataType: typeof metadata, + }); + return null; + } +} + +// Exported for unit testing. Used by `findRun()` above when the +// Postgres lookup misses and the buffer carries the run — keep the shape +// in lockstep with `FoundRun`'s field list so `call()` treats a synthesised +// buffered run identically to a freshly-triggered PG row. +export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { + const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status); + + const errorJson: Prisma.JsonValue = buffered.error + ? { + type: "STRING_ERROR", + raw: `${buffered.error.code}: ${buffered.error.message}`, + } + : null; + + const metadata: string | null = synthesiseMetadata(buffered); + + return { + // `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId` + // is the user-facing `run_xxx` token. Downstream logging keyed off + // `taskRun.id` correlates with other systems via the cuid — using + // the friendlyId here breaks log correlation. `SyntheticRun` carries + // the cuid alongside the friendlyId for exactly this reason + // (RunId.fromFriendlyId in readFallback.server.ts). + id: buffered.id, + friendlyId: buffered.friendlyId, + status, + taskIdentifier: buffered.taskIdentifier ?? "", + createdAt: buffered.createdAt, + startedAt: null, + updatedAt: buffered.cancelledAt ?? buffered.createdAt, + completedAt: buffered.cancelledAt ?? null, + expiredAt: null, + delayUntil: buffered.delayUntil ?? null, + metadata, + metadataType: buffered.metadataType ?? "application/json", + ttl: buffered.ttl ?? null, + costInCents: 0, + baseCostInCents: 0, + usageDurationMs: 0, + idempotencyKey: buffered.idempotencyKey ?? null, + idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null, + isTest: buffered.isTest, + depth: buffered.depth, + // Scheduled triggers go through the same TriggerTaskService path as + // API triggers and aren't bypassed by the mollifier gate, so a + // scheduled run can land in the buffer with its scheduleId set on the + // snapshot. Forward it so resolveSchedule() can hydrate the `schedule` + // field in the API response instead of silently dropping it until the + // drainer materialises. + scheduleId: buffered.scheduleId ?? null, + lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null, + resumeParentOnCompletion: buffered.resumeParentOnCompletion, + // Reconstruct the batch from the snapshot's internal id so a buffered + // run reports the same `batchId` / triggerFunction as it will once + // materialised, and so batch-scoped JWTs authorise against it (the + // route authorization callbacks read `run.batch?.friendlyId`). + batch: buffered.batchId + ? { id: buffered.batchId, friendlyId: BatchId.toFriendlyId(buffered.batchId) } + : null, + runTags: buffered.tags, + traceId: buffered.traceId ?? "", + payload: synthesisePayload(buffered), + payloadType: buffered.payloadType ?? "application/json", + output: null, + outputType: "application/json", + error: errorJson, + attempts: [], + attemptNumber: null, + engine: "V2", + taskEventStore: "taskEvent", + // Empty string when absent (matches syntheticSpanRun.server.ts and lets + // `createCommonRunStructure`'s `run.workerQueue || undefined` coerce the + // API response's `region` to undefined instead of advertising a + // misleading "main" region for a not-yet-assigned buffered run). + workerQueue: buffered.workerQueue ?? "", + parentTaskRun: null, + rootTaskRun: null, + childRuns: [], + }; +} diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts index be0d12087b..a5250e5b85 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts @@ -9,33 +9,69 @@ import { } from "~/services/routeBuilders/apiBuilder.server"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticSpanDetailBody } from "~/v3/mollifier/syntheticApiResponses.server"; const ParamsSchema = z.object({ runId: z.string(), spanId: z.string(), }); +// Resolve the run from either Postgres or the mollifier buffer. +// Buffered runs only have one valid spanId (the queued span recorded at +// gate time and reused as the run's root spanId when the drainer +// materialises). Any other spanId returns a deterministic 404; the queued +// span returns a minimal synthesised shape so the customer's SDK sees the +// same 200 contract they'd get for a freshly-triggered run. +type ResolvedRun = + | { source: "pg"; run: Awaited> & {} } + | { source: "buffer"; run: NonNullable>> }; + +async function findPgRun(runId: string, environmentId: string) { + return $replica.taskRun.findFirst({ + where: { friendlyId: runId, runtimeEnvironmentId: environmentId }, + }); +} + export const loader = createLoaderApiRoute( { params: ParamsSchema, allowJWT: true, corsStrategy: "all", - findResource: (params, auth) => { - return $replica.taskRun.findFirst({ - where: { - friendlyId: params.runId, - runtimeEnvironmentId: auth.environment.id, - }, + findResource: async (params, auth): Promise => { + const pgRun = await findPgRun(params.runId, auth.environment.id); + if (pgRun) return { source: "pg", run: pgRun }; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: params.runId, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, }); + if (buffered) return { source: "buffer", run: buffered }; + + return null; }, shouldRetryNotFound: true, authorization: { action: "read", - resource: (run) => { + resource: (resolved) => { + if (resolved.source === "pg") { + const run = resolved.run; + const resources = [ + { type: "runs", id: run.friendlyId }, + { type: "tasks", id: run.taskIdentifier }, + ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } + return anyResource(resources); + } + const run = resolved.run; const resources = [ { type: "runs", id: run.friendlyId }, - { type: "tasks", id: run.taskIdentifier }, - ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), + ...run.tags.map((tag) => ({ type: "tags", id: tag })), ]; if (run.batchId) { resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); @@ -44,7 +80,20 @@ export const loader = createLoaderApiRoute( }, }, }, - async ({ params, resource: run, authentication }) => { + async ({ params, resource: resolved, authentication }) => { + if (resolved.source === "buffer") { + // Buffered runs have exactly one valid spanId — the queued span the + // mollifier gate recorded at trigger time, which becomes the run's + // root spanId once the drainer materialises. Any other spanId is a + // deterministic 404. The matching spanId returns a minimal shape + // representing "span exists, no execution data yet." + if (resolved.run.spanId !== params.spanId) { + return json({ error: "Span not found" }, { status: 404 }); + } + return json(buildSyntheticSpanDetailBody(resolved.run), { status: 200 }); + } + + const run = resolved.run; const eventRepository = await getEventRepositoryForStore( run.taskEventStore, authentication.environment.organization.id diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts index 77e6a4df04..04ae398194 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts @@ -8,32 +8,68 @@ import { } from "~/services/routeBuilders/apiBuilder.server"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticTraceBody } from "~/v3/mollifier/syntheticApiResponses.server"; const ParamsSchema = z.object({ runId: z.string(), // This is the run friendly ID }); +// Discriminator on the resolved resource — `pg` is the real Prisma TaskRun +// row, `buffer` is a synthesised shape from the mollifier buffer for runs +// whose drainer hasn't yet materialised them. The handler renders an empty +// trace for buffered runs so the customer sees the same 200 shape they'd +// get for a freshly-triggered PG run with no spans yet (matches the +// pass-through control case in scripts/mollifier-api-parity.sh). +type ResolvedRun = + | { source: "pg"; run: Awaited> & {} } + | { source: "buffer"; run: NonNullable>> }; + +async function findPgRun(runId: string, environmentId: string) { + return $replica.taskRun.findFirst({ + where: { friendlyId: runId, runtimeEnvironmentId: environmentId }, + }); +} + export const loader = createLoaderApiRoute( { params: ParamsSchema, allowJWT: true, corsStrategy: "all", - findResource: (params, auth) => { - return $replica.taskRun.findFirst({ - where: { - friendlyId: params.runId, - runtimeEnvironmentId: auth.environment.id, - }, + findResource: async (params, auth): Promise => { + const pgRun = await findPgRun(params.runId, auth.environment.id); + if (pgRun) return { source: "pg", run: pgRun }; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: params.runId, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, }); + if (buffered) return { source: "buffer", run: buffered }; + + return null; }, shouldRetryNotFound: true, authorization: { action: "read", - resource: (run) => { + resource: (resolved) => { + if (resolved.source === "pg") { + const run = resolved.run; + const resources = [ + { type: "runs", id: run.friendlyId }, + { type: "tasks", id: run.taskIdentifier }, + ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } + return anyResource(resources); + } + const run = resolved.run; const resources = [ { type: "runs", id: run.friendlyId }, - { type: "tasks", id: run.taskIdentifier }, - ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), + ...run.tags.map((tag) => ({ type: "tags", id: tag })), ]; if (run.batchId) { resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); @@ -42,7 +78,17 @@ export const loader = createLoaderApiRoute( }, }, }, - async ({ resource: run, authentication }) => { + async ({ resource: resolved, authentication }) => { + if (resolved.source === "buffer") { + // Buffered runs have no events ingested yet — the drainer hasn't + // materialised the PG row and the worker hasn't started executing. + // The helper synthesises a single root span that satisfies the SDK's + // RetrieveRunTraceResponseBody schema (rootSpan is non-nullable) and + // reflects the buffered terminal state. + return json(buildSyntheticTraceBody(resolved.run), { status: 200 }); + } + + const run = resolved.run; const eventRepository = await getEventRepositoryForStore( run.taskEventStore, authentication.environment.organization.id diff --git a/apps/webapp/app/v3/mollifier/syntheticApiResponses.server.ts b/apps/webapp/app/v3/mollifier/syntheticApiResponses.server.ts new file mode 100644 index 0000000000..02c63fe91f --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticApiResponses.server.ts @@ -0,0 +1,73 @@ +import type { SyntheticRun } from "./readFallback.server"; + +// Buffered runs have no execution data — the drainer hasn't materialised +// the PG row and the worker hasn't started. The SDK-facing read routes +// still need to return a span/trace shape that satisfies their response +// schemas; these helpers build that minimal shape from the buffered +// SyntheticRun. +// +// CANCELED and FAILED are terminal states: a FAILED buffered run is +// errored (drainer exhausted retries or the gate rejected it) and must +// not signal "still in progress." The flags below mirror +// syntheticTrace.server.ts so the SDK contract stays consistent across +// the three read paths (spans, trace, dashboard trace presenter). + +function deriveTerminalFlags(status: SyntheticRun["status"]): { + isError: boolean; + isPartial: boolean; + isCancelled: boolean; +} { + const isCancelled = status === "CANCELED"; + const isFailed = status === "FAILED"; + return { + isError: isFailed, + isPartial: !isCancelled && !isFailed, + isCancelled, + }; +} + +// Body for GET /api/v1/runs/:runId/spans/:spanId when the run is buffered +// and `:spanId` has already been verified against `buffered.spanId` by the +// route. Pure function so the route layer just authenticates, resolves +// the run, validates the spanId, and forwards the buffered run here. +export function buildSyntheticSpanDetailBody(buffered: SyntheticRun) { + const flags = deriveTerminalFlags(buffered.status); + return { + spanId: buffered.spanId, + parentId: buffered.parentSpanId ?? null, + runId: buffered.friendlyId, + message: buffered.taskIdentifier ?? "", + ...flags, + level: "TRACE" as const, + startTime: buffered.createdAt, + durationMs: 0, + }; +} + +// Body for GET /api/v1/runs/:runId/trace when the run is buffered. +// Returns the `{ trace: { traceId, rootSpan } }` envelope expected by the +// SDK's RetrieveRunTraceResponseBody schema. +export function buildSyntheticTraceBody(buffered: SyntheticRun) { + const flags = deriveTerminalFlags(buffered.status); + return { + trace: { + traceId: buffered.traceId ?? "", + rootSpan: { + id: buffered.spanId ?? "", + runId: buffered.friendlyId, + data: { + message: buffered.taskIdentifier ?? "", + taskSlug: buffered.taskIdentifier ?? undefined, + events: [] as unknown[], + startTime: buffered.createdAt, + duration: 0, + ...flags, + level: "TRACE" as const, + queueName: buffered.queue ?? undefined, + machinePreset: buffered.machinePreset ?? undefined, + }, + children: [] as unknown[], + }, + }, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts new file mode 100644 index 0000000000..e316846d70 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts @@ -0,0 +1,119 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; +// Use the webapp-side wrapper (not `deserialiseSnapshot` from +// @trigger.dev/redis-worker directly) so this file shares a single +// deserialisation path with readFallback.server.ts. The two are +// behaviourally identical today (both wrap `JSON.parse`), but pinning +// the shared helper keeps the two read-side modules from drifting if +// snapshot encoding ever changes. +import { deserialiseMollifierSnapshot } from "./mollifierSnapshot.server"; + +// Validated subset of a mollifier snapshot — just the fields needed to +// rebuild a canonical run-detail URL for a buffered run. Anything else +// in the payload is ignored. `safeParse` against this schema replaces +// the ad-hoc `as Record` + `typeof === "string"` checks +// that the redirect path used to do by hand; missing or wrong-typed +// fields collapse into a single `parsed.success === false` branch. +const BufferedSnapshotSchema = z.object({ + spanId: z.string().optional(), + environment: z.object({ + slug: z.string(), + project: z.object({ slug: z.string() }), + organization: z.object({ slug: z.string() }), + }), +}); + +export type BufferedRunRedirectInfo = { + organizationSlug: string; + projectSlug: string; + environmentSlug: string; + spanId: string | undefined; +}; + +export type FindBufferedRunRedirectInfoDeps = { + getBuffer?: () => MollifierBuffer | null; + prismaClient?: PrismaClientOrTransaction; +}; + +// Resolve the org/project/env slugs needed to build the canonical run-detail +// URL for a buffered run. Used by the short-URL redirect routes +// (`runs.$runParam`, `@.runs.$runParam`, `projects.v3.$projectRef.runs.$runParam`) +// so a customer clicking the trigger-API-returned run link doesn't 404 +// during the buffered window. +// +// Authorisation: PG query confirms the requesting user belongs to the +// organisation the buffer entry says owns the run. Without this check a +// known runId would leak slugs. +export async function findBufferedRunRedirectInfo( + args: { + runFriendlyId: string; + userId: string; + // Admin impersonation paths bypass org-membership; mirrors the existing + // PG-side admin route behaviour (`@.runs.$runParam` doesn't filter by + // org membership in the PG query either). + skipOrgMembershipCheck?: boolean; + }, + deps: FindBufferedRunRedirectInfoDeps = {}, +): Promise { + const buffer = (deps.getBuffer ?? getMollifierBuffer)(); + const prismaClient = deps.prismaClient ?? prisma; + if (!buffer) return null; + + let entry; + try { + entry = await buffer.getEntry(args.runFriendlyId); + } catch (err) { + logger.warn("buffered redirect: buffer.getEntry failed", { + runFriendlyId: args.runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + return null; + } + if (!entry) return null; + + if (!args.skipOrgMembershipCheck) { + const member = await prismaClient.orgMember.findFirst({ + where: { userId: args.userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) return null; + } + + let raw: unknown; + try { + raw = deserialiseMollifierSnapshot(entry.payload); + } catch (err) { + logger.warn("buffered redirect: snapshot deserialise failed", { + runFriendlyId: args.runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + return null; + } + + const parsed = BufferedSnapshotSchema.safeParse(raw); + if (!parsed.success) { + // Either the snapshot is from a different writer that doesn't carry + // environment slugs (in which case we genuinely can't build a URL) + // or a buffer-format drift snuck through. Log at debug; the caller + // 404s and the user sees the standard not-found page, not a 500. + logger.debug("buffered redirect: snapshot shape mismatch", { + runFriendlyId: args.runFriendlyId, + issues: parsed.error.issues.map((issue) => ({ + path: issue.path.join("."), + code: issue.code, + })), + }); + return null; + } + + return { + organizationSlug: parsed.data.environment.organization.slug, + projectSlug: parsed.data.environment.project.slug, + environmentSlug: parsed.data.environment.slug, + spanId: parsed.data.spanId, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts new file mode 100644 index 0000000000..bdc7873304 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts @@ -0,0 +1,184 @@ +import { prettyPrintPacket, RunAnnotations } from "@trigger.dev/core/v3"; +import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; +import { + extractIdempotencyKeyScope, + getUserProvidedIdempotencyKey, +} from "@trigger.dev/core/v3/serverOnly"; +import { MachinePresetName } from "@trigger.dev/core/v3/schemas"; +import type { SpanRun } from "~/presenters/v3/SpanPresenter.server"; +import type { SyntheticRun } from "./readFallback.server"; + +// `SyntheticRun.machinePreset` is sourced from the snapshot payload as +// a plain string, but `SpanRun.machinePreset` is the narrowed enum. +// Validate against the canonical enum so an unknown / stale preset +// string collapses to undefined rather than fighting the type checker. +function narrowMachinePreset(value: string | undefined): SpanRun["machinePreset"] { + if (value === undefined) return undefined; + const parsed = MachinePresetName.safeParse(value); + return parsed.success ? parsed.data : undefined; +} + +// Synthesise a SpanRun-shaped object from a buffered run so the run-detail +// page's right-side details panel renders identically to a PG-resident +// run. The shape matches `SpanPresenter.getRun`'s return value; +// buffered-irrelevant fields (output, attempts, schedule, session, +// region, batch) are filled with sensible defaults, while terminal state +// (CANCELED / FAILED) is reflected into `status`, `isFinished`, `isError` +// and `error` so a finished buffered run does not render as PENDING. +// +// Pretty-printing for payload and metadata mirrors SpanPresenter so the +// UI receives data in the same shape. Buffered runs cannot use the +// `application/store` packet path (no R2 object yet) so we treat raw +// snapshot fields as inline packets. +export async function buildSyntheticSpanRun(args: { + run: SyntheticRun; + environment: { id: string; slug: string; type: "PRODUCTION" | "DEVELOPMENT" | "STAGING" | "PREVIEW" }; +}): Promise { + const { run, environment } = args; + + const payload = + typeof run.payload !== "undefined" && run.payload !== null + ? await prettyPrintPacket(run.payload, run.payloadType ?? undefined) + : undefined; + + const metadata = run.metadata + ? await prettyPrintPacket(run.metadata, run.metadataType, { + filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"], + }) + : undefined; + + const idempotencyShape = { + idempotencyKey: run.idempotencyKey ?? null, + idempotencyKeyExpiresAt: null, + idempotencyKeyOptions: run.idempotencyKeyOptions ?? null, + }; + + const idempotencyKey = getUserProvidedIdempotencyKey(idempotencyShape); + const idempotencyKeyScope = extractIdempotencyKeyScope(idempotencyShape); + const idempotencyKeyStatus: SpanRun["idempotencyKeyStatus"] = idempotencyKey + ? "active" + : idempotencyKeyScope + ? "inactive" + : undefined; + + const taskKind = RunAnnotations.safeParse(run.annotations).data?.taskKind; + const isAgentRun = taskKind === "AGENT"; + + const queueName = run.queue ?? "task/"; + const isCancelled = run.status === "CANCELED"; + const isFailed = run.status === "FAILED"; + + // The run-detail panel derives terminal/error state from `status`, + // `isFinished` and `isError` (SpanPresenter.getRun -> isFinalRunStatus / + // isFailedRunStatus). Buffered FAILED runs surface as SYSTEM_FAILURE to + // match ApiRetrieveRunPresenter.bufferedStatusToTaskRunStatus; both + // CANCELED and SYSTEM_FAILURE are final run statuses, and SYSTEM_FAILURE + // is also a failed status. + const status: SpanRun["status"] = isCancelled + ? "CANCELED" + : isFailed + ? "SYSTEM_FAILURE" + : "PENDING"; + + // Mirror ApiRetrieveRunPresenter's STRING_ERROR synthesis so the panel + // shows why a buffered run failed instead of an empty error block. + const error: SpanRun["error"] = + isFailed && run.error + ? { type: "STRING_ERROR", raw: `${run.error.code}: ${run.error.message}` } + : undefined; + + return { + id: run.id, + friendlyId: run.friendlyId, + status, + statusReason: isCancelled + ? run.cancelReason ?? undefined + : isFailed + ? run.error?.message ?? undefined + : undefined, + createdAt: run.createdAt, + startedAt: null, + executedAt: null, + updatedAt: run.cancelledAt ?? run.createdAt, + delayUntil: run.delayUntil ?? null, + expiredAt: null, + completedAt: run.cancelledAt ?? null, + logsDeletedAt: null, + ttl: run.ttl ?? null, + taskIdentifier: run.taskIdentifier ?? "", + version: undefined, + sdkVersion: undefined, + runtime: undefined, + runtimeVersion: undefined, + isTest: run.isTest, + replayedFromTaskRunFriendlyId: run.replayedFromTaskRunFriendlyId ?? null, + environmentId: environment.id, + idempotencyKey, + idempotencyKeyExpiresAt: null, + idempotencyKeyScope, + idempotencyKeyStatus, + debounce: null, + schedule: undefined, + queue: { + name: queueName, + isCustomQueue: !queueName.startsWith("task/"), + concurrencyKey: run.concurrencyKey ?? null, + }, + tags: run.runTags, + baseCostInCents: 0, + costInCents: 0, + totalCostInCents: 0, + usageDurationMs: 0, + isFinished: isCancelled || isFailed, + isRunning: false, + isError: isFailed, + isAgentRun, + payload, + payloadType: run.payloadType ?? "application/json", + output: undefined, + outputType: "application/json", + error, + // The snapshot only carries the root/parent friendly IDs, not the + // spanId or taskIdentifier that SpanPresenter sources from the joined + // PG rows. Emitting them with empty-string stubs renders a blank task + // name and a misleading `?span=` jump target, so we omit the + // relationships until the drainer materialises the row (a transient + // window). Top-level buffered runs have no relationships regardless. + relationships: { + root: undefined, + parent: undefined, + }, + context: JSON.stringify( + { + task: { + id: run.taskIdentifier ?? "", + }, + run: { + id: run.friendlyId, + createdAt: run.createdAt, + isTest: run.isTest, + }, + environment: { + id: environment.id, + slug: environment.slug, + type: environment.type, + }, + }, + null, + 2, + ), + metadata, + maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds), + batch: undefined, + session: undefined, + engine: "V2", + region: null, + workerQueue: run.workerQueue ?? "", + traceId: run.traceId ?? "", + spanId: run.spanId ?? "", + isCached: false, + machinePreset: narrowMachinePreset(run.machinePreset), + taskEventStore: "taskEvent", + externalTraceId: undefined, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts new file mode 100644 index 0000000000..ee0d518e2e --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts @@ -0,0 +1,76 @@ +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; +import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView"; +import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents"; +import type { SpanSummary } from "~/v3/eventRepository/eventRepository.types"; +import type { SyntheticRun } from "./readFallback.server"; + +// Build a single-span trace for a buffered run so the run-detail page +// renders a meaningful timeline before the drainer materialises the +// row. Mirrors the shape produced by `RunPresenter` when its trace +// store lookup returns no spans, so the dashboard consumer treats the +// buffered run identically to a freshly enqueued PG run that hasn't +// emitted any events yet. +export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { + const spanId = run.spanId ?? ""; + const isCancelled = run.status === "CANCELED"; + const isFailed = run.status === "FAILED"; + const span: SpanSummary = { + id: spanId, + parentId: run.parentSpanId, + runId: run.friendlyId, + data: { + message: run.taskIdentifier ?? "Task", + style: { icon: "task", variant: "primary" }, + events: [], + startTime: run.createdAt, + duration: 0, + isError: isFailed, + // CANCELED and FAILED are terminal; only a still-queued buffered run + // is partial. A partial failed span would otherwise render as + // "executing" forever in the timeline. + isPartial: !isCancelled && !isFailed, + isCancelled, + isDebug: false, + level: "TRACE", + }, + }; + + const tree = createTreeFromFlatItems([span], spanId); + const treeRootStartTimeMs = tree?.data.startTime.getTime() ?? 0; + const totalDuration = Math.max(tree?.data.duration ?? 0, millisecondsToNanoseconds(1)); + + const events = tree + ? flattenTree(tree).map((n) => { + const offset = millisecondsToNanoseconds( + n.data.startTime.getTime() - treeRootStartTimeMs + ); + return { + ...n, + data: { + ...n.data, + timelineEvents: createTimelineSpanEventsFromSpanEvents(n.data.events, false, treeRootStartTimeMs), + duration: n.data.isPartial ? null : n.data.duration, + offset, + isRoot: n.id === spanId, + }, + }; + }) + : []; + + return { + // Matches RunPresenter's derivation: failed root span -> "failed", + // otherwise a terminal (non-partial) span -> "completed", else + // "executing". CANCELED is terminal-but-not-error, so "completed". + rootSpanStatus: (isFailed ? "failed" : isCancelled ? "completed" : "executing") as + | "executing" + | "completed" + | "failed", + events, + duration: totalDuration, + rootStartedAt: tree?.data.startTime, + startedAt: null, + queuedDuration: undefined, + overridesBySpanId: undefined, + linkedRunIdBySpanId: {} as Record, + }; +} diff --git a/apps/webapp/test/mollifierReadFallback.test.ts b/apps/webapp/test/mollifierReadFallback.test.ts index 1db167eea6..fcee3fb95f 100644 --- a/apps/webapp/test/mollifierReadFallback.test.ts +++ b/apps/webapp/test/mollifierReadFallback.test.ts @@ -260,6 +260,45 @@ describe("findRunByIdWithMollifierFallback", () => { expect(result!.runTags).toEqual(["t1", "t2"]); }); + it("extracts batchId from the snapshot's nested batch object (engine.trigger shape)", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + taskIdentifier: "t", + // The engine.trigger input nests the batch as `{ id, index }`, + // where `id` is the batch's internal cuid (not a flat `batchId`). + batch: { id: "batch_internal_cuid", index: 3 }, + }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.batchId).toBe("batch_internal_cuid"); + }); + + it("leaves batchId undefined when the snapshot has no batch (non-batched run)", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ taskIdentifier: "t" }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.batchId).toBeUndefined(); + }); + it("treats invalid date strings as undefined and does not mis-classify status as CANCELED", async () => { const entry: BufferEntry = { runId: "run_1", diff --git a/apps/webapp/test/mollifierSynthesiseFoundRun.test.ts b/apps/webapp/test/mollifierSynthesiseFoundRun.test.ts new file mode 100644 index 0000000000..07da5761cb --- /dev/null +++ b/apps/webapp/test/mollifierSynthesiseFoundRun.test.ts @@ -0,0 +1,197 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { + synthesiseFoundRunFromBuffer, + type FoundRun, +} from "~/presenters/v3/ApiRetrieveRunPresenter.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-24T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: '{"hello":"world"}', + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: undefined, + tags: ["alpha", "beta"], + runTags: ["alpha", "beta"], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: undefined, + queue: undefined, + concurrencyKey: undefined, + machinePreset: undefined, + realtimeStreamsVersion: undefined, + maxAttempts: undefined, + maxDurationInSeconds: undefined, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +describe("synthesiseFoundRunFromBuffer", () => { + it("populates internal id and friendlyId so downstream logging keys off the cuid", () => { + const found: FoundRun = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.id).toBe("run_internal_1"); + expect(found.friendlyId).toBe("run_friendly_1"); + }); + + it("forwards scheduleId from the snapshot so resolveSchedule can hydrate the schedule field", () => { + // Regression: scheduleId was previously hardcoded to null, dropping the + // schedule metadata for buffered scheduled runs even though the snapshot + // carries it (readFallback.server.ts extracts snapshot.scheduleId). + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ scheduleId: "schedule_internal_42" }) + ); + expect(found.scheduleId).toBe("schedule_internal_42"); + }); + + it("leaves scheduleId null when the snapshot has no scheduleId (non-scheduled trigger)", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.scheduleId).toBeNull(); + }); + + it("reconstructs batch.friendlyId from snapshot.batchId so batch-scoped JWTs authorise", () => { + // Regression: batch was previously hardcoded to null, so the + // route-authorization callbacks (which read run.batch?.friendlyId) + // skipped pushing the batch resource — a batch-scoped JWT 403'd on + // buffered batched runs. + const found = synthesiseFoundRunFromBuffer( + // BatchId.toFriendlyId encodes the internal id with a "batch_" prefix. + makeSyntheticRun({ batchId: "abcdefghijklmnopqrstuvwx" }) + ); + expect(found.batch).not.toBeNull(); + expect(found.batch!.id).toBe("abcdefghijklmnopqrstuvwx"); + expect(found.batch!.friendlyId).toMatch(/^batch_/); + }); + + it("leaves batch null when the snapshot has no batchId (non-batched run)", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.batch).toBeNull(); + }); + + it("defaults workerQueue to '' so createCommonRunStructure coerces region to undefined", () => { + // Regression: workerQueue previously defaulted to "main", which fed + // through `run.workerQueue || undefined` as the API response's + // `region` — advertising a not-yet-assigned region. + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun({ workerQueue: undefined })); + expect(found.workerQueue).toBe(""); + }); + + it("passes through an explicit workerQueue from the snapshot unchanged", () => { + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ workerQueue: "us-east-1" }) + ); + expect(found.workerQueue).toBe("us-east-1"); + }); + + it("maps buffered FAILED to SYSTEM_FAILURE so the API surfaces the failure", () => { + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ + status: "FAILED", + error: { code: "GATE_REJECTED", message: "buffer rejected the run" }, + }) + ); + expect(found.status).toBe("SYSTEM_FAILURE"); + expect(found.error).toEqual({ + type: "STRING_ERROR", + raw: "GATE_REJECTED: buffer rejected the run", + }); + }); + + it("maps buffered CANCELED to CANCELED with completedAt populated from cancelledAt", () => { + const cancelledAt = new Date("2026-05-24T10:05:00Z"); + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ status: "CANCELED", cancelledAt }) + ); + expect(found.status).toBe("CANCELED"); + expect(found.completedAt).toEqual(cancelledAt); + }); + + it("maps buffered QUEUED to PENDING with no error and no completedAt", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun({ status: "QUEUED" })); + expect(found.status).toBe("PENDING"); + expect(found.error).toBeNull(); + expect(found.completedAt).toBeNull(); + }); + + it("passes through a string snapshot.metadata unchanged", () => { + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ metadata: '{"customer":"acme"}' }) + ); + expect(found.metadata).toBe('{"customer":"acme"}'); + }); + + it("defensively coerces a non-string snapshot.metadata to a JSON string instead of dropping it silently", () => { + // Production never writes non-string metadata, but if the snapshot + // shape drifts we'd rather see the value (with a warn log) than have + // it disappear. + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ metadata: { customer: "acme" } }) + ); + expect(found.metadata).toBe('{"customer":"acme"}'); + }); + + it("defaults idempotencyKey / idempotencyKeyOptions to null when absent", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.idempotencyKey).toBeNull(); + expect(found.idempotencyKeyOptions).toBeNull(); + }); + + it("zeroes execution-state fields that aren't meaningful for a buffered run", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.startedAt).toBeNull(); + expect(found.attempts).toEqual([]); + expect(found.attemptNumber).toBeNull(); + expect(found.parentTaskRun).toBeNull(); + expect(found.rootTaskRun).toBeNull(); + expect(found.childRuns).toEqual([]); + expect(found.output).toBeNull(); + expect(found.costInCents).toBe(0); + expect(found.baseCostInCents).toBe(0); + expect(found.usageDurationMs).toBe(0); + }); + + it("forwards runTags from the snapshot tags array", () => { + const found = synthesiseFoundRunFromBuffer( + makeSyntheticRun({ tags: ["alpha", "beta"] }) + ); + expect(found.runTags).toEqual(["alpha", "beta"]); + }); + + it("pins engine to V2 and taskEventStore to taskEvent (only valid values for a buffered run)", () => { + const found = synthesiseFoundRunFromBuffer(makeSyntheticRun()); + expect(found.engine).toBe("V2"); + expect(found.taskEventStore).toBe("taskEvent"); + }); +}); diff --git a/apps/webapp/test/mollifierSyntheticApiResponses.test.ts b/apps/webapp/test/mollifierSyntheticApiResponses.test.ts new file mode 100644 index 0000000000..94ee67c858 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticApiResponses.test.ts @@ -0,0 +1,164 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { + buildSyntheticSpanDetailBody, + buildSyntheticTraceBody, +} from "~/v3/mollifier/syntheticApiResponses.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-23T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: undefined, + payloadType: undefined, + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: undefined, + tags: [], + runTags: [], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: "span_parent", + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: undefined, + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: undefined, + maxAttempts: undefined, + maxDurationInSeconds: undefined, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +describe("buildSyntheticSpanDetailBody", () => { + it("populates identity fields from the buffered run", () => { + const body = buildSyntheticSpanDetailBody(makeSyntheticRun()); + expect(body.spanId).toBe("span_1"); + expect(body.parentId).toBe("span_parent"); + expect(body.runId).toBe("run_friendly_1"); + expect(body.message).toBe("hello-world"); + expect(body.level).toBe("TRACE"); + expect(body.startTime).toEqual(NOW); + expect(body.durationMs).toBe(0); + }); + + it("defaults parentId to null when the buffered run has no parentSpanId", () => { + const body = buildSyntheticSpanDetailBody(makeSyntheticRun({ parentSpanId: undefined })); + expect(body.parentId).toBeNull(); + }); + + it("defaults message to '' when the buffered run has no taskIdentifier", () => { + const body = buildSyntheticSpanDetailBody( + makeSyntheticRun({ taskIdentifier: undefined }) + ); + expect(body.message).toBe(""); + }); + + it("renders a QUEUED buffered run as a still-partial, non-error, non-cancelled span", () => { + const body = buildSyntheticSpanDetailBody(makeSyntheticRun({ status: "QUEUED" })); + expect(body.isPartial).toBe(true); + expect(body.isError).toBe(false); + expect(body.isCancelled).toBe(false); + }); + + it("renders a CANCELED buffered run as a non-partial, non-error, cancelled span", () => { + const body = buildSyntheticSpanDetailBody(makeSyntheticRun({ status: "CANCELED" })); + expect(body.isPartial).toBe(false); + expect(body.isError).toBe(false); + expect(body.isCancelled).toBe(true); + }); + + it("renders a FAILED buffered run as a non-partial, errored, non-cancelled span", () => { + // Regression: a FAILED buffered run used to slip through as + // `isPartial: true, isError: false`, telling SDK pollers it was still + // executing. + const body = buildSyntheticSpanDetailBody(makeSyntheticRun({ status: "FAILED" })); + expect(body.isPartial).toBe(false); + expect(body.isError).toBe(true); + expect(body.isCancelled).toBe(false); + }); +}); + +describe("buildSyntheticTraceBody", () => { + it("envelopes the synthesised root span under `trace.rootSpan` with the buffered traceId", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun()); + expect(body.trace.traceId).toBe("trace_1"); + expect(body.trace.rootSpan.id).toBe("span_1"); + expect(body.trace.rootSpan.runId).toBe("run_friendly_1"); + expect(body.trace.rootSpan.children).toEqual([]); + expect(body.trace.rootSpan.data.events).toEqual([]); + }); + + it("falls back to empty strings when traceId / spanId are absent from the snapshot", () => { + const body = buildSyntheticTraceBody( + makeSyntheticRun({ traceId: undefined, spanId: undefined }) + ); + expect(body.trace.traceId).toBe(""); + expect(body.trace.rootSpan.id).toBe(""); + }); + + it("passes through queueName and machinePreset from the snapshot", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun()); + expect(body.trace.rootSpan.data.queueName).toBe("task/hello-world"); + expect(body.trace.rootSpan.data.machinePreset).toBe("small-1x"); + }); + + it("defaults taskSlug to undefined when the buffered run has no taskIdentifier", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun({ taskIdentifier: undefined })); + expect(body.trace.rootSpan.data.taskSlug).toBeUndefined(); + expect(body.trace.rootSpan.data.message).toBe(""); + }); + + it("renders a QUEUED buffered run as a partial, non-error, non-cancelled root span", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun({ status: "QUEUED" })); + expect(body.trace.rootSpan.data.isPartial).toBe(true); + expect(body.trace.rootSpan.data.isError).toBe(false); + expect(body.trace.rootSpan.data.isCancelled).toBe(false); + }); + + it("renders a CANCELED buffered run as a non-partial, non-error, cancelled root span", () => { + const body = buildSyntheticTraceBody(makeSyntheticRun({ status: "CANCELED" })); + expect(body.trace.rootSpan.data.isPartial).toBe(false); + expect(body.trace.rootSpan.data.isError).toBe(false); + expect(body.trace.rootSpan.data.isCancelled).toBe(true); + }); + + it("renders a FAILED buffered run as a non-partial, errored, non-cancelled root span", () => { + // Regression: a FAILED buffered run used to render with + // `isPartial: true, isError: false`, masking the failure from SDK + // consumers. + const body = buildSyntheticTraceBody(makeSyntheticRun({ status: "FAILED" })); + expect(body.trace.rootSpan.data.isPartial).toBe(false); + expect(body.trace.rootSpan.data.isError).toBe(true); + expect(body.trace.rootSpan.data.isCancelled).toBe(false); + }); +}); diff --git a/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts b/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts new file mode 100644 index 0000000000..a996b9de69 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts @@ -0,0 +1,197 @@ +import { describe, expect, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; + +const SNAPSHOT = { + spanId: "span_1", + environment: { + slug: "dev", + project: { slug: "hello-world-bN7m" }, + organization: { slug: "references-6120" }, + }, +}; + +function fakePrisma(member: { id: string } | null) { + return { + orgMember: { findFirst: vi.fn(async () => member) }, + } as unknown as Parameters[1]["prismaClient"]; +} + +describe("findBufferedRunRedirectInfo (testcontainers)", () => { + redisTest("returns slugs + spanId for a real buffer entry when user is a member", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_1", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toEqual({ + organizationSlug: "references-6120", + projectSlug: "hello-world-bN7m", + environmentSlug: "dev", + spanId: "span_1", + }); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when no buffer entry exists for the runId", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_missing", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when the user is not an org member (default check enforced)", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_2", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_2", userId: "user_other" }, + { getBuffer: () => buffer, prismaClient: fakePrisma(null) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("skips the org-membership check when skipOrgMembershipCheck is set (admin path)", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_3", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const findFirst = vi.fn(); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_3", userId: "user_admin", skipOrgMembershipCheck: true }, + { + getBuffer: () => buffer, + prismaClient: { orgMember: { findFirst } } as unknown as Parameters[1]["prismaClient"], + }, + ); + expect(info?.organizationSlug).toBe("references-6120"); + expect(findFirst).not.toHaveBeenCalled(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when snapshot is malformed JSON", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_4", + envId: "env_a", + orgId: "org_1", + payload: "{not-json", + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_4", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when snapshot lacks org/project slugs", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_5", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ spanId: "s", environment: { slug: "dev" } }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_5", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns info with undefined spanId when snapshot has no spanId", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_6", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ environment: SNAPSHOT.environment }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_6", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info?.spanId).toBeUndefined(); + expect(info?.environmentSlug).toBe("dev"); + } finally { + await buffer.close(); + } + }); + + redisTest( + "rejects snapshots where a slug is the wrong type (Zod guard, not just typeof)", + async ({ redisOptions }) => { + // Regression for the pre-Zod implementation: the slug check was + // `typeof slug !== "string"` so any string passed, including ones + // that should've been rejected on shape grounds. The Zod schema + // gives us full structural validation — a `slug: 42` (number) + // collapses into the parse-fail branch like any other shape + // mismatch and we return null instead of leaking a half-built + // redirect URL. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_7", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + environment: { + slug: 42, + project: { slug: "p" }, + organization: { slug: "o" }, + }, + }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_7", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/apps/webapp/test/mollifierSyntheticSpanRun.test.ts b/apps/webapp/test/mollifierSyntheticSpanRun.test.ts new file mode 100644 index 0000000000..72bc9f0278 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticSpanRun.test.ts @@ -0,0 +1,191 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticSpanRun } from "~/v3/mollifier/syntheticSpanRun.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-21T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + taskIdentifier: "hello-world", + createdAt: NOW, + payload: { message: "hi" }, + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: "10m", + tags: ["a", "b"], + runTags: ["a", "b"], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: "worker-queue-1", + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: "v1", + maxAttempts: 3, + maxDurationInSeconds: 3600, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +const ENV = { + id: "env_a", + slug: "dev", + type: "DEVELOPMENT" as const, +}; + +describe("buildSyntheticSpanRun", () => { + it("populates the core identity fields from the snapshot", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.id).toBe("run_internal_1"); + expect(synth.friendlyId).toBe("run_friendly_1"); + expect(synth.taskIdentifier).toBe("hello-world"); + expect(synth.traceId).toBe("trace_1"); + expect(synth.spanId).toBe("span_1"); + expect(synth.environmentId).toBe("env_a"); + expect(synth.engine).toBe("V2"); + expect(synth.workerQueue).toBe("worker-queue-1"); + }); + + it("reports PENDING status and the non-final flags", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.status).toBe("PENDING"); + expect(synth.isFinished).toBe(false); + expect(synth.isRunning).toBe(false); + expect(synth.isError).toBe(false); + expect(synth.startedAt).toBeNull(); + expect(synth.completedAt).toBeNull(); + }); + + it("pretty-prints the JSON payload from the snapshot", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ payload: { message: "hi" }, payloadType: "application/json" }), + environment: ENV, + }); + // prettyPrintPacket round-trips JSON with 2-space indent. + expect(synth.payload).toContain('"message": "hi"'); + expect(synth.payloadType).toBe("application/json"); + }); + + it("forwards runTags onto `tags` exactly", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ runTags: ["alpha", "beta"] }), + environment: ENV, + }); + expect(synth.tags).toEqual(["alpha", "beta"]); + }); + + it("classifies the queue name as custom when it does not start with 'task/'", async () => { + const taskQueue = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ queue: "task/hello-world" }), + environment: ENV, + }); + expect(taskQueue.queue.isCustomQueue).toBe(false); + + const customQueue = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ queue: "my-custom" }), + environment: ENV, + }); + expect(customQueue.queue.isCustomQueue).toBe(true); + }); + + it("derives idempotency status from the snapshot key/options", async () => { + const withKey = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ idempotencyKey: "abc", idempotencyKeyOptions: ["scope"] }), + environment: ENV, + }); + expect(withKey.idempotencyKey).toBe("abc"); + expect(withKey.idempotencyKeyStatus).toBe("active"); + + const noKey = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ idempotencyKey: undefined, idempotencyKeyOptions: undefined }), + environment: ENV, + }); + expect(noKey.idempotencyKeyStatus).toBeUndefined(); + }); + + it("omits relationships even when parent/root friendlyIds are present, since the snapshot lacks their spanId/taskIdentifier", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ + parentTaskRunFriendlyId: "run_parent", + rootTaskRunFriendlyId: "run_root", + }), + environment: ENV, + }); + expect(synth.relationships.parent).toBeUndefined(); + expect(synth.relationships.root).toBeUndefined(); + }); + + it("returns no relationship objects when the snapshot has no parent/root", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun(), + environment: ENV, + }); + expect(synth.relationships.parent).toBeUndefined(); + expect(synth.relationships.root).toBeUndefined(); + }); + + it("reflects a buffered CANCELED run as a finished, cancelled terminal state", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ + status: "CANCELED", + cancelledAt: NOW, + cancelReason: "cancelled by user", + }), + environment: ENV, + }); + expect(synth.status).toBe("CANCELED"); + expect(synth.statusReason).toBe("cancelled by user"); + expect(synth.isFinished).toBe(true); + expect(synth.isError).toBe(false); + expect(synth.completedAt).toEqual(NOW); + }); + + it("reflects a buffered FAILED run as a finished, errored SYSTEM_FAILURE", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ + status: "FAILED", + error: { code: "GATE_REJECTED", message: "buffer rejected the run" }, + }), + environment: ENV, + }); + expect(synth.status).toBe("SYSTEM_FAILURE"); + expect(synth.isFinished).toBe(true); + expect(synth.isError).toBe(true); + expect(synth.statusReason).toBe("buffer rejected the run"); + expect(synth.error).toEqual({ + type: "STRING_ERROR", + raw: "GATE_REJECTED: buffer rejected the run", + }); + }); + + it("flags the synthetic run as 'not cached' since cache lookup did not match it", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.isCached).toBe(false); + }); +}); diff --git a/apps/webapp/test/mollifierSyntheticTrace.test.ts b/apps/webapp/test/mollifierSyntheticTrace.test.ts new file mode 100644 index 0000000000..ac7425a8fe --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticTrace.test.ts @@ -0,0 +1,149 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticTraceForBufferedRun } from "~/v3/mollifier/syntheticTrace.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-22T10:00:00Z"); +const ONE_MS_IN_NS = 1_000_000; + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: undefined, + payloadType: undefined, + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: undefined, + tags: [], + runTags: [], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: undefined, + queue: undefined, + concurrencyKey: undefined, + machinePreset: undefined, + realtimeStreamsVersion: undefined, + maxAttempts: undefined, + maxDurationInSeconds: undefined, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +describe("buildSyntheticTraceForBufferedRun", () => { + it("populates the synthesised root span from snapshot identity fields", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.events).toHaveLength(1); + const root = trace.events[0]; + expect(root.id).toBe("span_1"); + expect(root.data.message).toBe("hello-world"); + expect(root.data.startTime).toEqual(NOW); + expect(root.data.isRoot).toBe(true); + expect(root.data.offset).toBe(0); + expect(root.data.level).toBe("TRACE"); + }); + + it("defaults the span message to 'Task' when the snapshot has no taskIdentifier", () => { + const trace = buildSyntheticTraceForBufferedRun( + makeSyntheticRun({ taskIdentifier: undefined }) + ); + expect(trace.events[0].data.message).toBe("Task"); + }); + + it("falls back to an empty-string span id when the snapshot has no spanId", () => { + const trace = buildSyntheticTraceForBufferedRun( + makeSyntheticRun({ spanId: undefined }) + ); + expect(trace.events[0].id).toBe(""); + // Empty id still marks as root (it matches the rootId fallback). + expect(trace.events[0].data.isRoot).toBe(true); + }); + + it("renders a QUEUED buffered run as an executing, partial root span", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun({ status: "QUEUED" })); + expect(trace.rootSpanStatus).toBe("executing"); + expect(trace.events[0].data.isPartial).toBe(true); + expect(trace.events[0].data.isError).toBe(false); + expect(trace.events[0].data.isCancelled).toBe(false); + // A partial span exposes duration as null (the timeline reads it as + // "still running"); see syntheticTrace.server.ts duration mapping. + expect(trace.events[0].data.duration).toBeNull(); + }); + + it("renders a CANCELED buffered run as a completed, non-partial cancelled span", () => { + const trace = buildSyntheticTraceForBufferedRun( + makeSyntheticRun({ status: "CANCELED", cancelledAt: NOW }) + ); + expect(trace.rootSpanStatus).toBe("completed"); + expect(trace.events[0].data.isPartial).toBe(false); + expect(trace.events[0].data.isCancelled).toBe(true); + expect(trace.events[0].data.isError).toBe(false); + // Non-partial: duration is the span's numeric value (0 here), not null. + expect(trace.events[0].data.duration).toBe(0); + }); + + it("renders a FAILED buffered run as a failed, non-partial errored span", () => { + const trace = buildSyntheticTraceForBufferedRun( + makeSyntheticRun({ + status: "FAILED", + error: { code: "GATE_REJECTED", message: "buffer rejected the run" }, + }) + ); + expect(trace.rootSpanStatus).toBe("failed"); + expect(trace.events[0].data.isPartial).toBe(false); + expect(trace.events[0].data.isError).toBe(true); + expect(trace.events[0].data.isCancelled).toBe(false); + expect(trace.events[0].data.duration).toBe(0); + }); + + it("floors the trace duration to a minimum of 1ms (in nanoseconds) so the timeline has a positive extent", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.duration).toBe(ONE_MS_IN_NS); + }); + + it("reports the buffered createdAt as the trace's rootStartedAt and leaves startedAt null", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.rootStartedAt).toEqual(NOW); + expect(trace.startedAt).toBeNull(); + }); + + it("returns no link or override metadata (buffered traces are single-span)", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.linkedRunIdBySpanId).toEqual({}); + expect(trace.overridesBySpanId).toBeUndefined(); + expect(trace.queuedDuration).toBeUndefined(); + }); + + it("synthesises an empty events list (no timeline events from the buffer)", () => { + const trace = buildSyntheticTraceForBufferedRun(makeSyntheticRun()); + expect(trace.events[0].data.events).toEqual([]); + expect(trace.events[0].data.timelineEvents).toEqual([]); + }); +});