Back to Blog
Evaluating Millions of LLM Responses
Assembly lines changed manufacturing not by making workers faster, but by making inspection systematic. Before Ford, craftsmen checked their own work. After Ford, dedicated inspectors sampled the line. The craftsman approach didn't scale.
LLM evaluation faces the same transition. At 1,000 responses per day, humans can review a meaningful fraction. At 10 million, a team reviewing 1,000 responses a day is sampling just 0.01%. The question isn't whether to automate. It's what to automate and how.
The Scale Problem
def evaluation_scale_reality():
    """
    The math on human review at scale.

    Works through three ways of reviewing 10 million responses per day and
    returns the figures together with the takeaway. Every derived number is
    computed from the named inputs below so the scenarios cannot drift out
    of sync with each other.

    Returns:
        dict with keys:
            "scenarios": the three staffing/coverage scenarios.
            "conclusion": one-line summary of the comparison.
            "human_role": what humans are still needed for.
    """
    import math

    daily_responses = 10_000_000
    responses_per_hour = 30          # one reviewer's throughput
    hours_per_reviewer_day = 8
    annual_cost_per_reviewer = 50_000
    sample_rate = 0.0001             # 0.01%

    # Reviewing everything by hand.
    full_review_hours = daily_responses / responses_per_hour  # ~333,333 hours
    # Head count is rounded up: a fractional reviewer is still a hire.
    full_review_staff = math.ceil(
        full_review_hours / hour_count
        if (hour_count := hours_per_reviewer_day) else 0
    )  # 41,667 full-time reviewers

    # Reviewing a small random sample by hand.
    # round() absorbs float noise in 10_000_000 * 0.0001.
    samples_per_day = round(daily_responses * sample_rate)  # 1,000

    scenarios = {
        "comprehensive_human_review": {
            "responses_per_hour": responses_per_hour,
            "hours_needed": full_review_hours,
            "reviewers_needed": full_review_staff,
            "annual_cost": full_review_staff * annual_cost_per_reviewer,  # $2B+
        },
        "realistic_sampling": {
            "sample_rate": sample_rate,
            "samples_per_day": samples_per_day,
            "hours_needed": samples_per_day / responses_per_hour,  # ~33 hours
            "reviewers_needed": 4,
            "coverage": "Statistical sampling only",
        },
        "automated_with_human_escalation": {
            "automated_coverage": 1.0,  # 100%
            "human_review": "Edge cases + disagreements",
            "human_volume": "~500/day",
            "cost": "Compute + small team",
        },
    }
    return {
        # Previously built and then thrown away; exposed so callers can
        # see the numbers behind the conclusion.
        "scenarios": scenarios,
        "conclusion": "Automation is the only path to comprehensive coverage",
        "human_role": "Calibration, edge cases, eval design",
    }
Building an Automated Eval Pipeline
class EvaluationPipeline:
    """
    Evaluate every response, automatically.

    Runs three groups of evaluators over a (request, response) pair and
    merges their results into one score dictionary keyed by evaluator name.
    """

    def __init__(self, config):
        """
        Set up the evaluator groups.

        Args:
            config: mapping; its optional "task_evals" entry supplies the
                task-specific evaluators (defaults to none).
        """
        # Synchronous checks that only look at the response text.
        self.deterministic_evals = [
            LengthCheck(),
            FormatCheck(),
            LanguageCheck(),
            RefusalDetector(),
            PIIDetector(),
        ]
        # Awaitable judges that see both the request and the response.
        self.llm_evals = [
            CoherenceJudge(),
            RelevanceJudge(),
            HarmfulnessJudge(),
        ]
        self.task_specific = config.get("task_evals", [])

    async def evaluate(self, request: dict, response: str) -> dict:
        """
        Score one response.

        Args:
            request: the originating request.
            response: the model output to evaluate.

        Returns:
            dict with "scores" (evaluator name -> result), "pass" (from
            ``compute_pass``) and "flags" (from ``extract_flags``).
        """
        scores = {}

        # Stage 1: deterministic checks, run inline one after another.
        for checker in self.deterministic_evals:
            scores[checker.name] = checker.check(response)

        # Stage 2: LLM judges, run concurrently, and only when the
        # stage-1 scores say they are worth running.
        if self.should_run_llm_evals(scores):
            pending = [
                judge.judge(request, response)
                for judge in self.llm_evals
            ]
            verdicts = await asyncio.gather(*pending)
            for judge, verdict in zip(self.llm_evals, verdicts):
                scores[judge.name] = verdict

        # Stage 3: task-specific evaluators that opt in for this request.
        for task_eval in self.task_specific:
            if not task_eval.applies_to(request):
                continue
            scores[task_eval.name] = await task_eval.evaluate(request, response)

        passed = self.compute_pass(scores)
        flags = self.extract_flags(scores)
        return {
            "scores": scores,
            "pass": passed,
            "flags": flags,
        }
