Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/mollifier-reads.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Mollifier API read-fallback: serve buffered runs from synthetic run/trace/span data on the retrieve, trace, spans, and attempts endpoints.
201 changes: 198 additions & 3 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Comment thread
d-cs marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ import {
logger,
} from "@trigger.dev/core/v3";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import {
findRunByIdWithMollifierFallback,
type SyntheticRun,
} from "~/v3/mollifier/readFallback.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";
Expand Down Expand Up @@ -64,13 +69,37 @@ type CommonRelatedRun = Prisma.Result<
"findFirstOrThrow"
>;

type FoundRun = NonNullable<Awaited<ReturnType<typeof ApiRetrieveRunPresenter.findRun>>>;
/**
 * Full shape returned by findRun(): the commonRunSelect fields
 * (CommonRelatedRun) plus the extras the route handler reads.
 *
 * Declared explicitly (not inferred via ReturnType<typeof findRun>) so
 * findRun can return a synthesised buffered run without the type becoming
 * self-referential.
 *
 * Exported so the buffer-synthesis helper (synthesiseFoundRunFromBuffer) can
 * be unit-tested against a stable shape without re-deriving it: this exact
 * field list is what a buffered run must match for `call()` to handle it
 * without surprises.
 */
export type FoundRun = CommonRelatedRun & {
traceId: string;
// Payload as a string, together with its type tag.
payload: string;
payloadType: string;
// Output as a string; null when there is none (always null for a buffered run).
output: string | null;
outputType: string;
error: Prisma.JsonValue;
// Only the attempt ids are carried; the list is empty for a buffered run.
attempts: { id: string }[];
attemptNumber: number | null;
engine: "V1" | "V2";
taskEventStore: string;
// Related runs in the common run shape; a buffered run is synthesised with none.
parentTaskRun: CommonRelatedRun | null;
rootTaskRun: CommonRelatedRun | null;
childRuns: CommonRelatedRun[];
};

