What Happens When Your Primary Model Fails
Hospitals have backup generators. Not because the grid always fails, but because when it does, the stakes are too high for "we'll figure it out." The backup isn't used daily, but its existence enables reliability.
LLM systems need the same preparation. Your primary provider will have outages. Your self-hosted model will crash. The question isn't whether failures happen, but whether you have a plan when they do.
Failure Modes
def failure_modes():
    return {
        "api_outage": {
            "symptoms": ["Connection refused", "503 errors", "Timeouts"],
            "duration": "Minutes to hours",
            "frequency": "Monthly for major providers",
            "detection": "Health checks, error rate monitoring",
        },
        "rate_limiting": {
            "symptoms": ["429 errors", "Increasing latency"],
            "duration": "Until quota resets",
            "frequency": "Traffic-dependent",
            "detection": "Rate limit headers, 429 count",
        },
        "degraded_performance": {
            "symptoms": ["High latency", "Partial responses"],
            "duration": "Variable",
            "frequency": "Occasional",
            "detection": "Latency percentiles, quality scores",
        },
        "model_issues": {
            "symptoms": ["Bad outputs", "Refusals", "Loops"],
            "duration": "Until provider fixes",
            "frequency": "Rare but impactful",
            "detection": "Quality monitoring, user feedback",
        },
    }
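Detection starts with telling these modes apart at the point of failure. A minimal sketch, assuming your client library attaches a `status_code` attribute to its exceptions (`classify_failure` is a hypothetical helper, not any SDK's API):

def classify_failure(exc: Exception) -> str:
    # Assumption: the client exposes an HTTP status on its exceptions
    status = getattr(exc, "status_code", None)
    if status == 429:
        return "rate_limiting"
    if status in (500, 502, 503, 504) or isinstance(exc, (ConnectionError, TimeoutError)):
        return "api_outage"
    return "unknown"  # surface for human review

# Usage: count each mode separately so dashboards show which failure is active,
# e.g. metrics.increment(f"llm.failure.{classify_failure(exc)}")

Degraded performance and model-quality issues don't raise exceptions, so they need the latency and quality monitoring listed above rather than error classification.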
Fallback Strategy
class FallbackStrategy:
    """
    Multi-level fallback for LLM requests
    """
    def __init__(self):
        self.primary = Provider("anthropic", "claude-3-opus")
        self.secondary = Provider("openai", "gpt-4")
        self.tertiary = Provider("anthropic", "claude-3-haiku")
        self.cache = ResponseCache()

    async def generate(self, request: dict) -> dict:
        # Level 1: Try primary
        try:
            return await self.primary.generate(request, timeout=30)
        except (Timeout, APIError) as e:
            self.log_fallback("primary_failed", e)

        # Level 2: Try secondary (different provider)
        try:
            return await self.secondary.generate(request, timeout=30)
        except (Timeout, APIError) as e:
            self.log_fallback("secondary_failed", e)

        # Level 3: Try smaller model (faster, more available)
        try:
            return await self.tertiary.generate(request, timeout=15)
        except (Timeout, APIError) as e:
            self.log_fallback("tertiary_failed", e)

        # Level 4: Check cache for similar request
        cached = self.cache.get_similar(request)
        if cached:
            return {**cached, "source": "cache"}

        # Level 5: Return error gracefully
        return self.graceful_error(request)

    def graceful_error(self, request: dict) -> dict:
        return {
            "error": True,
            "message": "Service temporarily unavailable. Please try again.",
            "retry_after": 60,
            "fallback_suggestion": self.get_fallback_suggestion(request),
        }
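Calling code only sees one entry point. A usage sketch, assuming the `Provider` objects above expose an async `generate` (the request fields are illustrative):

import asyncio

async def handle_user_request(prompt: str) -> dict:
    strategy = FallbackStrategy()
    result = await strategy.generate({"prompt": prompt, "max_tokens": 500})
    if result.get("error"):
        # Surface retry_after to the client instead of hammering providers
        return {"status": 503, "retry_after": result["retry_after"]}
    return {"status": 200, "body": result}

# asyncio.run(handle_user_request("Summarize this document..."))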
Circuit Breaker Pattern
import time

class CircuitBreaker:
    """
    Prevent cascading failures with circuit breaker
    """
    def __init__(self, failure_threshold: int = 5, recovery_time: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.last_failure = None
        self.state = "closed"  # closed, open, half-open

    def can_proceed(self) -> bool:
        """Check if circuit allows requests"""
        if self.state == "closed":
            return True
        if self.state == "open":
            # Check if recovery time has passed
            if time.time() - self.last_failure > self.recovery_time:
                self.state = "half-open"
                return True
            return False
        if self.state == "half-open":
            return True  # Allow one test request
        return False

    def record_success(self):
        """Record successful request"""
        if self.state == "half-open":
            self.state = "closed"
        self.failures = 0  # any success resets the consecutive-failure count

    def record_failure(self):
        """Record failed request"""
        self.failures += 1
        self.last_failure = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
        if self.state == "half-open":
            self.state = "open"  # Test failed, back to open
class ProviderWithCircuitBreaker:
    """Provider with circuit breaker protection"""
    def __init__(self, provider, fallback):
        self.provider = provider
        self.fallback = fallback
        self.breaker = CircuitBreaker()

    async def generate(self, request: dict) -> dict:
        if not self.breaker.can_proceed():
            return await self.fallback.generate(request)
        try:
            result = await self.provider.generate(request)
            self.breaker.record_success()
            return result
        except Exception:
            self.breaker.record_failure()
            return await self.fallback.generate(request)
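To see the state machine in action, here is a small walkthrough against the `CircuitBreaker` above (a sketch; the elapsed recovery window is simulated by rewinding `last_failure`):

import time

breaker = CircuitBreaker(failure_threshold=3, recovery_time=60)

for _ in range(3):
    breaker.record_failure()
assert breaker.state == "open"           # threshold reached: stop sending traffic
assert breaker.can_proceed() is False    # requests go straight to fallback

breaker.last_failure = time.time() - 61  # simulate the recovery window elapsing
assert breaker.can_proceed() is True     # one probe request allowed
assert breaker.state == "half-open"

breaker.record_success()                 # probe succeeded
assert breaker.state == "closed"         # normal traffic resumes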
Multi-Provider Configuration
def multi_provider_setup():
    return {
        "same_model_different_provider": {
            "example": "Claude via Anthropic, Claude via AWS Bedrock",
            "benefit": "Identical output, different infrastructure",
            "tradeoff": "Need accounts with both, may have cost difference",
        },
        "different_model_same_capability": {
            "example": "Claude Opus -> GPT-4",
            "benefit": "Wide availability, different failure modes",
            "tradeoff": "Slight output differences",
        },
        "smaller_model_fallback": {
            "example": "Opus -> Sonnet -> Haiku",
            "benefit": "Faster, more available, cheaper",
            "tradeoff": "Quality degradation",
        },
        "configuration_example": """
            fallback_chain:
              - provider: anthropic
                model: claude-3-opus
                priority: 1
                timeout: 30s
              - provider: aws-bedrock
                model: claude-3-opus
                priority: 2
                timeout: 30s
              - provider: openai
                model: gpt-4
                priority: 3
                timeout: 30s
              - provider: anthropic
                model: claude-3-haiku
                priority: 4
                timeout: 10s
        """,
    }
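Turning that YAML into an ordered chain is straightforward. A sketch assuming PyYAML and the `Provider` class from earlier (the `"30s"` timeout parsing is deliberately simplified):

import yaml  # PyYAML, assumed available

def load_fallback_chain(path: str) -> list:
    with open(path) as f:
        config = yaml.safe_load(f)
    # Try providers in ascending priority order
    entries = sorted(config["fallback_chain"], key=lambda e: e["priority"])
    chain = []
    for entry in entries:
        timeout = int(str(entry["timeout"]).rstrip("s"))  # "30s" -> 30
        chain.append((Provider(entry["provider"], entry["model"]), timeout))
    return chain  # [(provider, timeout), ...] in fallback order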
Response Caching for Emergencies
class EmergencyCache:
    """
    Cache responses for emergency fallback
    """
    def __init__(self, max_entries: int = 10000):
        self.cache = {}  # request_hash -> {response, embedding, cached_at}
        self.max_entries = max_entries

    def cache_response(self, request: dict, response: dict):
        """Cache successful response"""
        # Hash the request (excluding timestamps, etc.)
        key = self.hash_request(request)
        self.cache[key] = {
            "response": response,
            # Store the prompt embedding now; it can't be recovered
            # from the hash key at lookup time
            "embedding": self.embed(request["prompt"]),
            "cached_at": datetime.now(),
        }
        # Evict old entries
        if len(self.cache) > self.max_entries:
            self.evict_oldest()

    def get_cached(self, request: dict) -> dict | None:
        """Get exact cached response"""
        key = self.hash_request(request)
        return self.cache.get(key, {}).get("response")

    def get_similar(self, request: dict, threshold: float = 0.9) -> dict | None:
        """Get semantically similar cached response"""
        request_embedding = self.embed(request["prompt"])
        best_match = None
        best_score = 0.0
        for entry in self.cache.values():
            similarity = cosine_similarity(request_embedding, entry["embedding"])
            if similarity > threshold and similarity > best_score:
                best_match = entry["response"]
                best_score = similarity
        if best_match:
            # Copy before annotating so the cached original stays clean
            best_match = {
                **best_match,
                "cache_similarity": best_score,
                "from_cache": True,
            }
        return best_match
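`hash_request` is referenced above but never defined. One reasonable implementation hashes a canonical JSON of the fields that actually affect the output; which fields those are is an assumption about your request schema (shown standalone here; inside the class it would take `self`):

import hashlib
import json

def hash_request(request: dict) -> str:
    """Stable hash over the output-affecting fields, dropping timestamps, IDs, etc."""
    stable = {
        k: request[k]
        for k in ("prompt", "model", "max_tokens", "temperature")  # assumed schema
        if k in request
    }
    canonical = json.dumps(stable, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()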
Health Checks and Monitoring
class ProviderHealthChecker:
    """
    Monitor provider health proactively
    """
    def __init__(self, providers: list):
        self.providers = providers
        self.health_status = {}

    async def health_check_loop(self, interval_seconds: int = 30):
        """Continuous health checking"""
        while True:
            for provider in self.providers:
                try:
                    start = time.time()
                    await provider.generate(
                        {"prompt": "Hello", "max_tokens": 1},
                        timeout=10,
                    )
                    latency = time.time() - start
                    self.health_status[provider.name] = {
                        "status": "healthy",
                        "latency": latency,
                        "checked_at": datetime.now(),
                    }
                except Exception as e:
                    self.health_status[provider.name] = {
                        "status": "unhealthy",
                        "error": str(e),
                        "checked_at": datetime.now(),
                    }
            await asyncio.sleep(interval_seconds)

    def get_healthy_providers(self) -> list:
        """Get list of currently healthy providers"""
        return [
            name for name, status in self.health_status.items()
            if status["status"] == "healthy"
        ]
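The payoff of proactive checks is skipping providers you already know are down instead of waiting for a timeout. A routing sketch, assuming `providers` is listed in priority order (`pick_provider` is a hypothetical helper):

import asyncio

def pick_provider(checker: ProviderHealthChecker, providers: list):
    """Return the highest-priority provider currently marked healthy."""
    healthy = set(checker.get_healthy_providers())
    for provider in providers:  # assumed listed in priority order
        if provider.name in healthy:
            return provider
    return providers[0]  # nothing healthy: try primary and let fallbacks decide

# At startup, run the checker in the background so it never blocks requests:
# checker = ProviderHealthChecker(providers)
# asyncio.create_task(checker.health_check_loop(interval_seconds=30))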
User Communication
def user_communication():
    return {
        "transparent_degradation": {
            "message": "Response may be slower than usual due to high demand.",
            "when": "Using fallback provider or degraded mode",
        },
        "graceful_error": {
            "message": "We're experiencing temporary issues. Your request has been queued and will be processed shortly.",
            "when": "All providers failing but request can wait",
        },
        "hard_failure": {
            "message": "Service temporarily unavailable. Please try again in a few minutes.",
            "when": "Cannot serve request at all",
        },
        "implementation": """
            def format_response(result: dict) -> dict:
                if result.get("from_fallback"):
                    result["notice"] = "Using backup service"
                if result.get("from_cache"):
                    result["notice"] = "Cached response due to high demand"
                if result.get("degraded"):
                    result["notice"] = "Shorter response due to capacity limits"
                return result
        """,
    }
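The "queued" path above implies somewhere to put requests that can wait. A minimal sketch, assuming a single-process asyncio app and the `FallbackStrategy` from earlier (delivery of completed results is app-specific and omitted):

import asyncio

retry_queue: asyncio.Queue = asyncio.Queue()

async def enqueue_for_retry(request: dict) -> dict:
    await retry_queue.put(request)
    return {
        "status": "queued",
        "message": "We're experiencing temporary issues. Your request has been "
                   "queued and will be processed shortly.",
    }

async def retry_worker(strategy: FallbackStrategy):
    while True:
        request = await retry_queue.get()
        result = await strategy.generate(request)
        if result.get("error"):
            await retry_queue.put(request)  # still failing: requeue
            await asyncio.sleep(30)         # back off before the next attempt
        else:
            ...  # deliver to the user (webhook, notification: app-specific)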
Testing Fallbacks
def testing_fallbacks():
    return {
        "chaos_engineering": {
            "approach": "Intentionally inject failures to test fallback",
            "examples": [
                "Block primary provider in firewall",
                "Inject artificial latency",
                "Return errors for subset of requests",
            ],
        },
        "game_days": {
            "approach": "Scheduled exercises to practice incident response",
            "frequency": "Quarterly",
            "scenarios": [
                "Primary provider total outage",
                "Rate limiting surge",
                "Quality degradation event",
            ],
        },
        "monitoring_validation": {
            "verify": [
                "Alerts fire when fallback activates",
                "Metrics show fallback usage",
                "Logs capture fallback decisions",
            ],
        },
    }
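"Return errors for subset of requests" can be done in process with a wrapper around any provider. A sketch, reusing the `APIError` exception assumed in the fallback code earlier:

import random

class FailureInjectingProvider:
    """Wraps a provider and fails a configurable fraction of requests."""

    def __init__(self, provider, failure_rate: float = 0.1):
        self.provider = provider
        self.failure_rate = failure_rate

    async def generate(self, request: dict) -> dict:
        if random.random() < self.failure_rate:
            raise APIError("injected failure (chaos test)")
        return await self.provider.generate(request)

# Swap this in for the primary in staging, then verify the fallback chain
# activates, metrics record it, and the expected alerts fire.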
Fallback systems are insurance. You pay the cost of maintaining them even when everything works. The payoff is reliability when things fail. Design your fallbacks before the incident, test them regularly, and monitor their activation in production.