Back to Blog

What Happens When Your Primary Model Fails

Hospitals have backup generators, not because the grid always fails, but because when it does, the stakes are too high for "we'll figure it out." The backup is rarely used, but its existence is what makes the hospital reliable.

LLM systems need the same preparation. Your primary provider will have outages. Your self-hosted model will crash. The question isn't whether failures happen, but whether you have a plan when they do.

Failure Modes

def failure_modes():
    """Catalogue the common ways an LLM backend fails.

    Returns:
        A dict keyed by failure-mode name. Each value is a dict with the
        keys ``symptoms`` (list of strings), ``duration``, ``frequency``
        and ``detection`` (strings). A fresh structure is built per call.
    """
    # Every failure mode is described by the same four fields, in this order.
    fields = ("symptoms", "duration", "frequency", "detection")

    catalogue = (
        (
            "api_outage",
            ["Connection refused", "503 errors", "Timeouts"],
            "Minutes to hours",
            "Monthly for major providers",
            "Health checks, error rate monitoring",
        ),
        (
            "rate_limiting",
            ["429 errors", "Increasing latency"],
            "Until quota resets",
            "Traffic-dependent",
            "Rate limit headers, 429 count",
        ),
        (
            "degraded_performance",
            ["High latency", "Partial responses"],
            "Variable",
            "Occasional",
            "Latency percentiles, quality scores",
        ),
        (
            "model_issues",
            ["Bad outputs", "Refusals", "Loops"],
            "Until provider fixes",
            "Rare but impactful",
            "Quality monitoring, user feedback",
        ),
    )

    return {mode: dict(zip(fields, details)) for mode, *details in catalogue}

Fallback Strategy

class FallbackStrategy:
    """
    Multi-level fallback for LLM requests

    Providers are tried in order (primary, secondary, tertiary); if all of
    them fail, a similar cached response is returned when one exists, and
    otherwise a structured error payload is returned.

    NOTE(review): Provider, ResponseCache, Timeout, APIError, log_fallback
    and get_fallback_suggestion are not defined in this snippet; they are
    assumed to be supplied by the surrounding project.
    """

    def __init__(self):
        self.primary = Provider("anthropic", "claude-3-opus")
        self.secondary = Provider("openai", "gpt-4")
        self.tertiary = Provider("anthropic", "claude-3-haiku")
        self.cache = ResponseCache()

    async def generate(self, request: dict) -> dict:
        """Answer ``request`` using the first level that succeeds.

        Args:
            request: The request payload, forwarded unchanged to each level.

        Returns:
            The first provider result obtained; otherwise a cached response
            with ``"source": "cache"`` added; otherwise the payload built
            by ``graceful_error``.
        """
        # (attribute holding the provider, event logged on failure, timeout)
        # Levels 1-3: primary, a different provider, then a smaller model.
        levels = (
            ("primary", "primary_failed", 30),
            ("secondary", "secondary_failed", 30),
            ("tertiary", "tertiary_failed", 15),
        )
        for attribute, event, budget in levels:
            # Looked up lazily so later levels are untouched on early success.
            provider = getattr(self, attribute)
            try:
                return await provider.generate(request, timeout=budget)
            except (Timeout, APIError) as exc:
                self.log_fallback(event, exc)

        # Level 4: every provider failed, look for a similar cached request.
        hit = self.cache.get_similar(request)
        if not hit:
            # Level 5: nothing left to try, report the failure gracefully.
            return self.graceful_error(request)
        return {**hit, "source": "cache"}

    def graceful_error(self, request: dict) -> dict:
        """Build the error payload returned when no level could answer."""
        payload = {
            "error": True,
            "message": "Service temporarily unavailable. Please try again.",
            "retry_after": 60,
        }
        payload["fallback_suggestion"] = self.get_fallback_suggestion(request)
        return payload

Circuit Breaker Pattern

