From fb311b5ef6a7287abd928ca178fac94663b2896d Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 27 May 2026 10:40:22 -0700 Subject: [PATCH 1/2] fix(rate-limiter): hosted-key queue follow-ups from #4756 review --- .../background/workflow-column-execution.ts | 4 ++++ .../hosted-key/hosted-key-rate-limiter.ts | 5 +++++ .../rate-limiter/hosted-key/queue.test.ts | 16 ++++++++++++---- .../lib/core/rate-limiter/hosted-key/queue.ts | 19 +++++++++++++++---- 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/apps/sim/background/workflow-column-execution.ts b/apps/sim/background/workflow-column-execution.ts index 5e9a1d301de..3dac9ac6fbb 100644 --- a/apps/sim/background/workflow-column-execution.ts +++ b/apps/sim/background/workflow-column-execution.ts @@ -70,6 +70,10 @@ export async function executeWorkflowGroupCellJob( ...currentPayload, groupId: next.id, workflowId: next.workflowId, + // Re-derive from the target group rather than inheriting the prior group's + // value via the spread: a workflow group following an enrichment group would + // otherwise carry a stale enrichmentId. + enrichmentId: next.enrichmentId, executionId: generateId(), } } diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts index 10046696561..0c7745e750c 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts @@ -65,6 +65,7 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise { return new Promise((resolve) => { const onAbort = () => { clearTimeout(timer) + signal.removeEventListener('abort', onAbort) resolve() } const timer = setTimeout(() => { @@ -72,6 +73,10 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise { resolve() }, ms) signal.addEventListener('abort', onAbort, { once: true }) + // Re-check after registering: if the signal fired between the guard above and + // addEventListener, the 'abort' event already dispatched and our listener would + // never run, leaving the sleep to run its full duration. onAbort is idempotent. + if (signal.aborted) onAbort() }) } diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts index 7405c1c5e61..d3d997d215a 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts @@ -170,24 +170,32 @@ describe('HostedKeyQueue', () => { }) describe('refreshHeartbeat', () => { - it('writes the heartbeat key with TTL', async () => { - mockRedis.set.mockResolvedValueOnce('OK') + it('writes the heartbeat key with TTL and re-extends the queue list TTL', async () => { + mockRedis.pipeline.exec.mockResolvedValueOnce([ + [null, 'OK'], + [null, 1], + ]) await queue.refreshHeartbeat(provider, workspaceId, ticketId) - expect(mockRedis.set).toHaveBeenCalledWith( + expect(mockRedis.pipeline.set).toHaveBeenCalledWith( 'hosted-queue-tkt:exa:workspace-1:ticket-1', '1', 'EX', expect.any(Number) ) + expect(mockRedis.pipeline.expire).toHaveBeenCalledWith( + 'hosted-queue:exa:workspace-1', + expect.any(Number) + ) + expect(mockRedis.pipeline.exec).toHaveBeenCalledTimes(1) }) it('is a no-op when Redis is unavailable', async () => { redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) await expect(queue.refreshHeartbeat(provider, workspaceId, ticketId)).resolves.toBeUndefined() - expect(mockRedis.set).not.toHaveBeenCalled() + expect(mockRedis.multi).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts index 4a7ecd5aed1..dd06de8d481 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts @@ -15,8 +15,11 @@ const TICKET_HEARTBEAT_TTL_SECONDS = 30 export const HEARTBEAT_REFRESH_INTERVAL_MS = 10_000 /** - * TTL on the queue list itself. Set on every enqueue. Prevents abandoned queues - * (whole workspace went silent) from sticking around forever in Redis. + * TTL on the queue list itself. Set on every enqueue and re-extended by the head's + * heartbeat while it actively waits, so a long-waiting head (whose budget can exceed + * this TTL for enterprise async runs) never lets the list expire out from under the + * waiters behind it. Prevents abandoned queues (whole workspace went silent) from + * sticking around forever in Redis. */ const QUEUE_LIST_TTL_SECONDS = 600 @@ -148,7 +151,11 @@ export class HostedKeyQueue { /** * Refresh the ticket's heartbeat. Called periodically by the head while it's - * waiting on the bucket so it doesn't get reaped as dead. + * waiting on the bucket so it doesn't get reaped as dead. Also re-extends the + * queue list TTL: a head whose wait outlives {@link QUEUE_LIST_TTL_SECONDS} + * (possible for long enterprise async budgets) would otherwise let the list + * expire with no new enqueue to refresh it, dropping every waiter to "missing" + * and collapsing FIFO ordering into concurrent bucket racing. */ async refreshHeartbeat( provider: string, @@ -158,9 +165,13 @@ export class HostedKeyQueue { const redis = getRedisClient() if (!redis) return + const listKey = queueListKey(provider, billingActorId) const hbKey = heartbeatKey(provider, billingActorId, ticketId) try { - await redis.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS) + const pipeline = redis.multi() + pipeline.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS) + pipeline.expire(listKey, QUEUE_LIST_TTL_SECONDS) + await pipeline.exec() } catch (error) { logger.warn(`Queue heartbeat refresh failed for ${hbKey}`, { error: toError(error).message, From 33c0138d97c2366e72c4773c31c58f949360604d Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 27 May 2026 11:03:22 -0700 Subject: [PATCH 2/2] chore(rate-limiter): trim verbose comments --- .../sim/background/workflow-column-execution.ts | 4 +--- .../hosted-key/hosted-key-rate-limiter.ts | 4 +--- .../lib/core/rate-limiter/hosted-key/queue.ts | 17 ++++++----------- 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/apps/sim/background/workflow-column-execution.ts b/apps/sim/background/workflow-column-execution.ts index 3dac9ac6fbb..98490e32281 100644 --- a/apps/sim/background/workflow-column-execution.ts +++ b/apps/sim/background/workflow-column-execution.ts @@ -70,9 +70,7 @@ export async function executeWorkflowGroupCellJob( ...currentPayload, groupId: next.id, workflowId: next.workflowId, - // Re-derive from the target group rather than inheriting the prior group's - // value via the spread: a workflow group following an enrichment group would - // otherwise carry a stale enrichmentId. + // Re-derive so a workflow group after an enrichment group doesn't keep a stale enrichmentId. enrichmentId: next.enrichmentId, executionId: generateId(), } diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts index 0c7745e750c..d199ed9e2c2 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts @@ -73,9 +73,7 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise { resolve() }, ms) signal.addEventListener('abort', onAbort, { once: true }) - // Re-check after registering: if the signal fired between the guard above and - // addEventListener, the 'abort' event already dispatched and our listener would - // never run, leaving the sleep to run its full duration. onAbort is idempotent. + // Catch an abort that fired between the guard above and addEventListener. if (signal.aborted) onAbort() }) } diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts index dd06de8d481..a0803d1ae61 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts @@ -15,11 +15,9 @@ const TICKET_HEARTBEAT_TTL_SECONDS = 30 export const HEARTBEAT_REFRESH_INTERVAL_MS = 10_000 /** - * TTL on the queue list itself. Set on every enqueue and re-extended by the head's - * heartbeat while it actively waits, so a long-waiting head (whose budget can exceed - * this TTL for enterprise async runs) never lets the list expire out from under the - * waiters behind it. Prevents abandoned queues (whole workspace went silent) from - * sticking around forever in Redis. + * TTL on the queue list itself. Set on enqueue and re-extended by the head's heartbeat, + * so a long-waiting head can't let the list expire out from under the waiters behind it. + * Prevents abandoned queues from sticking around forever in Redis. */ const QUEUE_LIST_TTL_SECONDS = 600 @@ -150,12 +148,9 @@ export class HostedKeyQueue { } /** - * Refresh the ticket's heartbeat. Called periodically by the head while it's - * waiting on the bucket so it doesn't get reaped as dead. Also re-extends the - * queue list TTL: a head whose wait outlives {@link QUEUE_LIST_TTL_SECONDS} - * (possible for long enterprise async budgets) would otherwise let the list - * expire with no new enqueue to refresh it, dropping every waiter to "missing" - * and collapsing FIFO ordering into concurrent bucket racing. + * Refresh the ticket's heartbeat so the head isn't reaped as dead while waiting on the + * bucket. Also re-extends the queue list TTL so a wait outliving {@link QUEUE_LIST_TTL_SECONDS} + * doesn't let the list expire and collapse FIFO ordering. */ async refreshHeartbeat( provider: string,