Deterministic vs LLM-Based Evals
def eval_type_comparison():
    """
    Compare the three families of evals side by side.

    Returns:
        dict keyed by family ("deterministic", "llm_as_judge",
        "task_specific"); each value lists examples, cost, reliability,
        coverage and what the family should be used for.
    """
    deterministic = {
        "examples": [
            "Response length in range",
            "JSON format valid",
            "No PII detected",
            "Language matches request",
            "Refusal keywords absent",
        ],
        "cost": "Microseconds, $0",
        "reliability": "100% consistent",
        "coverage": "Narrow but certain",
        "use_for": "Hard constraints, compliance",
    }

    llm_as_judge = {
        "examples": [
            "Response is coherent",
            "Answer addresses question",
            "Tone is appropriate",
            "No hallucination detected",
        ],
        "cost": "100-500ms, $0.001-0.01 per eval",
        "reliability": "90-95% agreement with humans",
        "coverage": "Broad, subjective quality",
        "use_for": "Quality that humans would judge",
    }

    task_specific = {
        "examples": [
            "Code compiles and passes tests",
            "SQL query returns expected rows",
            "Math answer is correct",
            "Translation preserves meaning",
        ],
        "cost": "Varies (seconds for code execution)",
        "reliability": "High for verifiable tasks",
        "coverage": "Task-dependent",
        "use_for": "Ground-truth verifiable outputs",
    }

    return {
        "deterministic": deterministic,
        "llm_as_judge": llm_as_judge,
        "task_specific": task_specific,
    }
Sampling Strategies
class SamplingStrategy:
    """
    When you can't evaluate everything with expensive evals.

    Picks which responses get the expensive evaluation: must-see responses
    first (uncertain ones, then length extremes), then a stratified draw
    from whatever is left, capped at the budget.
    """

    def select_samples(self, responses: list, budget: int) -> list:
        """
        Choose at most ``budget`` distinct responses for expensive review.

        Args:
            responses: candidate responses.
            budget: maximum number of responses to return.

        Returns:
            Up to ``budget`` responses with no object repeated. If the
            must-see picks alone exceed the budget, the earliest picks
            (high-uncertainty first) are kept.
        """
        # A non-positive budget selects nothing. Without this guard a
        # negative budget would slice as samples[:-k] and return almost
        # everything.
        if budget <= 0:
            return []

        samples = []
        seen = set()  # id()s of picked responses; they may be unhashable

        # Always include: edge cases
        self._add_unique(samples, seen, self.high_uncertainty(responses))
        # Always include: extremes. A response can be both uncertain and
        # an extreme, so duplicates are dropped as they arrive.
        self._add_unique(samples, seen, self.longest_responses(responses))
        self._add_unique(samples, seen, self.shortest_responses(responses))

        # Stratified random from remainder: only responses not already
        # picked, and only when there is budget left (never a negative
        # count).
        remaining_budget = budget - len(samples)
        if remaining_budget > 0:
            remainder = [r for r in responses if id(r) not in seen]
            self._add_unique(samples, seen, self.stratified_sample(
                remainder,
                remaining_budget,
                strata=["task_type", "user_tier", "model_version"]
            ))
        return samples[:budget]

    @staticmethod
    def _add_unique(samples: list, seen: set, candidates) -> None:
        """Append each candidate not already picked, tracking identity in ``seen``."""
        for candidate in candidates:
            key = id(candidate)
            if key not in seen:
                seen.add(key)
                samples.append(candidate)

    def high_uncertainty(self, responses: list) -> list:
        """Responses where automated evals disagreed or were uncertain"""
        return [r for r in responses if r.eval_uncertainty > 0.3]
Aggregation and Alerting
class EvalAggregator:
    """
    Turn millions of individual evals into actionable signals.

    Reads ``self.dimensions`` (score names to average) and
    ``self.threshold`` (minimum drop that counts as a regression), and
    calls ``self.categorize_failures`` and ``self.percentile``; none of
    these are defined in this class body, so they must be provided
    elsewhere.
    """

    def aggregate_hourly(self, evals: list) -> dict:
        """
        Summarize one window of eval results.

        Args:
            evals: eval records exposing ``passed`` and ``scores[dim]``.

        Returns:
            dict with "pass_rate", "scores_by_dimension",
            "failure_breakdown" and "percentiles". For an empty window the
            rate, the per-dimension means and the percentiles are ``None``
            ("no data") rather than raising.
        """
        if not evals:
            # A window with no traffic would otherwise fail with
            # ZeroDivisionError (pass rate) or StatisticsError (mean).
            return {
                "pass_rate": None,
                "scores_by_dimension": {dim: None for dim in self.dimensions},
                "failure_breakdown": self.categorize_failures(evals),
                "percentiles": {"p50": None, "p95": None, "p99": None},
            }
        return {
            "pass_rate": sum(e.passed for e in evals) / len(evals),
            "scores_by_dimension": {
                dim: statistics.mean([e.scores[dim] for e in evals])
                for dim in self.dimensions
            },
            "failure_breakdown": self.categorize_failures(evals),
            "percentiles": {
                "p50": self.percentile(evals, 50),
                "p95": self.percentile(evals, 95),
                "p99": self.percentile(evals, 99),
            },
        }

    def detect_regression(self, current: dict, baseline: dict) -> list:
        """
        Compare two aggregates and report dimensions that got worse.

        Args:
            current: aggregate for the window being checked.
            baseline: aggregate to compare against.

        Returns:
            One alert dict per dimension whose current score is more than
            ``self.threshold`` below its baseline score.
        """
        alerts = []
        baseline_scores = baseline["scores_by_dimension"]
        for dim, score in current["scores_by_dimension"].items():
            baseline_score = baseline_scores.get(dim)
            # No comparison is possible for a dimension that is new since
            # the baseline, or when either side had no data (None).
            if score is None or baseline_score is None:
                continue
            if score < baseline_score - self.threshold:
                alerts.append({
                    "type": "regression",
                    "dimension": dim,
                    "current": score,
                    "baseline": baseline_score,
                    "drop": baseline_score - score,
                })
        return alerts