export class ApiRetrieveRunPresenter {
constructor(private readonly apiVersion: API_VERSIONS) {}

public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) {
return $replica.taskRun.findFirst({
public static async findRun(
friendlyId: string,
env: AuthenticatedEnvironment,
): Promise<FoundRun | null> {
const pgRow = await $replica.taskRun.findFirst({
where: {
friendlyId,
runtimeEnvironmentId: env.id,
Expand Down Expand Up @@ -102,6 +131,23 @@ export class ApiRetrieveRunPresenter {
},
},
});

if (pgRow) return pgRow;

// Postgres miss → fall back to the mollifier buffer. When the gate
// diverted a trigger, the run lives in Redis until the drainer replays
// it through engine.trigger. Synthesise the FoundRun shape so call()
// returns a `QUEUED` (or `FAILED`) response with empty output, no
// attempts, no relations.
const buffered = await findRunByIdWithMollifierFallback({
runId: friendlyId,
environmentId: env.id,
organizationId: env.organizationId,
});

if (!buffered) return null;

return synthesiseFoundRunFromBuffer(buffered);
}

public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) {
Expand Down Expand Up @@ -475,3 +521,152 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction {
return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger";
}
}

// Helpers for building a FoundRun-shaped object from a buffered (mollified)
// run — see synthesiseFoundRunFromBuffer() below. The run is still in the
// Redis buffer and engine.trigger hasn't created the Postgres row yet, so
// every field that comes from execution state (output, attempts, cost,
// relations) takes a default, and completedAt is only set when the buffered
// run carries a cancelledAt. The presenter's call() handles such
// QUEUED-state runs without surprise.
/**
 * Translates a buffered run's status into the `TaskRunStatus` reported for it.
 *
 * `FAILED` becomes `SYSTEM_FAILURE`, `CANCELED` stays `CANCELED`, and every
 * other buffered status is reported as `PENDING`.
 */
function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus {
  if (status === "FAILED") {
    return "SYSTEM_FAILURE";
  }
  if (status === "CANCELED") {
    return "CANCELED";
  }
  return "PENDING";
}

/**
 * Returns the buffered snapshot's payload as a string for the synthesised run.
 *
 * The PG path stores `TaskRun.payload` as `String?`, so in production the
 * snapshot's `payload` is always a string and is returned untouched. Other
 * types are coerced defensively instead of being silently dropped:
 *  - `null` / `undefined` become an empty string;
 *  - an object is JSON-stringified (matching how the trigger path would
 *    serialise it) and a warning is logged to surface format drift to ops;
 *  - anything that cannot be serialised is logged as an error and falls back
 *    to an empty string, so a bad payload does not crash the read path.
 */
function synthesisePayload(buffered: SyntheticRun): string {
  const raw = buffered.payload;

  if (typeof raw === "string") {
    return raw;
  }
  if (raw == null) {
    return "";
  }

  // Built on demand so the fields are read only when a log line is emitted.
  const logFields = () => ({
    runFriendlyId: buffered.friendlyId,
    payloadType: typeof raw,
  });

  let encoded: string | undefined;
  try {
    encoded = JSON.stringify(raw);
    logger.warn(
      "ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced",
      logFields()
    );
  } catch {
    logger.error("ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable", logFields());
    return "";
  }

  // JSON.stringify yields undefined for values such as functions or symbols.
  return typeof encoded === "string" ? encoded : "";
}

/**
 * Returns the buffered snapshot's metadata as a string, or `null` when there
 * is none. This is the metadata counterpart of `synthesisePayload`.
 *
 * The PG path stores `TaskRun.metadata` as `String?`, and the snapshot writes
 * it from `metadataPacket.data` (also a string), so in production it is
 * always a string or absent. Other types are coerced defensively: an object
 * is JSON-stringified (matching how the trigger path serialises it) rather
 * than silently dropped to null, with a warning that surfaces format drift to
 * ops. A value that cannot be serialised is logged as an error and yields
 * `null`.
 */
function synthesiseMetadata(buffered: SyntheticRun): string | null {
  const raw = buffered.metadata;

  if (typeof raw === "string") {
    return raw;
  }
  if (raw == null) {
    return null;
  }

  // Built on demand so the fields are read only when a log line is emitted.
  const logFields = () => ({
    runFriendlyId: buffered.friendlyId,
    metadataType: typeof raw,
  });

  let encoded: string | undefined;
  try {
    encoded = JSON.stringify(raw);
    logger.warn(
      "ApiRetrieveRunPresenter: buffered snapshot.metadata non-string coerced",
      logFields()
    );
  } catch {
    logger.error(
      "ApiRetrieveRunPresenter: buffered snapshot.metadata unserialisable",
      logFields()
    );
    return null;
  }

  // JSON.stringify yields undefined for values such as functions or symbols.
  return typeof encoded === "string" ? encoded : null;
}

/**
 * Synthesises the `FoundRun` shape for a run that is still in the mollifier
 * buffer. `findRun()` uses this when the Postgres lookup misses and the
 * buffer carries the run.
 *
 * Exported for unit testing. Keep the returned object in lockstep with
 * `FoundRun`'s field list so `call()` treats a synthesised buffered run
 * identically to a freshly-triggered PG row.
 *
 * @param buffered - Snapshot of the buffered run.
 * @returns A `FoundRun` with no output, no attempts and no related runs.
 */
export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
  const status = bufferedStatusToTaskRunStatus(buffered.status);

  // A buffered error is flattened into a single "code: message" string error.
  let error: Prisma.JsonValue = null;
  if (buffered.error) {
    const { code, message } = buffered.error;
    error = { type: "STRING_ERROR", raw: `${code}: ${message}` };
  }

  const metadata = synthesiseMetadata(buffered);

  const lockedToVersion = buffered.lockedToVersion
    ? { version: buffered.lockedToVersion }
    : null;

  // The batch is rebuilt from the snapshot's internal id so a buffered run
  // reports the same `batchId` / triggerFunction it will have once
  // materialised, and so batch-scoped JWTs authorise against it (the route
  // authorization callbacks read `run.batch?.friendlyId`).
  const batchId = buffered.batchId;
  const batch = batchId ? { id: batchId, friendlyId: BatchId.toFriendlyId(batchId) } : null;

  const payload = synthesisePayload(buffered);
  const cancelledAt = buffered.cancelledAt;

  return {
    // `id` must be the internal cuid (Prisma TaskRun.id column), not the
    // user-facing `run_xxx` friendlyId: downstream logging keyed off
    // `taskRun.id` correlates with other systems via the cuid, and
    // `SyntheticRun` carries both for exactly this reason
    // (RunId.fromFriendlyId in readFallback.server.ts).
    id: buffered.id,
    friendlyId: buffered.friendlyId,
    status,
    taskIdentifier: buffered.taskIdentifier ?? "",
    createdAt: buffered.createdAt,
    startedAt: null,
    updatedAt: cancelledAt ?? buffered.createdAt,
    completedAt: cancelledAt ?? null,
    expiredAt: null,
    delayUntil: buffered.delayUntil ?? null,
    metadata,
    metadataType: buffered.metadataType ?? "application/json",
    ttl: buffered.ttl ?? null,
    costInCents: 0,
    baseCostInCents: 0,
    usageDurationMs: 0,
    idempotencyKey: buffered.idempotencyKey ?? null,
    idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null,
    isTest: buffered.isTest,
    depth: buffered.depth,
    // Scheduled triggers take the same TriggerTaskService path as API
    // triggers and are not bypassed by the mollifier gate, so a scheduled
    // run can sit in the buffer with its scheduleId on the snapshot.
    // Forwarding it lets resolveSchedule() hydrate the `schedule` field in
    // the API response instead of dropping it until the drainer materialises.
    scheduleId: buffered.scheduleId ?? null,
    lockedToVersion,
    resumeParentOnCompletion: buffered.resumeParentOnCompletion,
    batch,
    runTags: buffered.tags,
    traceId: buffered.traceId ?? "",
    payload,
    payloadType: buffered.payloadType ?? "application/json",
    output: null,
    outputType: "application/json",
    error,
    attempts: [],
    attemptNumber: null,
    engine: "V2",
    taskEventStore: "taskEvent",
    // Empty string when absent: this matches syntheticSpanRun.server.ts and
    // lets `createCommonRunStructure`'s `run.workerQueue || undefined` turn
    // the API response's `region` into undefined rather than advertising a
    // misleading "main" region for a not-yet-assigned buffered run.
    workerQueue: buffered.workerQueue ?? "",
    parentTaskRun: null,
    rootTaskRun: null,
    childRuns: [],
  };
}
69 changes: 59 additions & 10 deletions apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,69 @@ import {
} from "~/services/routeBuilders/apiBuilder.server";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
import { buildSyntheticSpanDetailBody } from "~/v3/mollifier/syntheticApiResponses.server";

// Route params for this endpoint, both validated as plain strings: `runId`
// is matched against the run's friendlyId, `spanId` identifies the span to
// return within that run.
const ParamsSchema = z.object({
runId: z.string(),
spanId: z.string(),
});

/**
 * The run behind this route, resolved from either Postgres or the mollifier
 * buffer and tagged with the `source` it came from.
 *
 * Buffered runs only have one valid spanId: the queued span recorded at gate
 * time, which is reused as the run's root spanId when the drainer
 * materialises the run. For a buffered run any other spanId returns a
 * deterministic 404, while the queued span returns a minimal synthesised
 * shape so the customer's SDK sees the same 200 contract they'd get for a
 * freshly-triggered run.
 */
type ResolvedRun =
  | {
      source: "pg";
      run: NonNullable<Awaited<ReturnType<typeof findPgRun>>>;
    }
  | {
      source: "buffer";
      run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>>;
    };

/**
 * Looks up a run through the `$replica` client by its friendly id, scoped to
 * the given runtime environment.
 *
 * @param runId - The run's friendlyId.
 * @param environmentId - Id of the runtime environment the run must belong to.
 * @returns Whatever `findFirst` resolves to for that filter.
 */
async function findPgRun(runId: string, environmentId: string) {
  const where = { friendlyId: runId, runtimeEnvironmentId: environmentId };
  return $replica.taskRun.findFirst({ where });
}

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
findResource: (params, auth) => {
return $replica.taskRun.findFirst({
where: {
friendlyId: params.runId,
runtimeEnvironmentId: auth.environment.id,
},
findResource: async (params, auth): Promise<ResolvedRun | null> => {
const pgRun = await findPgRun(params.runId, auth.environment.id);
if (pgRun) return { source: "pg", run: pgRun };

const buffered = await findRunByIdWithMollifierFallback({
runId: params.runId,
environmentId: auth.environment.id,
organizationId: auth.environment.organizationId,
});
if (buffered) return { source: "buffer", run: buffered };

return null;
},
shouldRetryNotFound: true,
authorization: {
action: "read",
resource: (run) => {
resource: (resolved) => {
if (resolved.source === "pg") {
const run = resolved.run;
const resources = [
{ type: "runs", id: run.friendlyId },
{ type: "tasks", id: run.taskIdentifier },
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
];
if (run.batchId) {
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
}
return anyResource(resources);
}
const run = resolved.run;
const resources = [
{ type: "runs", id: run.friendlyId },
{ type: "tasks", id: run.taskIdentifier },
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
...run.tags.map((tag) => ({ type: "tags", id: tag })),
];
if (run.batchId) {
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
Expand All @@ -44,7 +80,20 @@ export const loader = createLoaderApiRoute(
},
},
},
async ({ params, resource: run, authentication }) => {
async ({ params, resource: resolved, authentication }) => {
if (resolved.source === "buffer") {
// Buffered runs have exactly one valid spanId — the queued span the
// mollifier gate recorded at trigger time, which becomes the run's
// root spanId once the drainer materialises. Any other spanId is a
// deterministic 404. The matching spanId returns a minimal shape
// representing "span exists, no execution data yet."
if (resolved.run.spanId !== params.spanId) {
return json({ error: "Span not found" }, { status: 404 });
}
return json(buildSyntheticSpanDetailBody(resolved.run), { status: 200 });
}

const run = resolved.run;
const eventRepository = await getEventRepositoryForStore(
run.taskEventStore,
authentication.environment.organization.id
Expand Down
Loading