Running LLMs in production is expensive. A single API call may only cost $0.001-$0.1, but at millions of requests the bill grows quickly. Performance optimization reduces latency, cost optimization reduces spend, and the two usually go hand in hand.
In this post, we will explore techniques to optimize both performance and cost for AI applications.
# Typical pricing (as of 2024)
PRICING = {
    'gpt-4': {
        'input': 0.03 / 1000,   # $0.03 per 1K input tokens
        'output': 0.06 / 1000   # $0.06 per 1K output tokens
    },
    'gpt-3.5-turbo': {
        'input': 0.0015 / 1000,
        'output': 0.002 / 1000
    },
    'claude-2': {
        'input': 0.008 / 1000,
        'output': 0.024 / 1000
    }
}

def calculate_cost(model, input_tokens, output_tokens):
    """Calculate API call cost."""
    pricing = PRICING[model]
    input_cost = input_tokens * pricing['input']
    output_cost = output_tokens * pricing['output']
    return input_cost + output_cost

# Example
cost = calculate_cost('gpt-4', input_tokens=500, output_tokens=300)
print(f"Cost: ${cost:.4f}")  # $0.0330

# 1 million requests/day:
daily_cost = cost * 1_000_000
print(f"Daily: ${daily_cost:,.2f}")  # $33,000/day!
1. Prompt Compression
# ❌ Verbose prompt (expensive)
verbose_prompt = """
Please analyze the following customer review and provide:
1. Overall sentiment (positive, negative, neutral)
2. Key topics mentioned
3. Specific pain points if any
4. Suggestions for improvement
Customer review:
{review}
Please provide your analysis in a structured format with clear headings for each section.
"""
# ✅ Compressed prompt (cheaper)
compressed_prompt = """
Analyze review. Return JSON:
{
"sentiment": "positive|negative|neutral",
"topics": [],
"pain_points": [],
"suggestions": []
}
Review: {review}
"""
# Savings: ~50% fewer tokens
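
To verify the claimed savings, count tokens before and after compression. A minimal sketch, assuming the tiktoken library is installed and both templates are filled with the same review text (the sample review is made up for illustration):

import tiktoken

def count_tokens(text, model="gpt-3.5-turbo"):
    """Count tokens with the same tokenizer the API uses."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

review = "The app is great but crashes when I upload photos."
verbose_tokens = count_tokens(verbose_prompt.replace("{review}", review))
compressed_tokens = count_tokens(compressed_prompt.replace("{review}", review))
print(f"Verbose: {verbose_tokens} tokens, compressed: {compressed_tokens} tokens")
print(f"Reduction: {1 - compressed_tokens / verbose_tokens:.0%}")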
2. Smart Caching
import hashlib
import redis
import openai

class LLMCache:
    def __init__(self):
        self.redis = redis.Redis()
        self.ttl = 86400  # 24 hours

    def get_cache_key(self, prompt, model):
        """Generate deterministic cache key."""
        content = f"{model}:{prompt}"
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, prompt, model):
        """Check cache before API call."""
        key = self.get_cache_key(prompt, model)
        cached = self.redis.get(key)
        if cached:
            print("💰 Cache hit - $0 cost!")
            return cached.decode()
        return None

    def set(self, prompt, model, response):
        """Cache response."""
        key = self.get_cache_key(prompt, model)
        self.redis.set(key, response, ex=self.ttl)

# Usage
cache = LLMCache()

def llm_call(prompt, model='gpt-3.5-turbo'):
    # Check cache first
    cached = cache.get(prompt, model)
    if cached:
        return cached

    # Cache miss - make API call
    response = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    result = response.choices[0].message.content

    # Cache for future
    cache.set(prompt, model, result)
    return result

# First call: API cost
result = llm_call("What is Python?")  # $0.0015

# Subsequent calls: FREE
result = llm_call("What is Python?")  # $0
result = llm_call("What is Python?")  # $0
Cache hit rate impact:
Scenario: 1M requests/day
Without cache: 1M API calls × $0.002 = $2,000/day
With 70% cache hit rate:
- Cached: 700K × $0 = $0
- API calls: 300K × $0.002 = $600/day
Savings: $1,400/day = $511,000/year!
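
The savings scale linearly with the hit rate, so it is worth plugging in your own traffic. A small helper reproducing the numbers above (the request volume and per-call cost are the illustrative figures from this scenario):

def cache_savings(requests_per_day, cost_per_call, hit_rate):
    """Daily spend with and without a cache at the given hit rate."""
    without_cache = requests_per_day * cost_per_call
    with_cache = requests_per_day * (1 - hit_rate) * cost_per_call
    return without_cache, with_cache, without_cache - with_cache

baseline, optimized, saved = cache_savings(1_000_000, 0.002, hit_rate=0.70)
print(f"Without cache: ${baseline:,.0f}/day")
print(f"With cache:    ${optimized:,.0f}/day")
print(f"Savings:       ${saved:,.0f}/day (~${saved * 365:,.0f}/year)")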
3. Model Selection
def smart_model_selection(query):
    """Route to cheapest sufficient model."""
    # Simple queries → cheap model
    if is_simple_query(query):
        return call_llm('gpt-3.5-turbo', query)  # $0.002/1K
    # Complex reasoning → expensive model
    elif requires_complex_reasoning(query):
        return call_llm('gpt-4', query)  # $0.06/1K
    # Default
    else:
        return call_llm('gpt-3.5-turbo', query)

def is_simple_query(query):
    """Heuristics for simple queries."""
    simple_patterns = [
        'what is',
        'define',
        'translate',
        'summarize'
    ]
    return any(pattern in query.lower() for pattern in simple_patterns)
Savings example:
100K simple queries:
- All GPT-4: 100K × $0.06 = $6,000
- Routed: 70K × $0.002 + 30K × $0.06 = $140 + $1,800 = $1,940
Savings: $4,060 (68%)
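
The same arithmetic as code, so you can rerun it with your own traffic mix (the 70/30 split and per-query costs are the assumptions from the example above):

def routing_savings(total_queries, simple_share, cheap_cost, expensive_cost):
    """Compare sending everything to the expensive model vs. routing."""
    all_expensive = total_queries * expensive_cost
    routed = (total_queries * simple_share * cheap_cost
              + total_queries * (1 - simple_share) * expensive_cost)
    return all_expensive, routed

all_gpt4, routed = routing_savings(100_000, simple_share=0.7,
                                   cheap_cost=0.002, expensive_cost=0.06)
print(f"All GPT-4: ${all_gpt4:,.0f}, routed: ${routed:,.0f}, "
      f"savings: {1 - routed / all_gpt4:.0%}")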
Latency: Time for SINGLE request
Throughput: Requests processed per SECOND
Examples:
┌──────────────┬─────────┬────────────┐
│ Configuration│ Latency │ Throughput │
├──────────────┼─────────┼────────────┤
│ No batching │ 100ms │ 10 req/s │
│ Batch size 8 │ 150ms │ 53 req/s │
│ Batch size 32│ 300ms │ 107 req/s │
└──────────────┴─────────┴────────────┘
Trade-off: Higher batching → More throughput, Higher latency
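
The relationship behind the table is simply throughput ≈ batch_size / batch_latency. A quick sketch that reproduces the numbers (the latencies are the illustrative values from the table):

def throughput(batch_size, batch_latency_ms):
    """Requests per second when requests are served in fixed-size batches."""
    return batch_size / (batch_latency_ms / 1000)

for batch_size, latency_ms in [(1, 100), (8, 150), (32, 300)]:
    print(f"Batch {batch_size:>2}: {latency_ms}ms latency, "
          f"{throughput(batch_size, latency_ms):.0f} req/s")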
import asyncio
from collections import deque
import numpy as np

class AdaptiveBatcher:
    def __init__(
        self,
        min_batch_size=1,
        max_batch_size=32,
        max_wait_ms=50
    ):
        self.min_batch_size = min_batch_size
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = deque()
        self.processing = False

    async def predict(self, features):
        """Add request to queue and wait for result."""
        # Create future for this request
        future = asyncio.Future()
        self.queue.append((features, future))

        # Trigger batch processing
        if not self.processing:
            asyncio.create_task(self._process_batch())

        # Wait for result
        return await future

    async def _process_batch(self):
        """Process batch when ready."""
        self.processing = True

        # Wait for batch to accumulate or timeout
        start_time = asyncio.get_event_loop().time()
        while True:
            # Check if batch ready
            batch_ready = len(self.queue) >= self.min_batch_size
            time_exceeded = (asyncio.get_event_loop().time() - start_time) * 1000 > self.max_wait_ms
            if batch_ready or time_exceeded:
                break
            await asyncio.sleep(0.001)  # 1ms

        # Collect batch
        batch_features = []
        batch_futures = []
        while self.queue and len(batch_features) < self.max_batch_size:
            features, future = self.queue.popleft()
            batch_features.append(features)
            batch_futures.append(future)

        if batch_features:
            # Batch inference (GPU loves this!)
            # `model` is assumed to be a preloaded model exposing a batched predict()
            batch_array = np.array(batch_features)
            predictions = model.predict(batch_array)

            # Return individual results
            for future, prediction in zip(batch_futures, predictions):
                future.set_result(prediction)

        self.processing = False

        # Continue if more in queue
        if self.queue:
            asyncio.create_task(self._process_batch())

# Usage in FastAPI
from fastapi import FastAPI

app = FastAPI()
batcher = AdaptiveBatcher(
    min_batch_size=4,
    max_batch_size=32,
    max_wait_ms=50  # 50ms max wait
)

@app.post("/predict")
async def predict(features: list):
    result = await batcher.predict(features)
    return {"prediction": result}
Performance gains:
Without batching:
- 1 request = 100ms
- Throughput: 10 req/s
- GPU utilization: 20%
With adaptive batching:
- Batch size 16 = 150ms per batch
- Throughput: 107 req/s (10.7x improvement!)
- GPU utilization: 85%
import time
import asyncio

class LatencyBudgetManager:
    def __init__(self, budget_ms=1000):
        self.budget_ms = budget_ms

    async def execute_with_budget(self, tasks):
        """Execute tasks within latency budget."""
        start = time.time()
        results = []

        for task in tasks:
            elapsed_ms = (time.time() - start) * 1000
            remaining_budget = self.budget_ms - elapsed_ms

            if remaining_budget <= 0:
                print("⚠️ Budget exceeded, skipping remaining tasks")
                break

            # Execute with timeout
            try:
                result = await asyncio.wait_for(
                    task(),
                    timeout=remaining_budget / 1000
                )
                results.append(result)
            except asyncio.TimeoutError:
                print(f"Task timed out after {remaining_budget:.0f}ms")
                break

        return results

# Example: RAG pipeline with budget
# (illustrative: in a real pipeline each stage would consume the previous stage's output)
async def rag_pipeline(query):
    budget = LatencyBudgetManager(budget_ms=500)

    tasks = [
        lambda: retrieve_documents(query),     # 100ms
        lambda: rerank_documents(docs),        # 150ms
        lambda: generate_answer(docs, query)   # 200ms
    ]

    results = await budget.execute_with_budget(tasks)
    # Total: ~450ms (within 500ms budget)
import time

def benchmark_batch_size(model, data, batch_sizes=[1, 4, 8, 16, 32, 64]):
    """Find optimal batch size."""
    results = []

    for batch_size in batch_sizes:
        # Measure throughput
        start = time.time()
        for i in range(0, len(data), batch_size):
            batch = data[i:i+batch_size]
            model.predict(batch)
        elapsed = time.time() - start

        throughput = len(data) / elapsed
        latency = (elapsed / len(data)) * batch_size * 1000  # ms per batch

        results.append({
            'batch_size': batch_size,
            'throughput': throughput,
            'latency': latency
        })
        print(f"Batch {batch_size}: {throughput:.1f} req/s, {latency:.1f}ms latency")

    return results
# Example output:
# Batch 1: 10.2 req/s, 98ms latency
# Batch 4: 35.1 req/s, 114ms latency
# Batch 8: 58.3 req/s, 137ms latency ← Sweet spot
# Batch 16: 72.4 req/s, 221ms latency
# Batch 32: 81.2 req/s, 394ms latency (too slow)
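
Once you have these measurements, picking the sweet spot can be automated: take the highest throughput whose latency still meets your SLO. A small helper over the results above (the 150ms budget is an assumption; `results` is the list returned by benchmark_batch_size):

def pick_batch_size(results, latency_budget_ms=150):
    """Pick the highest-throughput batch size that stays within the latency budget."""
    eligible = [r for r in results if r['latency'] <= latency_budget_ms]
    if not eligible:
        # Nothing fits the budget: fall back to the lowest-latency option
        return min(results, key=lambda r: r['latency'])
    return max(eligible, key=lambda r: r['throughput'])

best = pick_batch_size(results, latency_budget_ms=150)
print(f"Chosen batch size: {best['batch_size']}")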
Reduce perceived latency by streaming.
from fastapi.responses import StreamingResponse
import asyncio
import openai

async def stream_llm_response(prompt):
    """Stream tokens as they're generated."""
    # Use the async call so the stream can be consumed with `async for`
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in response:
        content = chunk.choices[0].delta.get('content', '')
        if content:
            yield f"data: {content}\n\n"
            await asyncio.sleep(0.01)  # Small delay

@app.post("/chat")
async def chat(prompt: str):
    return StreamingResponse(
        stream_llm_response(prompt),
        media_type="text/event-stream"
    )
# User experience:
# Without streaming: Wait 5 seconds → Get full response
# With streaming: See words appear immediately → Total 5 seconds
# Perceived latency: Much lower!
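
On the client side the stream can be rendered incrementally as well. A minimal sketch using the requests library; the localhost URL is an assumption about where the FastAPI app above is running:

import requests

# Hypothetical local endpoint for the /chat route above
with requests.post("http://localhost:8000/chat",
                   params={"prompt": "What is Python?"},
                   stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith("data: "):
            # Render tokens as they arrive instead of waiting for the full response
            print(line[len("data: "):], end="", flush=True)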
Knowledge distillation: train smaller, faster student models to mimic a large teacher model.
import torch
import torch.nn as nn

class Distiller:
    def __init__(self, teacher_model, student_model, temperature=3.0):
        self.teacher = teacher_model
        self.student = student_model
        self.temperature = temperature

        # Freeze teacher
        for param in self.teacher.parameters():
            param.requires_grad = False

    def distillation_loss(self, student_logits, teacher_logits, labels):
        """Combine hard labels loss and soft labels loss."""
        # Soft targets (from teacher)
        soft_targets = nn.functional.softmax(
            teacher_logits / self.temperature,
            dim=1
        )
        soft_predictions = nn.functional.log_softmax(
            student_logits / self.temperature,
            dim=1
        )

        # KL divergence loss
        soft_loss = nn.functional.kl_div(
            soft_predictions,
            soft_targets,
            reduction='batchmean'
        ) * (self.temperature ** 2)

        # Hard targets loss
        hard_loss = nn.functional.cross_entropy(student_logits, labels)

        # Combined loss (usually 90% soft, 10% hard)
        return 0.9 * soft_loss + 0.1 * hard_loss

    def train_step(self, batch, optimizer):
        inputs, labels = batch

        # Teacher predictions (no grad)
        with torch.no_grad():
            teacher_logits = self.teacher(inputs)

        # Student predictions
        student_logits = self.student(inputs)

        # Compute loss
        loss = self.distillation_loss(student_logits, teacher_logits, labels)

        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        return loss.item()

# Example: Distill BERT-large → BERT-small
teacher = BertLargeModel()  # 340M parameters
student = BertSmallModel()  # 67M parameters (5x smaller)

distiller = Distiller(teacher, student)

for epoch in range(num_epochs):
    for batch in dataloader:
        loss = distiller.train_step(batch, optimizer)
# Result:
# - Student: 5x smaller, 5x faster
# - Accuracy: 95% of teacher (vs 100%)
# - Cost: 1/5 of teacher
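
To sanity-check the size claim, you can count parameters directly; this works for any torch.nn.Module (the teacher/student objects above are illustrative):

def count_parameters(model):
    """Total number of parameters in a torch.nn.Module."""
    return sum(p.numel() for p in model.parameters())

print(f"Teacher: {count_parameters(teacher) / 1e6:.0f}M parameters")
print(f"Student: {count_parameters(student) / 1e6:.0f}M parameters "
      f"({count_parameters(teacher) / count_parameters(student):.1f}x smaller)")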
from transformers import DistilBertModel

# Instead of BERT-base (110M params)
model = DistilBertModel.from_pretrained('distilbert-base-uncased')
# 66M params, 60% faster, 97% of BERT-base accuracy

# Use for inference (`input_ids` is a batch of tokenized inputs)
outputs = model(input_ids)
embeddings = outputs.last_hidden_state

# Cost savings:
# BERT-base: 100ms latency
# DistilBERT: 40ms latency (2.5x faster)
import torch

# Original model (FP32)
model = torch.load('model.pt')
model.eval()

# Quantize to INT8
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8
)

# Benchmark
def benchmark(model, inputs, num_runs=100):
    import time
    start = time.time()
    for _ in range(num_runs):
        with torch.no_grad():
            model(inputs)
    elapsed = time.time() - start
    avg_latency = (elapsed / num_runs) * 1000
    return avg_latency

# Results:
fp32_latency = benchmark(model, test_input)
int8_latency = benchmark(quantized_model, test_input)

print(f"FP32: {fp32_latency:.2f}ms")
print(f"INT8: {int8_latency:.2f}ms")
print(f"Speedup: {fp32_latency/int8_latency:.2f}x")

# Typical output:
# FP32: 45.2ms
# INT8: 18.7ms
# Speedup: 2.4x

# Cost impact:
# If inference runs on cloud instances: ~2.4x fewer compute hours needed
# Savings: $1000/month → $417/month
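
Quantization also shrinks the serialized model, which matters for memory footprint and cold-start time. A quick sketch comparing serialized sizes of the two models from the snippet above:

import io
import torch

def model_size_mb(model):
    """Size of the serialized state_dict in megabytes."""
    buffer = io.BytesIO()
    torch.save(model.state_dict(), buffer)
    return buffer.getbuffer().nbytes / 1e6

print(f"FP32 model: {model_size_mb(model):.1f} MB")
print(f"INT8 model: {model_size_mb(quantized_model):.1f} MB")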
from prometheus_client import Counter, Histogram, Gauge
import time

# Metrics
api_calls = Counter('llm_api_calls_total', 'Total API calls', ['model'])
api_costs = Counter('llm_api_costs_dollars', 'Total API costs', ['model'])
api_latency = Histogram('llm_api_latency_seconds', 'API latency', ['model'])
cache_hits = Counter('llm_cache_hits_total', 'Cache hits')
cache_misses = Counter('llm_cache_misses_total', 'Cache misses')

def monitored_llm_call(prompt, model='gpt-3.5-turbo'):
    # Reuses `cache` and `calculate_cost` from the earlier sections
    start = time.time()

    # Check cache
    cached = cache.get(prompt, model)
    if cached:
        cache_hits.inc()
        return cached
    cache_misses.inc()

    # API call
    response = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )

    # Track metrics
    latency = time.time() - start
    api_latency.labels(model=model).observe(latency)
    api_calls.labels(model=model).inc()

    # Calculate cost
    usage = response.usage
    cost = calculate_cost(model, usage.prompt_tokens, usage.completion_tokens)
    api_costs.labels(model=model).inc(cost)

    result = response.choices[0].message.content
    cache.set(prompt, model, result)
    return result

# Dashboard queries (Prometheus):
# - sum(rate(llm_api_costs_dollars_total[1h])) * 3600 → Hourly spend
# - histogram_quantile(0.95, rate(llm_api_latency_seconds_bucket[5m])) → P95 latency
# - rate(llm_cache_hits_total[5m]) / (rate(llm_cache_hits_total[5m]) + rate(llm_cache_misses_total[5m])) → Cache hit rate
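
For Prometheus to scrape these counters, the service has to expose a /metrics endpoint. One way to do that, assuming the FastAPI app from the batching section is the app being instrumented, is prometheus_client's ASGI app:

from prometheus_client import make_asgi_app

# Mount the standard Prometheus exposition endpoint on the existing FastAPI app
app.mount("/metrics", make_asgi_app())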
# ✅ Implemented optimizations
OPTIMIZATIONS = {
    'prompt_compression': {
        'enabled': True,
        'savings': '30-50% token reduction'
    },
    'caching': {
        'enabled': True,
        'hit_rate': 0.70,
        'savings': '$1,400/day'
    },
    'model_selection': {
        'enabled': True,
        'routing': 'simple → gpt-3.5, complex → gpt-4',
        'savings': '68% cost reduction'
    },
    'batching': {
        'enabled': True,
        'batch_size': 16,
        'throughput_gain': '10x'
    },
    'quantization': {
        'enabled': True,
        'precision': 'INT8',
        'speedup': '2.4x'
    },
    'distillation': {
        'enabled': False,  # Future work
        'potential_savings': '80% cost reduction'
    }
}
def generate_cost_report():
    """Generate monthly cost projection."""
    baseline_monthly = 1_000_000  # $1M/month baseline

    # Apply optimizations multiplicatively (assumes the savings are independent)
    cost = baseline_monthly
    cost *= (1 - 0.5)   # Prompt compression: -50%
    cost *= (1 - 0.7)   # Caching: -70%
    cost *= (1 - 0.68)  # Model selection: -68%

    print(f"Baseline: ${baseline_monthly:,}")
    print(f"Optimized: ${cost:,.2f}")
    print(f"Savings: ${baseline_monthly - cost:,.2f} ({(1-cost/baseline_monthly)*100:.1f}%)")

# Example output:
# Baseline: $1,000,000
# Optimized: $48,000.00
# Savings: $952,000.00 (95.2%)
In the next post, we will explore System Observability: logging, metrics, distributed tracing, and incident management for AI systems.
This post is part of the series "From Zero to AI Engineer" - Module 10: Scalability & Observability