diff --git a/packages/shared/src/__tests__/rate-limiter.test.ts b/packages/shared/src/__tests__/rate-limiter.test.ts index 383b44c..7ae5004 100644 --- a/packages/shared/src/__tests__/rate-limiter.test.ts +++ b/packages/shared/src/__tests__/rate-limiter.test.ts @@ -95,6 +95,29 @@ describe('TokenBucket', () => { expect(bucket.available).toBe(0); }); + it('serializes concurrent waiters FIFO and never over-issues tokens', async () => { + const bucket = new TokenBucket({ capacity: 2, refillRate: 1, refillIntervalMs: 1000 }); + bucket.tryConsume(2); // drain to 0 + expect(bucket.available).toBe(0); + + const order: number[] = []; + const p1 = bucket.waitAndConsume(1).then(() => order.push(1)); + const p2 = bucket.waitAndConsume(1).then(() => order.push(2)); + + // Each waiter needs one refill interval; serialized, not simultaneous. + await vi.advanceTimersByTimeAsync(1000); // first waiter gets its token + await vi.advanceTimersByTimeAsync(1000); // second waiter gets its token + await Promise.all([p1, p2]); + + expect(order).toEqual([1, 2]); // FIFO order preserved + expect(bucket.available).toBeGreaterThanOrEqual(0); // never drove tokens negative + }); + + it('throws when waitAndConsume count exceeds capacity', async () => { + const bucket = new TokenBucket({ capacity: 2, refillRate: 1, refillIntervalMs: 1000 }); + await expect(bucket.waitAndConsume(3)).rejects.toThrow(/capacity/); + }); + it('defaults count to 1 for tryConsume', () => { const bucket = new TokenBucket({ capacity: 3, refillRate: 1, refillIntervalMs: 1000 }); diff --git a/packages/shared/src/rate-limiter.ts b/packages/shared/src/rate-limiter.ts index 44feba4..8683162 100644 --- a/packages/shared/src/rate-limiter.ts +++ b/packages/shared/src/rate-limiter.ts @@ -21,6 +21,13 @@ export class TokenBucket { private readonly refillRate: number; private readonly refillIntervalMs: number; private lastRefillTime: number; + /** + * Tail of a FIFO promise chain that serializes `waitAndConsume` callers. + * Each waiter awaits the prior one before entering its consume loop, so + * concurrent waiters never both subtract from the same refill (which + * previously drove `tokens` negative and let the bucket exceed its limit). + */ + private queueTail: Promise = Promise.resolve(); constructor(config: TokenBucketConfig) { this.capacity = config.capacity; @@ -52,22 +59,45 @@ export class TokenBucket { return false; } - /** Wait until enough tokens are available, then consume them */ + /** + * Wait until enough tokens are available, then consume them. + * + * Callers are served FIFO and one at a time: a waiter only consumes after + * re-checking that enough tokens have actually refilled, so concurrent + * callers can never over-issue (drive `tokens` negative). Throws if `count` + * exceeds the bucket capacity, which could otherwise never be satisfied. + */ async waitAndConsume(count = 1): Promise { - this.refill(); - if (this.tokens >= count) { - this.tokens -= count; - return; + if (count > this.capacity) { + throw new RangeError( + `Cannot consume ${count} tokens: exceeds bucket capacity of ${this.capacity}` + ); } - const deficit = count - this.tokens; - const intervalsNeeded = Math.ceil(deficit / this.refillRate); - const waitMs = intervalsNeeded * this.refillIntervalMs; + // Serialize against other waiters: take the current tail, install our own. + const prior = this.queueTail; + let release!: () => void; + this.queueTail = new Promise((resolve) => { + release = resolve; + }); - await new Promise((resolve) => setTimeout(resolve, waitMs)); - - this.refill(); - this.tokens -= count; + try { + await prior; + // We now hold the lock; loop until enough tokens have refilled. + for (;;) { + this.refill(); + if (this.tokens >= count) { + this.tokens -= count; + return; + } + const deficit = count - this.tokens; + const intervalsNeeded = Math.ceil(deficit / this.refillRate); + const waitMs = intervalsNeeded * this.refillIntervalMs; + await new Promise((resolve) => setTimeout(resolve, waitMs)); + } + } finally { + release(); + } } /** Current number of available tokens */