Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/sim/background/workflow-column-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export async function executeWorkflowGroupCellJob(
...currentPayload,
groupId: next.id,
workflowId: next.workflowId,
// Re-derive so a workflow group after an enrichment group doesn't keep a stale enrichmentId.
enrichmentId: next.enrichmentId,
executionId: generateId(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
const onAbort = () => {
clearTimeout(timer)
signal.removeEventListener('abort', onAbort)
resolve()
}
const timer = setTimeout(() => {
signal.removeEventListener('abort', onAbort)
resolve()
}, ms)
signal.addEventListener('abort', onAbort, { once: true })
// Catch an abort that fired between the guard above and addEventListener.
if (signal.aborted) onAbort()
})
}

Expand Down
16 changes: 12 additions & 4 deletions apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,24 +170,32 @@ describe('HostedKeyQueue', () => {
})

describe('refreshHeartbeat', () => {
it('writes the heartbeat key with TTL', async () => {
mockRedis.set.mockResolvedValueOnce('OK')
it('writes the heartbeat key with TTL and re-extends the queue list TTL', async () => {
mockRedis.pipeline.exec.mockResolvedValueOnce([
[null, 'OK'],
[null, 1],
])

await queue.refreshHeartbeat(provider, workspaceId, ticketId)

expect(mockRedis.set).toHaveBeenCalledWith(
expect(mockRedis.pipeline.set).toHaveBeenCalledWith(
'hosted-queue-tkt:exa:workspace-1:ticket-1',
'1',
'EX',
expect.any(Number)
)
expect(mockRedis.pipeline.expire).toHaveBeenCalledWith(
'hosted-queue:exa:workspace-1',
expect.any(Number)
)
expect(mockRedis.pipeline.exec).toHaveBeenCalledTimes(1)
})

it('is a no-op when Redis is unavailable', async () => {
redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null)

await expect(queue.refreshHeartbeat(provider, workspaceId, ticketId)).resolves.toBeUndefined()
expect(mockRedis.set).not.toHaveBeenCalled()
expect(mockRedis.multi).not.toHaveBeenCalled()
})
})

Expand Down
16 changes: 11 additions & 5 deletions apps/sim/lib/core/rate-limiter/hosted-key/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ const TICKET_HEARTBEAT_TTL_SECONDS = 30
export const HEARTBEAT_REFRESH_INTERVAL_MS = 10_000

/**
* TTL on the queue list itself. Set on every enqueue. Prevents abandoned queues
* (whole workspace went silent) from sticking around forever in Redis.
* TTL on the queue list itself. Set on enqueue and re-extended by the head's heartbeat,
* so a long-waiting head can't let the list expire out from under the waiters behind it.
* Prevents abandoned queues from sticking around forever in Redis.
*/
const QUEUE_LIST_TTL_SECONDS = 600

Expand Down Expand Up @@ -147,8 +148,9 @@ export class HostedKeyQueue {
}

/**
* Refresh the ticket's heartbeat. Called periodically by the head while it's
* waiting on the bucket so it doesn't get reaped as dead.
* Refresh the ticket's heartbeat so the head isn't reaped as dead while waiting on the
* bucket. Also re-extends the queue list TTL so a wait outliving {@link QUEUE_LIST_TTL_SECONDS}
* doesn't let the list expire and collapse FIFO ordering.
*/
async refreshHeartbeat(
provider: string,
Expand All @@ -158,9 +160,13 @@ export class HostedKeyQueue {
const redis = getRedisClient()
if (!redis) return

const listKey = queueListKey(provider, billingActorId)
const hbKey = heartbeatKey(provider, billingActorId, ticketId)
try {
await redis.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS)
const pipeline = redis.multi()
pipeline.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS)
pipeline.expire(listKey, QUEUE_LIST_TTL_SECONDS)
await pipeline.exec()
} catch (error) {
logger.warn(`Queue heartbeat refresh failed for ${hbKey}`, {
error: toError(error).message,
Expand Down
Loading