diff --git a/apps/sim/background/workflow-column-execution.ts b/apps/sim/background/workflow-column-execution.ts index 5e9a1d301d..98490e3228 100644 --- a/apps/sim/background/workflow-column-execution.ts +++ b/apps/sim/background/workflow-column-execution.ts @@ -70,6 +70,8 @@ export async function executeWorkflowGroupCellJob( ...currentPayload, groupId: next.id, workflowId: next.workflowId, + // Re-derive so a workflow group after an enrichment group doesn't keep a stale enrichmentId. + enrichmentId: next.enrichmentId, executionId: generateId(), } } diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts index 1004669656..d199ed9e2c 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts @@ -65,6 +65,7 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise { return new Promise((resolve) => { const onAbort = () => { clearTimeout(timer) + signal.removeEventListener('abort', onAbort) resolve() } const timer = setTimeout(() => { @@ -72,6 +73,8 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise { resolve() }, ms) signal.addEventListener('abort', onAbort, { once: true }) + // Catch an abort that fired between the guard above and addEventListener. + if (signal.aborted) onAbort() }) } diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts index 7405c1c5e6..d3d997d215 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts @@ -170,24 +170,32 @@ describe('HostedKeyQueue', () => { }) describe('refreshHeartbeat', () => { - it('writes the heartbeat key with TTL', async () => { - mockRedis.set.mockResolvedValueOnce('OK') + it('writes the heartbeat key with TTL and re-extends the queue list TTL', async () => { + mockRedis.pipeline.exec.mockResolvedValueOnce([ + [null, 'OK'], + [null, 1], + ]) await queue.refreshHeartbeat(provider, workspaceId, ticketId) - expect(mockRedis.set).toHaveBeenCalledWith( + expect(mockRedis.pipeline.set).toHaveBeenCalledWith( 'hosted-queue-tkt:exa:workspace-1:ticket-1', '1', 'EX', expect.any(Number) ) + expect(mockRedis.pipeline.expire).toHaveBeenCalledWith( + 'hosted-queue:exa:workspace-1', + expect.any(Number) + ) + expect(mockRedis.pipeline.exec).toHaveBeenCalledTimes(1) }) it('is a no-op when Redis is unavailable', async () => { redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) await expect(queue.refreshHeartbeat(provider, workspaceId, ticketId)).resolves.toBeUndefined() - expect(mockRedis.set).not.toHaveBeenCalled() + expect(mockRedis.multi).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts index 4a7ecd5aed..a0803d1ae6 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts @@ -15,8 +15,9 @@ const TICKET_HEARTBEAT_TTL_SECONDS = 30 export const HEARTBEAT_REFRESH_INTERVAL_MS = 10_000 /** - * TTL on the queue list itself. Set on every enqueue. Prevents abandoned queues - * (whole workspace went silent) from sticking around forever in Redis. + * TTL on the queue list itself. Set on enqueue and re-extended by the head's heartbeat, + * so a long-waiting head can't let the list expire out from under the waiters behind it. + * Prevents abandoned queues from sticking around forever in Redis. */ const QUEUE_LIST_TTL_SECONDS = 600 @@ -147,8 +148,9 @@ export class HostedKeyQueue { } /** - * Refresh the ticket's heartbeat. Called periodically by the head while it's - * waiting on the bucket so it doesn't get reaped as dead. + * Refresh the ticket's heartbeat so the head isn't reaped as dead while waiting on the + * bucket. Also re-extends the queue list TTL so a wait outliving {@link QUEUE_LIST_TTL_SECONDS} + * doesn't let the list expire and collapse FIFO ordering. */ async refreshHeartbeat( provider: string, @@ -158,9 +160,13 @@ export class HostedKeyQueue { const redis = getRedisClient() if (!redis) return + const listKey = queueListKey(provider, billingActorId) const hbKey = heartbeatKey(provider, billingActorId, ticketId) try { - await redis.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS) + const pipeline = redis.multi() + pipeline.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS) + pipeline.expire(listKey, QUEUE_LIST_TTL_SECONDS) + await pipeline.exec() } catch (error) { logger.warn(`Queue heartbeat refresh failed for ${hbKey}`, { error: toError(error).message,