Designing Queues That Don't Explode
Water pipes have a maximum flow rate. Exceed it, and either the pipe bursts or water backs up. Systems without overflow handling eventually drown.
LLM request queues face the same physics. During a traffic spike, requests arrive faster than they can be processed. Where do they go?
The Bounded Queue Principle
Never use an unbounded queue:
# Dangerous: grows forever under load
queue = [] # or Queue()
# Safe: has a limit
queue = Queue(maxsize=1000)
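A bounded queue forces a decision at the moment of overflow: block, drop, or reject. As a minimal sketch with the standard library's Queue, a non-blocking put raises queue.Full once the limit is hit, which you can turn into an explicit rejection the client can see:

import queue

q = queue.Queue(maxsize=1000)

def try_enqueue(request) -> bool:
    # Reject instead of blocking when the queue is full
    try:
        q.put_nowait(request)
        return True
    except queue.Full:
        return False  # caller can translate this into an HTTP 429 with a retry hint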
But what should maxsize be?
Token-Based Queue Limits
Request count is misleading. 100 small requests might be fine. 10 large requests might OOM your system.
import threading
from collections import deque
from typing import Optional

class TokenBoundedQueue:
    def __init__(self, max_tokens: int = 10_000_000):
        self.max_tokens = max_tokens
        self.current_tokens = 0
        self.requests = deque()
        self.lock = threading.Lock()

    def put(self, request: Request) -> bool:
        # Reserve space for the prompt plus the worst-case completion
        tokens = request.input_tokens + request.max_output_tokens
        with self.lock:
            if self.current_tokens + tokens > self.max_tokens:
                return False  # Rejected
            self.requests.append(request)
            self.current_tokens += tokens
            return True

    def get(self) -> Optional[Request]:
        with self.lock:
            if not self.requests:
                return None
            request = self.requests.popleft()
            tokens = request.input_tokens + request.max_output_tokens
            self.current_tokens -= tokens
            return request

    def size_tokens(self) -> int:
        return self.current_tokens
10 million tokens might be your limit. Whether that's 100 requests or 10,000 depends on request sizes.
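To make that concrete, here is a quick usage sketch; the Request dataclass below is a stand-in with just the two token fields the queue reads (a real request object would carry more):

from dataclasses import dataclass

@dataclass
class Request:
    input_tokens: int
    max_output_tokens: int

q = TokenBoundedQueue(max_tokens=10_000_000)

# A chat-sized request reserves ~1K tokens; roughly 9,700 of these would fit
assert q.put(Request(input_tokens=512, max_output_tokens=512))

# A single long-context request reserves 158K tokens on its own
assert q.put(Request(input_tokens=150_000, max_output_tokens=8_000))

print(q.size_tokens())  # 159024 tokens reserved so far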
Multi-Level Queues
Different request types need different handling:
class MultiLevelQueue:
    def __init__(self):
        self.queues = {
            "express": TokenBoundedQueue(max_tokens=1_000_000),
            "standard": TokenBoundedQueue(max_tokens=5_000_000),
            "bulk": TokenBoundedQueue(max_tokens=20_000_000),
        }

    def put(self, request: Request) -> bool:
        tier = classify_request(request)
        return self.queues[tier].put(request)

    def get_next_batch(self) -> list[Request]:
        # Express first, then standard, then bulk
        for tier in ["express", "standard", "bulk"]:
            request = self.queues[tier].get()
            if request:
                return [request]  # Simplification
        return []
Express queue fills up? Standard and bulk still work. Bulk overloaded? Express remains responsive.
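classify_request is left open above; one possible version routes purely by request size (the thresholds are illustrative assumptions, and you could just as well tier by customer plan or an explicit priority field):

def classify_request(request: Request) -> str:
    # Small interactive requests get the express lane; huge jobs go to bulk
    tokens = request.input_tokens + request.max_output_tokens
    if tokens <= 2_000:
        return "express"
    if tokens <= 32_000:
        return "standard"
    return "bulk"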
Queue Timeout Policies
Requests shouldn't wait forever:
import time
from collections import deque
from typing import Optional

class TimeBoundedQueue:
    def __init__(self, max_age_seconds: float = 30.0):
        self.max_age = max_age_seconds
        self.requests = deque()  # (arrival_time, request)

    def put(self, request: Request):
        self.requests.append((time.time(), request))

    def get(self) -> Optional[Request]:
        self._evict_stale()
        if self.requests:
            _, request = self.requests.popleft()
            return request
        return None

    def _evict_stale(self):
        now = time.time()
        while self.requests:
            arrival, request = self.requests[0]
            if now - arrival > self.max_age:
                self.requests.popleft()
                # Optionally: notify request was dropped
                request.on_timeout()
            else:
                break
A request waiting 30 seconds is probably not useful anymore. Better to tell the client to retry than to process stale work.
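What on_timeout actually does is up to the request type. One plausible shape, sketched here with a hypothetical respond callback, is to surface the drop as a retryable error rather than a silent loss:

class QueuedRequest:
    def __init__(self, payload, respond):
        self.payload = payload
        self.respond = respond  # callback that delivers a response to the client

    def on_timeout(self):
        # Tell the client to retry instead of letting the request vanish
        self.respond({
            "status": 503,
            "error": "queue_timeout",
            "retry_after_seconds": 5,
        })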
Backpressure Signaling
When the queue is filling up, tell upstream:
from dataclasses import dataclass
from typing import Optional

@dataclass
class QueueResult:
    accepted: bool
    status: str
    retry_after_seconds: Optional[float] = None
    estimated_wait_seconds: Optional[float] = None

class BackpressureQueue:
    def __init__(self, max_tokens: int, warning_threshold: float = 0.8):
        self.max_tokens = max_tokens
        self.warning_threshold = warning_threshold
        self.current_tokens = 0  # decremented by the dequeue side (not shown)

    def put(self, request: Request) -> QueueResult:
        tokens = request.total_tokens  # input plus max output
        if self.current_tokens + tokens > self.max_tokens:
            return QueueResult(
                accepted=False,
                status="rejected",
                retry_after_seconds=10,
            )
        self.current_tokens += tokens
        utilization = self.current_tokens / self.max_tokens
        if utilization > self.warning_threshold:
            return QueueResult(
                accepted=True,
                status="accepted_under_pressure",
                estimated_wait_seconds=self._estimate_wait(),
            )
        return QueueResult(
            accepted=True,
            status="accepted",
            estimated_wait_seconds=self._estimate_wait(),
        )
Clients can decide: wait, retry later, or try a different endpoint.
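On the client side, that decision might look like the sketch below; client.enqueue is a hypothetical API assumed to return the QueueResult fields shown above:

import time

def submit_with_backpressure(client, request, max_retries: int = 3):
    for _ in range(max_retries):
        result = client.enqueue(request)
        if result.accepted:
            if result.status == "accepted_under_pressure":
                # Accepted, but expect a longer wait than usual
                print(f"queued, ~{result.estimated_wait_seconds}s expected")
            return result
        # Rejected: honor the server's hint instead of hammering the endpoint
        time.sleep(result.retry_after_seconds or 1)
    raise RuntimeError("queue rejected request after retries")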
Queue Monitoring
The queue is a leading indicator. Problems show up here before they show up in latency:
metrics = {
    "queue_depth_tokens": "Total tokens queued",
    "queue_depth_requests": "Total requests queued",
    "queue_wait_time_p99": "How long requests wait",
    "queue_rejection_rate": "Requests turned away",
    "queue_timeout_rate": "Requests that aged out",
}

alerts = {
    "queue_depth > 80%": "Approaching capacity",
    "queue_wait_p99 > 10s": "Users waiting too long",
    "rejection_rate > 5%": "Shedding significant load",
}
Queue growing + processing rate stable = you're falling behind. Time to scale or shed load.
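A crude version of that check, run on periodic samples of queue depth (a sketch; how you collect the samples depends on your metrics stack):

def falling_behind(depth_then_tokens: int, depth_now_tokens: int,
                   window_seconds: float) -> bool:
    # Sustained growth in queued tokens means arrivals exceed the drain rate
    growth_tps = (depth_now_tokens - depth_then_tokens) / window_seconds
    return growth_tps > 0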
The Sizing Formula
Start with:
def initial_queue_size(
    target_latency_seconds: float,
    processing_rate_tps: float,
    peak_arrival_rate_tps: float,
    safety_margin: float = 1.5,
) -> int:
    # How many tokens might accumulate during peak
    burst_duration = 60  # Assume 1-minute bursts
    excess_rate = peak_arrival_rate_tps - processing_rate_tps
    if excess_rate <= 0:
        # You can keep up, minimal queue needed
        return int(processing_rate_tps * target_latency_seconds * safety_margin)
    max_backlog = excess_rate * burst_duration
    return int(max_backlog * safety_margin)
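For example (illustrative numbers): a system that drains 50,000 tokens/s but sees 80,000 tokens/s at peak accumulates 30,000 tokens/s of excess, so a one-minute burst needs about 1.8M tokens of headroom, or 2.7M with the 1.5x margin:

size = initial_queue_size(
    target_latency_seconds=2.0,
    processing_rate_tps=50_000,
    peak_arrival_rate_tps=80_000,
)
print(size)  # 2700000 tokens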
Then observe and adjust. If the queue is always near-empty, you may have over-provisioned serving capacity. If it's always near-full, you're under-provisioned.
A well-designed queue is invisible when things are normal and graceful when things aren't.