Implementing Request Priority in LLM Serving
Airlines have figured this out. First class boards first. Economy waits. But economy still flies.
LLM serving needs the same tiers. Real-time chat gets priority. Overnight batch processing fills gaps. Everyone gets served, but not equally.
Priority Queue Basics
import time
from dataclasses import dataclass, field
from enum import IntEnum
from heapq import heappush, heappop
from typing import Optional

class Priority(IntEnum):
    CRITICAL = 0  # Emergency, skip everything
    HIGH = 1      # Real-time user-facing
    NORMAL = 2    # Standard requests
    LOW = 3       # Background jobs
    BATCH = 4     # Offline processing

@dataclass(order=True)
class PrioritizedRequest:
    priority: int
    arrival_time: float                        # tie-breaker: FIFO within a tier
    request: "Request" = field(compare=False)  # your server's request type, defined elsewhere

class PriorityQueue:
    def __init__(self):
        self.heap = []

    def add(self, request: "Request", priority: Priority):
        entry = PrioritizedRequest(
            priority=priority.value,
            arrival_time=time.time(),
            request=request,
        )
        heappush(self.heap, entry)

    def get_next(self) -> Optional["Request"]:
        if self.heap:
            return heappop(self.heap).request
        return None
Lower priority number = higher actual priority. heappop returns the highest-priority request in O(log n), and the arrival_time field breaks ties so requests within the same tier are served in FIFO order.
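A quick sketch of how this behaves, with a minimal stand-in for the server's Request type: batch work queued first still yields to real-time traffic that arrives later.

# Minimal stand-in for the server's Request type, just for illustration
class Request:
    def __init__(self, name: str):
        self.name = name

queue = PriorityQueue()
queue.add(Request("nightly-eval"), Priority.BATCH)
queue.add(Request("chat-turn"), Priority.HIGH)
queue.add(Request("summarize"), Priority.NORMAL)

print(queue.get_next().name)  # chat-turn    (HIGH beats everything queued)
print(queue.get_next().name)  # summarize    (NORMAL next)
print(queue.get_next().name)  # nightly-eval (BATCH last)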
Preventing Starvation
Pure priority queuing has a flaw: low priority requests might never run if high priority requests keep arriving.
class AgingPriorityQueue:
    def __init__(self, aging_rate: float = 0.1):
        self.aging_rate = aging_rate  # Priority boost per second of waiting
        self.requests = []

    def add(self, request: "Request", base_priority: int):
        entry = {
            "request": request,
            "base_priority": base_priority,
            "arrival_time": time.time(),
        }
        self.requests.append(entry)

    def get_next(self) -> Optional["Request"]:
        if not self.requests:
            return None
        now = time.time()

        # Effective priority improves (the number shrinks) as the request ages
        def effective_priority(entry):
            age = now - entry["arrival_time"]
            return entry["base_priority"] - (age * self.aging_rate)

        # Re-sort on every pop: O(n log n), fine for an illustration
        self.requests.sort(key=effective_priority)
        return self.requests.pop(0)["request"]
With the default aging rate of 0.1 priority levels per second, a waiting batch job (base priority 4) reaches the effective priority of a fresh normal request after 20 seconds, and of a high priority request after 30 seconds. Shrink the rate to stretch that out: at roughly 0.0033 per second, the same batch job needs about 10 minutes to compete with normal traffic and 15 minutes to compete with high priority traffic.
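The crossover arithmetic, spelled out as a quick sketch against the class above:

queue = AgingPriorityQueue(aging_rate=0.1)

# Effective priority of a BATCH request (base 4) as it waits:
#   age  0 s -> 4.0 - 0  * 0.1 = 4.0  (still behind every other tier)
#   age 20 s -> 4.0 - 20 * 0.1 = 2.0  (ties with a fresh NORMAL request)
#   age 30 s -> 4.0 - 30 * 0.1 = 1.0  (ties with a fresh HIGH request)
for age in (0, 20, 30):
    print(age, Priority.BATCH - age * queue.aging_rate)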
Capacity Reservation
Another approach: reserve capacity for each priority tier.
class ReservedCapacityScheduler:
    def __init__(self, total_capacity: int):
        # Reserved share of capacity for each tier (sums to 1.0)
        self.reservations = {
            Priority.CRITICAL: 0.10,  # 10% always available
            Priority.HIGH: 0.30,      # 30% for high priority
            Priority.NORMAL: 0.40,    # 40% for normal
            Priority.LOW: 0.15,       # 15% for low
            Priority.BATCH: 0.05,     # 5% for batch
        }
        self.total_capacity = total_capacity
        self.current_usage = {p: 0 for p in Priority}

    def can_admit(self, priority: Priority, tokens: int) -> bool:
        # Within this tier's own reservation?
        reserved = self.total_capacity * self.reservations[priority]
        if self.current_usage[priority] + tokens <= reserved:
            return True
        # Otherwise, spill over into free capacity, but never into the
        # unused reservations of higher-priority tiers (lower enum value)
        available = self.total_capacity - sum(self.current_usage.values())
        protected = sum(
            max(0.0, self.total_capacity * self.reservations[p] - self.current_usage[p])
            for p in Priority
            if p < priority  # strictly higher priority than the requester
        )
        return available - protected >= tokens

    def admit(self, priority: Priority, tokens: int):
        self.current_usage[priority] += tokens

    def release(self, priority: Priority, tokens: int):
        self.current_usage[priority] -= tokens
High priority can steal from batch's allocation, but batch can't steal from high priority's reservation.
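A quick check of that behavior, using a hypothetical 10,000-token budget:

scheduler = ReservedCapacityScheduler(total_capacity=10_000)

# Batch asks for more than its 5% slice while everything else is idle:
# denied, because the remaining free capacity is reserved for higher tiers.
print(scheduler.can_admit(Priority.BATCH, 2_000))  # False

# High priority asks for more than its 30% slice:
# admitted, borrowing from the unused normal/low/batch reservations.
print(scheduler.can_admit(Priority.HIGH, 5_000))   # True
scheduler.admit(Priority.HIGH, 5_000)
# ... when the request finishes:
scheduler.release(Priority.HIGH, 5_000)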
Priority Inversion Protection
Priority inversion: a low priority request holds a resource that a high priority request needs.
class PriorityInversionProtection:
    def __init__(self):
        self.active_requests = {}  # request_id -> priority

    def start_request(self, request_id: str, priority: Priority):
        self.active_requests[request_id] = priority

    def finish_request(self, request_id: str):
        self.active_requests.pop(request_id, None)

    def check_for_inversion(self, waiting_request: "Request") -> list[str]:
        # Find running requests with lower priority (larger enum value)
        # that might be holding resources the waiter needs
        blockers = [
            req_id for req_id, pri in self.active_requests.items()
            if pri > waiting_request.priority  # lower priority than the waiter
        ]
        if blockers and waiting_request.priority <= Priority.HIGH:
            # Candidates for preemption, or for a priority boost (see below)
            return blockers
        return []
In LLM serving, inversion is less common than in OS scheduling, but it can happen with shared resources like KV cache memory.
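One lighter-weight response than preempting the blockers is priority inheritance: temporarily boost the blocker so it finishes and releases the resource sooner. A minimal sketch, assuming the scheduler exposes a reprioritize hook for already-running requests (that hook is an assumption, not something defined above):

def resolve_inversion(protection: PriorityInversionProtection,
                      waiting_request: "Request",
                      reprioritize) -> None:
    # reprioritize(request_id, priority) is a hypothetical scheduler hook
    # that bumps an already-running request to a new priority.
    for blocker_id in protection.check_for_inversion(waiting_request):
        # Let the blocker inherit the waiter's priority so it drains quickly,
        # instead of killing it and losing its KV cache and partial output.
        reprioritize(blocker_id, waiting_request.priority)
        protection.active_requests[blocker_id] = waiting_request.priority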
API Design for Priority
Expose priority through your API:
# Explicit priority parameter
response = client.chat(
    messages=[...],
    priority="high",  # or "normal", "low", "batch"
)

# Inferred from endpoint
#   /v1/chat/completions     -> normal priority
#   /v1/batch/completions    -> batch priority
#   /v1/priority/completions -> high priority (costs more)

# Inferred from auth tier
def get_priority(api_key: str) -> Priority:
    tier = get_tier_for_key(api_key)
    return {
        "enterprise": Priority.HIGH,
        "pro": Priority.NORMAL,
        "free": Priority.LOW,
    }.get(tier, Priority.NORMAL)
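These sources compose naturally: accept an explicit priority parameter, but cap it at whatever the caller's auth tier allows. A sketch of that policy (the tier ceilings and the cap rule are assumptions, and get_tier_for_key is the same lookup used above):

TIER_CEILING = {
    "enterprise": Priority.HIGH,
    "pro": Priority.NORMAL,
    "free": Priority.LOW,
}

def resolve_priority(api_key: str, requested: Optional[str]) -> Priority:
    ceiling = TIER_CEILING.get(get_tier_for_key(api_key), Priority.NORMAL)
    if requested is None:
        return ceiling
    asked = Priority[requested.upper()]  # "high" -> Priority.HIGH
    # Never grant a better tier (lower number) than the key is entitled to
    return max(asked, ceiling)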
Monitoring Priority Health
Track whether priorities are working:
# Per-priority metrics (prometheus_client-style; any labeled metrics library works)
from prometheus_client import Gauge, Histogram

metrics = {
    "queue_depth_by_priority": Gauge(
        "queue_depth", "Requests waiting in queue", ["priority"]),
    "latency_by_priority": Histogram(
        "latency", "End-to-end request latency in seconds", ["priority"]),
    "starvation_time": Histogram(
        "starvation_seconds", "Time spent queued before scheduling", ["priority"]),
}
# Alert if:
# - High priority P99 > 500ms (SLA breach)
# - Batch priority starvation > 30 minutes (need more capacity)
# - Queue depth for any tier growing continuously (falling behind)
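Feeding the starvation metric is just a timestamp diff at dequeue time. A sketch wired into the PriorityQueue from earlier, assuming the prometheus_client-style histogram above:

def get_next_instrumented(queue: PriorityQueue) -> Optional["Request"]:
    """Pop the next request and record how long it waited, per priority tier."""
    if not queue.heap:
        return None
    entry = heappop(queue.heap)
    waited = time.time() - entry.arrival_time
    tier = Priority(entry.priority).name.lower()
    metrics["starvation_time"].labels(priority=tier).observe(waited)
    return entry.request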
Priority is a promise. If high priority isn't actually fast, the system is lying.
The goal isn't to make high priority fast. It's to make fast things high priority, and be honest about what's fast.