Cost of Evaluation
def evaluation_cost_model():
    """
    Evaluation has real costs. Budget accordingly.

    Prices three evaluation strategies for 10 million responses per day.
    Daily costs are computed from the volume and the per-response prices
    instead of being typed in by hand, so the figures always agree.

    Returns:
        dict with one entry per strategy ("deterministic_only",
        "deterministic_plus_llm_sample", "full_llm_eval") plus a
        "recommendation" string. Daily costs are in dollars, rounded to
        cents.
    """
    daily_responses = 10_000_000
    # Compute only. The previous literal 0.0001 implied $1,000/day at this
    # volume, contradicting the $1.00 daily figure used everywhere else.
    deterministic_cost_per = 0.0000001
    llm_cost_per = 0.005
    llm_sample_rate = 0.01  # 1% sample

    # round(..., 2) keeps float noise out of the dollar amounts.
    deterministic_daily = round(daily_responses * deterministic_cost_per, 2)  # $1
    sampled_llm_daily = round(
        daily_responses * llm_sample_rate * llm_cost_per, 2
    )  # $500
    full_llm_daily = round(daily_responses * llm_cost_per, 2)  # $50,000

    return {
        "deterministic_only": {
            "cost_per_response": deterministic_cost_per,
            "daily_cost": deterministic_daily,
            "coverage": "Basic quality gates",
        },
        "deterministic_plus_llm_sample": {
            "deterministic": "100% coverage",
            "llm_eval": "1% sample",
            "llm_cost_per": llm_cost_per,
            "daily_cost": round(deterministic_daily + sampled_llm_daily, 2),  # $501
            "coverage": "Gates + quality sampling",
        },
        "full_llm_eval": {
            "cost_per_response": llm_cost_per,
            "daily_cost": full_llm_daily,
            "coverage": "Every response evaluated",
            "note": "Rarely justified at scale",
        },
        "recommendation": (
            "\n"
            "Tier your evals:\n"
            "1. 100% deterministic (pennies)\n"
            "2. 100% cheap LLM eval with small model (dollars)\n"
            "3. 1-5% expensive eval with strong model (hundreds)\n"
            "4. 0.1% human review (escalations only)\n"
        ),
    }
Real-Time vs Batch Evaluation
def evaluation_timing():
    """
    Describe when each kind of evaluation should run.

    Returns:
        dict keyed by timing tier ("real_time", "near_real_time",
        "batch"); each value gives when the tier runs, its latency budget,
        what to run in it and its purpose.
    """
    real_time = {
        "when": "Before response reaches user",
        "latency_budget": "< 200ms",
        "what_to_run": [
            "Safety classifiers",
            "Format validation",
            "Refusal detection",
        ],
        "purpose": "Block bad responses",
    }

    near_real_time = {
        "when": "Within seconds of response",
        "latency_budget": "< 5s",
        "what_to_run": [
            "Quality scores",
            "Coherence checks",
        ],
        "purpose": "Monitoring dashboards",
    }

    batch = {
        "when": "Hourly or daily",
        "latency_budget": "Hours",
        "what_to_run": [
            "Expensive LLM-as-judge",
            "Human review samples",
            "Trend analysis",
        ],
        "purpose": "Quality reporting, regression detection",
    }

    return {
        "real_time": real_time,
        "near_real_time": near_real_time,
        "batch": batch,
    }
Implementation Architecture
def eval_architecture():
    # Returns a text diagram of the request flow: model output passes
    # through blocking real-time evals before reaching the user, then is
    # queued asynchronously for background evals that feed aggregation and
    # alerting. The diagram text is kept flush-left so the returned string
    # is unchanged.
    diagram = """
Request Flow:
User Request
│
▼
┌─────────┐
│ Model │
└────┬────┘
│
▼
┌─────────────────┐
│ Real-time Evals │ ← Block if fails
│ (Safety, Format)│
└────────┬────────┘
│
▼
Response to User
│
│ async
▼
┌─────────────────┐
│ Background Evals│ ← Queue for processing
│ (Quality, LLM) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Aggregation │
│ & Alerting │
└─────────────────┘
"""
    return diagram
Automated evaluation at scale is about accepting that you can't review everything manually, then building systems to maximize signal from automated checks. The goal isn't perfection on every response. It's detecting problems before they affect too many users.