Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions packages/shared/src/__tests__/rate-limiter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,29 @@ describe('TokenBucket', () => {
expect(bucket.available).toBe(0);
});

it('serializes concurrent waiters FIFO and never over-issues tokens', async () => {
const bucket = new TokenBucket({ capacity: 2, refillRate: 1, refillIntervalMs: 1000 });
bucket.tryConsume(2); // drain to 0
expect(bucket.available).toBe(0);

const order: number[] = [];
const p1 = bucket.waitAndConsume(1).then(() => order.push(1));
const p2 = bucket.waitAndConsume(1).then(() => order.push(2));

// Each waiter needs one refill interval; serialized, not simultaneous.
await vi.advanceTimersByTimeAsync(1000); // first waiter gets its token
await vi.advanceTimersByTimeAsync(1000); // second waiter gets its token
await Promise.all([p1, p2]);

expect(order).toEqual([1, 2]); // FIFO order preserved
expect(bucket.available).toBeGreaterThanOrEqual(0); // never drove tokens negative
});

it('throws when waitAndConsume count exceeds capacity', async () => {
const bucket = new TokenBucket({ capacity: 2, refillRate: 1, refillIntervalMs: 1000 });
await expect(bucket.waitAndConsume(3)).rejects.toThrow(/capacity/);
});

it('defaults count to 1 for tryConsume', () => {
const bucket = new TokenBucket({ capacity: 3, refillRate: 1, refillIntervalMs: 1000 });

Expand Down
54 changes: 42 additions & 12 deletions packages/shared/src/rate-limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ export class TokenBucket {
private readonly refillRate: number;
private readonly refillIntervalMs: number;
private lastRefillTime: number;
/**
* Tail of a FIFO promise chain that serializes `waitAndConsume` callers.
* Each waiter awaits the prior one before entering its consume loop, so
* concurrent waiters never both subtract from the same refill (which
* previously drove `tokens` negative and let the bucket exceed its limit).
*/
private queueTail: Promise<void> = Promise.resolve();

constructor(config: TokenBucketConfig) {
this.capacity = config.capacity;
Expand Down Expand Up @@ -52,22 +59,45 @@ export class TokenBucket {
return false;
}

/** Wait until enough tokens are available, then consume them */
/**
* Wait until enough tokens are available, then consume them.
*
* Callers are served FIFO and one at a time: a waiter only consumes after
* re-checking that enough tokens have actually refilled, so concurrent
* callers can never over-issue (drive `tokens` negative). Throws if `count`
* exceeds the bucket capacity, which could otherwise never be satisfied.
*/
async waitAndConsume(count = 1): Promise<void> {
this.refill();
if (this.tokens >= count) {
this.tokens -= count;
return;
if (count > this.capacity) {
throw new RangeError(
`Cannot consume ${count} tokens: exceeds bucket capacity of ${this.capacity}`
);
}

const deficit = count - this.tokens;
const intervalsNeeded = Math.ceil(deficit / this.refillRate);
const waitMs = intervalsNeeded * this.refillIntervalMs;
// Serialize against other waiters: take the current tail, install our own.
const prior = this.queueTail;
let release!: () => void;
this.queueTail = new Promise<void>((resolve) => {
release = resolve;
});

await new Promise<void>((resolve) => setTimeout(resolve, waitMs));

this.refill();
this.tokens -= count;
try {
await prior;
// We now hold the lock; loop until enough tokens have refilled.
for (;;) {
this.refill();
if (this.tokens >= count) {
this.tokens -= count;
return;
}
const deficit = count - this.tokens;
const intervalsNeeded = Math.ceil(deficit / this.refillRate);
const waitMs = intervalsNeeded * this.refillIntervalMs;
await new Promise<void>((resolve) => setTimeout(resolve, waitMs));
}
} finally {
release();
}
}

/** Current number of available tokens */
Expand Down
Loading