class CircuitBreaker:
    """
    Prevent cascading failures with circuit breaker

    States:
        closed    -- requests are allowed; consecutive failures are counted.
        open      -- requests are refused until more than ``recovery_time``
                     seconds have passed since the last recorded failure.
        half-open -- requests are allowed as probes; the next recorded
                     success closes the circuit, the next recorded failure
                     re-opens it.

    Attributes:
        failure_threshold: Consecutive failures needed to open the circuit.
        recovery_time: Seconds to stay open before probing again.
        failures: Failures recorded since the last success.
        last_failure: ``time.time()`` of the latest failure, or ``None``.
        state: One of ``"closed"``, ``"open"``, ``"half-open"``.
    """

    def __init__(self, failure_threshold: int = 5, recovery_time: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.last_failure = None
        self.state = "closed"  # closed, open, half-open

    def can_proceed(self) -> bool:
        """Check if circuit allows requests

        An open circuit whose recovery time has elapsed is moved to
        half-open as a side effect.

        Returns:
            True when a request may be attempted, False otherwise (always a
            real bool, including for an unrecognised state).
        """
        if self.state == "open":
            # Check if recovery time has passed
            if time.time() - self.last_failure > self.recovery_time:
                self.state = "half-open"
                return True
            return False

        # Half-open admits every request until a result is recorded; it does
        # not limit itself to a single probe.
        return self.state in ("closed", "half-open")

    def record_success(self):
        """Record successful request"""
        # Reset on every success so that only *consecutive* failures can
        # trip the breaker; otherwise unrelated failures spread over a long
        # period would eventually add up to the threshold.
        self.failures = 0
        if self.state == "half-open":
            self.state = "closed"

    def record_failure(self):
        """Record failed request"""
        self.failures += 1
        self.last_failure = time.time()

        # A failed probe re-opens the circuit immediately; otherwise the
        # circuit opens once the failure count reaches the threshold.
        if self.state == "half-open" or self.failures >= self.failure_threshold:
            self.state = "open"


class ProviderWithCircuitBreaker:
    """Provider with circuit breaker protection

    Requests go to ``provider`` while the breaker allows it; when the
    breaker refuses, or the provider call raises, the request is answered
    by ``fallback`` instead.
    """

    def __init__(self, provider, fallback):
        self.provider = provider
        self.fallback = fallback
        self.breaker = CircuitBreaker()

    async def generate(self, request: dict) -> dict:
        """Return the provider's result, or the fallback's when it cannot be used."""
        if self.breaker.can_proceed():
            try:
                response = await self.provider.generate(request)
                self.breaker.record_success()
                return response
            except Exception:
                # Any failure counts against the provider; the request is
                # then served by the fallback below.
                self.breaker.record_failure()

        return await self.fallback.generate(request)

Multi-Provider Configuration

def multi_provider_setup():
    """Describe the multi-provider redundancy options.

    Returns:
        A dict with one entry per strategy, each a dict with ``example``,
        ``benefit`` and ``tradeoff`` strings, plus a
        ``configuration_example`` entry holding a sample fallback-chain
        configuration as text.
    """
    fields = ("example", "benefit", "tradeoff")

    strategies = (
        (
            "same_model_different_provider",
            "Claude via Anthropic, Claude via AWS Bedrock",
            "Identical output, different infrastructure",
            "Need accounts with both, may have cost difference",
        ),
        (
            "different_model_same_capability",
            "Claude Opus -> GPT-4",
            "Wide availability, different failure modes",
            "Slight output differences",
        ),
        (
            "smaller_model_fallback",
            "Opus -> Sonnet -> Haiku",
            "Faster, more available, cheaper",
            "Quality degradation",
        ),
    )

    setup = {name: dict(zip(fields, details)) for name, *details in strategies}

    # The whitespace inside this literal is part of the returned value.
    setup["configuration_example"] = """
            fallback_chain:
              - provider: anthropic
                model: claude-3-opus
                priority: 1
                timeout: 30s

              - provider: aws-bedrock
                model: claude-3-opus
                priority: 2
                timeout: 30s

              - provider: openai
                model: gpt-4
                priority: 3
                timeout: 30s

              - provider: anthropic
                model: claude-3-haiku
                priority: 4
                timeout: 10s
        """
    return setup

Response Caching for Emergencies

class EmergencyCache:
    """
    Cache responses for emergency fallback
    """

    def __init__(self, max_entries: int = 10000):
        self.cache = {}  # request_hash -> response
        self.max_entries = max_entries

    def cache_response(self, request: dict, response: dict):
        """Cache successful response"""
        # Hash the request (excluding timestamps, etc.)
        key = self.hash_request(request)
        self.cache[key] = {
            "response": response,
            "cached_at": datetime.now(),
        }

        # Evict old entries
        if len(self.cache) > self.max_entries:
            self.evict_oldest()

    def get_cached(self, request: dict) -> dict | None:
        """Get exact cached response"""
        key = self.hash_request(request)
        return self.cache.get(key, {}).get("response")

    def get_similar(self, request: dict, threshold: float = 0.9) -> dict | None:
        """Get semantically similar cached response"""
        request_embedding = self.embed(request["prompt"])

        best_match = None
        best_score = 0

        for key, entry in self.cache.items():
            cached_embedding = self.get_embedding(key)
            similarity = cosine_similarity(request_embedding, cached_embedding)

            if similarity > threshold and similarity > best_score:
                best_match = entry["response"]
                best_score = similarity

        if best_match:
            best_match["cache_similarity"] = best_score
            best_match["from_cache"] = True

        return best_match

Health Checks and Monitoring

class ProviderHealthChecker:
    """
    Monitor provider health proactively

    ``health_status`` maps each provider's ``name`` to the report from its
    most recent probe.
    """

    def __init__(self, providers: list):
        self.providers = providers
        self.health_status = {}

    async def _probe(self, provider) -> dict:
        """Send one minimal request to ``provider`` and report the outcome."""
        started = time.time()
        try:
            await provider.generate(
                {"prompt": "Hello", "max_tokens": 1},
                timeout=10
            )
        except Exception as exc:
            return {
                "status": "unhealthy",
                "error": str(exc),
                "checked_at": datetime.now(),
            }

        elapsed = time.time() - started
        return {
            "status": "healthy",
            "latency": elapsed,
            "checked_at": datetime.now(),
        }

    async def health_check_loop(self, interval_seconds: int = 30):
        """Continuous health checking

        Probes every provider in turn, stores the report under the
        provider's name, sleeps ``interval_seconds`` and repeats. The loop
        has no exit condition of its own.
        """
        while True:
            for provider in self.providers:
                self.health_status[provider.name] = await self._probe(provider)

            await asyncio.sleep(interval_seconds)

    def get_healthy_providers(self) -> list:
        """Get list of currently healthy providers (by name)"""
        healthy = []
        for name, report in self.health_status.items():
            if report["status"] == "healthy":
                healthy.append(name)
        return healthy

User Communication

def user_communication():
    """Describe what to tell users at each level of degradation.

    Returns:
        A dict with one entry per situation, each a dict with a ``message``
        to show and a ``when`` describing the situation, plus an
        ``implementation`` entry holding example code as text.
    """
    notices = (
        (
            "transparent_degradation",
            "Response may be slower than usual due to high demand.",
            "Using fallback provider or degraded mode",
        ),
        (
            "graceful_error",
            "We're experiencing temporary issues. Your request has been queued and will be processed shortly.",
            "All providers failing but request can wait",
        ),
        (
            "hard_failure",
            "Service temporarily unavailable. Please try again in a few minutes.",
            "Cannot serve request at all",
        ),
    )

    guide = {
        situation: {"message": message, "when": when}
        for situation, message, when in notices
    }

    # The whitespace inside this literal is part of the returned value.
    guide["implementation"] = """
            def format_response(result: dict) -> dict:
                if result.get("from_fallback"):
                    result["notice"] = "Using backup service"

                if result.get("from_cache"):
                    result["notice"] = "Cached response due to high demand"

                if result.get("degraded"):
                    result["notice"] = "Shorter response due to capacity limits"

                return result
        """
    return guide

Testing Fallbacks

def testing_fallbacks():
    """List the practices used to exercise fallback paths.

    Returns:
        A dict with the entries ``chaos_engineering``, ``game_days`` and
        ``monitoring_validation``, each a dict of descriptive strings and
        lists of strings. A fresh structure is built per call.
    """
    chaos = {
        "approach": "Intentionally inject failures to test fallback",
        "examples": [
            "Block primary provider in firewall",
            "Inject artificial latency",
            "Return errors for subset of requests",
        ],
    }

    drills = {
        "approach": "Scheduled exercises to practice incident response",
        "frequency": "Quarterly",
        "scenarios": [
            "Primary provider total outage",
            "Rate limiting surge",
            "Quality degradation event",
        ],
    }

    observability = {
        "verify": [
            "Alerts fire when fallback activates",
            "Metrics show fallback usage",
            "Logs capture fallback decisions",
        ],
    }

    practices = {}
    practices["chaos_engineering"] = chaos
    practices["game_days"] = drills
    practices["monitoring_validation"] = observability
    return practices

Fallback systems are insurance. You pay the cost of maintaining them even when everything works. The payoff is reliability when things fail. Design your fallbacks before the incident, test them regularly, and monitor their activation in production.