Performance & Cost Optimization: Making AI Applications Faster and Cheaper

Running LLMs in production is expensive. A single API call may cost $0.001-$0.1, but at millions of requests the bill grows quickly. Performance optimization reduces latency and cost optimization reduces spending, and the two often go hand in hand.

In this article, we will explore techniques for optimizing both the performance and the cost of AI applications.

Token Cost Models

Understanding LLM Pricing

# Typical pricing (as of 2024)
PRICING = {
    'gpt-4': {
        'input': 0.03 / 1000,   # $0.03 per 1K input tokens
        'output': 0.06 / 1000   # $0.06 per 1K output tokens
    },
    'gpt-3.5-turbo': {
        'input': 0.0015 / 1000,
        'output': 0.002 / 1000
    },
    'claude-2': {
        'input': 0.008 / 1000,
        'output': 0.024 / 1000
    }
}

def calculate_cost(model, input_tokens, output_tokens):
    """Calculate API call cost."""
    pricing = PRICING[model]
    
    input_cost = input_tokens * pricing['input']
    output_cost = output_tokens * pricing['output']
    
    return input_cost + output_cost

# Example
cost = calculate_cost('gpt-4', input_tokens=500, output_tokens=300)
print(f"Cost: ${cost:.4f}")  # Cost: $0.0330

# 1 million requests/day:
daily_cost = cost * 1_000_000
print(f"Daily: ${daily_cost:,.2f}")  # Daily: $33,000.00
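
To see how much the model choice alone moves the bill, the same per-request profile can be projected over a month for every model in the table. This is a minimal sketch: the prices are copied from the PRICING table above and are illustrative, and `monthly_cost` and `compare_models` are hypothetical helper names, not part of any SDK.

```python
# Prices copied from the PRICING table above (USD per token); illustrative only.
PRICING = {
    "gpt-4": {"input": 0.03 / 1000, "output": 0.06 / 1000},
    "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
    "claude-2": {"input": 0.008 / 1000, "output": 0.024 / 1000},
}


def monthly_cost(model, input_tokens, output_tokens, requests_per_day, days=30):
    """Project monthly spend for a fixed per-request token profile."""
    per_request = (
        input_tokens * PRICING[model]["input"]
        + output_tokens * PRICING[model]["output"]
    )
    return per_request * requests_per_day * days


def compare_models(input_tokens, output_tokens, requests_per_day):
    """Return (model, monthly cost) pairs sorted from cheapest to most expensive."""
    costs = {
        model: monthly_cost(model, input_tokens, output_tokens, requests_per_day)
        for model in PRICING
    }
    return sorted(costs.items(), key=lambda item: item[1])


# Same profile as the example above: 500 input tokens, 300 output tokens, 1M requests/day.
ranked = compare_models(500, 300, 1_000_000)
for model, cost in ranked:
    print(f"{model}: ${cost:,.0f}/month")
```

The ranking only reflects price; whether the cheaper model is good enough for the task still has to be checked separately.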

Cost Reduction Strategies

1. Prompt Compression

# ❌ Verbose prompt (expensive)
verbose_prompt = """
Please analyze the following customer review and provide:
1. Overall sentiment (positive, negative, neutral)
2. Key topics mentioned
3. Specific pain points if any
4. Suggestions for improvement

Customer review:
{review}

Please provide your analysis in a structured format with clear headings for each section.
"""

# ✅ Compressed prompt (cheaper)
# Literal JSON braces are doubled so that compressed_prompt.format(review=...) works
compressed_prompt = """
Analyze review. Return JSON:
{{
  "sentiment": "positive|negative|neutral",
  "topics": [],
  "pain_points": [],
  "suggestions": []
}}

Review: {review}
"""

# Savings: ~50% fewer tokens
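
The savings figure is easy to sanity-check before shipping a shorter template. The sketch below assumes a rough rule of thumb of about 4 characters per token for English text, which is only an approximation; a real tokenizer for your model will give different counts. `estimate_tokens` and `compression_savings` are hypothetical helper names.

```python
def estimate_tokens(text, chars_per_token=4):
    """Very rough token estimate; assumes ~4 characters per token for English text."""
    return max(1, round(len(text) / chars_per_token))


def compression_savings(original, compressed):
    """Fraction of prompt tokens saved by the compressed template (0.0 to 1.0)."""
    before = estimate_tokens(original)
    after = estimate_tokens(compressed)
    return (before - after) / before


verbose = (
    "Please analyze the following customer review and provide the overall "
    "sentiment, key topics, pain points, and suggestions in a structured "
    "format with clear headings."
)
compact = "Analyze review. Return JSON: sentiment, topics, pain_points, suggestions."

saved = compression_savings(verbose, compact)
print(f"Estimated prompt tokens saved: {saved:.0%}")
```

Note that this measures the template only. The review text and the model's output are unchanged, so the saving on the whole request is smaller than the saving on the template.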

2. Smart Caching

import hashlib
import openai  # examples below use the pre-1.0 openai SDK interface
import redis

class LLMCache:
    def __init__(self):
        self.redis = redis.Redis()
        self.ttl = 86400  # 24 hours
    
    def get_cache_key(self, prompt, model):
        """Generate deterministic cache key."""
        content = f"{model}:{prompt}"
        return hashlib.sha256(content.encode()).hexdigest()
    
    def get(self, prompt, model):
        """Check cache before API call."""
        key = self.get_cache_key(prompt, model)
        cached = self.redis.get(key)
        
        if cached is not None:
            print("💰 Cache hit - $0 cost!")
            return cached.decode()
        
        return None
    
    def set(self, prompt, model, response):
        """Cache response."""
        key = self.get_cache_key(prompt, model)
        self.redis.set(key, response, ex=self.ttl)

# Usage
cache = LLMCache()

def llm_call(prompt, model='gpt-3.5-turbo'):
    # Check cache first
    cached = cache.get(prompt, model)
    if cached is not None:
        return cached
    
    # Cache miss - make API call (pre-1.0 openai SDK interface)
    response = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    
    result = response.choices[0].message.content
    
    # Cache for future
    cache.set(prompt, model, result)
    
    return result

# First call: API cost
result = llm_call("What is Python?")  # $0.0015

# Subsequent calls: FREE
result = llm_call("What is Python?")  # $0
result = llm_call("What is Python?")  # $0

Cache hit rate impact:

Scenario: 1M requests/day
Without cache: 1M API calls × $0.002 = $2,000/day

With 70% cache hit rate:
- Cached: 700K × $0 = $0
- API calls: 300K × $0.002 = $600/day
Savings: $1,400/day = $511,000/year!
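
The arithmetic above generalizes to any hit rate: only cache misses reach the API. The following sketch reproduces the numbers for a few hit rates. It ignores the cost of running Redis itself, which is an assumption, and `daily_cost_with_cache` is a hypothetical helper name.

```python
def daily_cost_with_cache(requests_per_day, cost_per_call, hit_rate):
    """Daily API spend when a fraction `hit_rate` of requests is served from cache."""
    if not 0.0 <= hit_rate <= 1.0:
        raise ValueError("hit_rate must be between 0 and 1")
    api_calls = requests_per_day * (1 - hit_rate)  # only misses are billed
    return api_calls * cost_per_call


baseline = daily_cost_with_cache(1_000_000, 0.002, 0.0)
for rate in (0.3, 0.5, 0.7, 0.9):
    cost = daily_cost_with_cache(1_000_000, 0.002, rate)
    print(f"hit rate {rate:.0%}: ${cost:,.0f}/day, saves ${baseline - cost:,.0f}/day")
```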

3. Model Selection

def smart_model_selection(query):
    """Route to the cheapest sufficient model.

    call_llm and requires_complex_reasoning are assumed to be defined elsewhere.
    """
    
    # Simple queries → cheap model
    if is_simple_query(query):
        return call_llm('gpt-3.5-turbo', query)  # $0.002/1K
    
    # Complex reasoning → expensive model
    elif requires_complex_reasoning(query):
        return call_llm('gpt-4', query)  # $0.06/1K
    
    # Default
    else:
        return call_llm('gpt-3.5-turbo', query)

def is_simple_query(query):
    """Heuristics for simple queries."""
    simple_patterns = [
        'what is',
        'define',
        'translate',
        'summarize'
    ]
    return any(pattern in query.lower() for pattern in simple_patterns)
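
The routing function relies on a `requires_complex_reasoning` check that is not shown. One possible sketch, purely as an assumption, is a keyword-and-length heuristic like the one below. The hint list, the word limit, and the `choose_model` helper are all hypothetical; a production router would more likely use a small classifier or logged quality data.

```python
SIMPLE_PATTERNS = ("what is", "define", "translate", "summarize")
# Hypothetical hints that a query needs multi-step reasoning.
REASONING_HINTS = ("prove", "derive", "step by step", "debug", "trade-off")


def is_simple_query(query):
    """Same substring heuristic as in the article."""
    text = query.lower()
    return any(pattern in text for pattern in SIMPLE_PATTERNS)


def requires_complex_reasoning(query, max_simple_words=60):
    """Assumed heuristic: long queries or reasoning keywords go to the larger model."""
    text = query.lower()
    too_long = len(text.split()) > max_simple_words
    return too_long or any(hint in text for hint in REASONING_HINTS)


def choose_model(query):
    """Return the model name only, in the same order of checks as the router above."""
    if is_simple_query(query):
        return "gpt-3.5-turbo"
    if requires_complex_reasoning(query):
        return "gpt-4"
    return "gpt-3.5-turbo"


for q in ("What is Python?", "Prove that this loop terminates", "hello"):
    print(f"{q!r} -> {choose_model(q)}")
```

Substring matching is cheap but crude, so it is worth logging routing decisions and spot-checking answers from the cheaper model.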

Savings example:

100K queries (70% simple, 30% complex; assumes ~1K output tokens per query):
- All GPT-4: 100K × $0.06 = $6,000
- Routed: 70K × $0.002 + 30K × $0.06 = $140 + $1,800 = $1,940
Savings: $4,060 (68%)
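
The same calculation can be wrapped in a small function to explore other traffic mixes. This is a sketch under the same assumption as the example, namely a flat per-query cost of $0.002 for the cheap model and $0.06 for the expensive one; `routed_cost` is a hypothetical helper name.

```python
def routed_cost(total_queries, simple_share, cheap_cost, expensive_cost):
    """Blended cost when `simple_share` of queries goes to the cheap model."""
    simple = total_queries * simple_share
    complex_queries = total_queries - simple
    return simple * cheap_cost + complex_queries * expensive_cost


all_gpt4 = routed_cost(100_000, 0.0, 0.002, 0.06)
routed = routed_cost(100_000, 0.7, 0.002, 0.06)
savings = all_gpt4 - routed

print(f"All GPT-4: ${all_gpt4:,.0f}")
print(f"Routed:    ${routed:,.0f}")
print(f"Savings:   ${savings:,.0f} ({savings / all_gpt4:.0%})")
```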

Latency vs Throughput Trade-offs

Understanding the Trade-off

Latency: Time for SINGLE request
Throughput: Requests processed per SECOND

Examples:
┌──────────────┬─────────┬────────────┐
│ Configuration│ Latency │ Throughput │
├──────────────┼─────────┼────────────┤
│ No batching  │ 100ms   │ 10 req/s   │
│ Batch size 8 │ 150ms   │ 53 req/s   │
│ Batch size 32│ 300ms   │ 107 req/s  │
└──────────────┴─────────┴────────────┘

Trade-off: Higher batching → More throughput, Higher latency
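
The throughput column in the table follows directly from batch size divided by batch latency. The sketch below assumes a single worker that processes one batch at a time, which is a simplification; `throughput_rps` is a hypothetical helper name.

```python
def throughput_rps(batch_size, batch_latency_ms):
    """Requests per second for one worker processing one batch at a time."""
    return batch_size / (batch_latency_ms / 1000)


# (batch size, latency in ms) pairs taken from the table above.
configs = [(1, 100), (8, 150), (32, 300)]
for batch_size, latency_ms in configs:
    rps = throughput_rps(batch_size, latency_ms)
    print(f"batch={batch_size:>2}  latency={latency_ms}ms  throughput={rps:.0f} req/s")
```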

Dynamic Batching Implementation

import asyncio
from collections import deque
import numpy as np

class AdaptiveBatcher:
    def __init__(
        self,
        min_batch_size=1,
        max_batch_size=32,
        max_wait_ms=50
    ):
        self.min_batch_size = min_batch_size
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        
        self.queue = deque()
        self.processing = False
    
    async def predict(self, features):
        """Add request to queue and wait for result."""
        
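        # Create a future for this request on the running loop
        future = asyncio.get_running_loop().create_future()
        self.queue.append((features, future))
        
        # Trigger batch processing; set the flag before scheduling so that
        # concurrent callers do not each spawn their own batch task
        if not self.processing:
            self.processing = True
            asyncio.create_task(self._process_batch())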
        
        # Wait for result
        return await future
    
    async def _process_batch(self):
        """Process batch when ready."""
        self.processing = True
        
        # Wait for batch to accumulate or timeout
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        
        while True:
            # Check if batch ready
            batch_ready = len(self.queue) >= self.min_batch_size
            time_exceeded = (loop.time() - start_time) * 1000 > self.max_wait_ms
            
            if batch_ready or time_exceeded:
                break
            
            await asyncio.sleep(0.001)  # 1ms
        
        # Collect batch
        batch_features = []
        batch_futures = []
        
        while self.queue and len(batch_features) < self.max_batch_size:
            features, future = self.queue.popleft()
            batch_features.append(features)
            batch_futures.append(future)
        
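        if batch_features:
            try:
                # Batch inference (GPU loves this!); `model` is assumed to be a
                # loaded model defined elsewhere. predict() blocks the event loop,
                # so consider run_in_executor for slow models.
                batch_array = np.array(batch_features)
                predictions = model.predict(batch_array)
            except Exception as exc:
                # Propagate the failure to every waiting caller
                for future in batch_futures:
                    future.set_exception(exc)
            else:
                # Return individual results
                for future, prediction in zip(batch_futures, predictions):
                    future.set_result(prediction)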
        
        # Continue if more requests arrived; keep the flag set until the queue drains
        if self.queue:
            asyncio.create_task(self._process_batch())
        else:
            self.processing = False

# Usage in FastAPI
from fastapi import FastAPI

app = FastAPI()
batcher = AdaptiveBatcher(
    min_batch_size=4,
    max_batch_size=32,
    max_wait_ms=50  # 50ms max wait
)

@app.post("/predict")
async def predict(features: list):
    result = await batcher.predict(features)
    return {"prediction": result}

Performance gains:

Without batching:
- 1 request = 100ms
- Throughput: 10 req/s
- GPU utilization: 20%

With adaptive batching:
- Batch size 16 = 150ms per batch
- Throughput: 107 req/s (10.7x improvement!)
- GPU utilization: 85%

Latency Budgets

import asyncio
import time

class LatencyBudgetManager:
    def __init__(self, budget_ms=1000):
        self.budget_ms = budget_ms
    
    async def execute_with_budget(self, tasks):
        """Execute tasks sequentially within a shared latency budget."""
        
        start = time.monotonic()  # monotonic clock is safe against wall-clock jumps
        results = []
        
        for task in tasks:
            elapsed_ms = (time.monotonic() - start) * 1000
            remaining_budget = self.budget_ms - elapsed_ms
            
            if remaining_budget <= 0:
                print("⚠️ Budget exceeded, skipping remaining tasks")
                break
            
            # Execute with timeout
            try:
                result = await asyncio.wait_for(
                    task(),
                    timeout=remaining_budget / 1000
                )
                results.append(result)
            except asyncio.TimeoutError:
                print(f"Task timed out after {remaining_budget:.0f}ms")
                break
        
        return results

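# Example: RAG pipeline with budget
# retrieve_documents, rerank_documents and generate_answer are assumed async helpers
async def rag_pipeline(query):
    budget = LatencyBudgetManager(budget_ms=500)
    state = {}  # carries intermediate results between stages
    
    async def retrieve():   # ~100ms
        state['docs'] = await retrieve_documents(query)
    
    async def rerank():     # ~150ms
        state['docs'] = await rerank_documents(state['docs'])
    
    async def generate():   # ~200ms
        state['answer'] = await generate_answer(state['docs'], query)
    
    await budget.execute_with_budget([retrieve, rerank, generate])
    # Total: ~450ms (within 500ms budget)
    return state.get('answer')  # None if the budget ran out before generation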
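
To make the budget behaviour concrete, here is a self-contained sketch of the same idea that can be run without any external services. Each stage receives the previous stage's output, and `asyncio.sleep` stands in for real retrieval and generation work. The function `run_with_deadline` and the three stage functions are hypothetical names chosen for this illustration.

```python
import asyncio
import time


async def run_with_deadline(stages, budget_s):
    """Run async stages in order, stopping once the shared budget is spent."""
    deadline = time.monotonic() + budget_s
    value = None
    completed = []
    for name, stage in stages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            # Each stage may use at most what is left of the overall budget.
            value = await asyncio.wait_for(stage(value), timeout=remaining)
        except asyncio.TimeoutError:
            break
        completed.append(name)
    return completed, value


# Hypothetical stages; the sleeps simulate work of different durations.
async def retrieve(_):
    await asyncio.sleep(0.01)
    return ["doc-a", "doc-b"]


async def rerank(docs):
    await asyncio.sleep(0.01)
    return list(reversed(docs))


async def slow_generate(docs):
    await asyncio.sleep(2.0)  # deliberately slower than the budget
    return f"answer from {docs}"


stages = [("retrieve", retrieve), ("rerank", rerank), ("generate", slow_generate)]
completed, value = asyncio.run(run_with_deadline(stages, budget_s=0.3))
print("completed stages:", completed)
print("last available value:", value)
```

Because the last good value is returned, the caller can fall back to something useful, such as showing the reranked documents, when generation is cut off.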

Request Batching Strategies

Batch Size Tuning

import time

def benchmark_batch_size(model, data, batch_sizes=(1, 4, 8, 16, 32, 64)):
    """Find optimal batch size."""
    
    results = []
    
    for batch_size in batch_sizes:
        # Measure throughput
        start = time.time()
        
        for i in range(0, len(data), batch_size):
            batch = data[i:i+batch_size]
            model.predict(batch)
        
        elapsed = time.time() - start
        throughput = len(data) / elapsed
        latency = (elapsed / len(data)) * batch_size * 1000  # average time per batch, in ms
        
        results.append({
            'batch_size': batch_size,
            'throughput': throughput,
            'latency': latency
        })
        
        print(f"Batch {batch_size}: {throughput:.1f} req/s, {latency:.1f}ms latency")
    
    return results

# Example output:
# Batch 1:  10.2 req/s, 98ms latency
# Batch 4:  35.1 req/s, 114ms latency
# Batch 8:  58.3 req/s, 137ms latency  ← Sweet spot
# Batch 16: 72.4 req/s, 221ms latency
# Batch 32: 81.2 req/s, 394ms latency  (too slow)

Streaming Responses

Reduce perceived latency by streaming.

from fastapi.responses import StreamingResponse
import openai

async def stream_llm_response(prompt):
    """Stream tokens as they're generated."""
    
    # acreate is the async variant in the pre-1.0 openai SDK; the synchronous
    # create() returns a regular generator that cannot be used with `async for`
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in response:
        content = chunk.choices[0].delta.get('content', '')
        if content:
            yield f"data: {content}\n\n"

@app.post("/chat")
async def chat(prompt: str):
    return StreamingResponse(
        stream_llm_response(prompt),
        media_type="text/event-stream"
    )

# User experience:
# Without streaming: Wait 5 seconds → Get full response
# With streaming: See words appear immediately → Total 5 seconds
# Perceived latency: Much lower!
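
One detail worth handling when streaming over Server-Sent Events: a chunk that contains a newline must be split so that every line carries its own `data:` prefix, otherwise the client sees a truncated message. The sketch below shows one way to frame chunks. The `[DONE]` end marker is a convention assumed for this example, and `sse_event` and `sse_stream` are hypothetical helper names.

```python
def sse_event(content):
    """Frame one chunk as a Server-Sent Events message.

    Each line of the payload gets its own "data: " prefix and the
    message ends with a blank line.
    """
    lines = content.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


def sse_stream(chunks):
    """Yield SSE messages for non-empty chunks, then an end-of-stream marker."""
    for chunk in chunks:
        if chunk:
            yield sse_event(chunk)
    yield sse_event("[DONE]")  # assumed end marker; the client must agree on it


for message in sse_stream(["Hello", "", "line one\nline two"]):
    print(repr(message))
```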

Model Distillation

Train smaller, faster models from large models.

Knowledge Distillation Process

import torch
import torch.nn as nn

class Distiller:
    def __init__(self, teacher_model, student_model, temperature=3.0):
        self.teacher = teacher_model
        self.student = student_model
        self.temperature = temperature
        
        # Freeze teacher and put it in eval mode (disables dropout)
        self.teacher.eval()
        for param in self.teacher.parameters():
            param.requires_grad = False
    
    def distillation_loss(self, student_logits, teacher_logits, labels):
        """
        Combine hard labels loss and soft labels loss.
        """
        # Soft targets (from teacher)
        soft_targets = nn.functional.softmax(
            teacher_logits / self.temperature,
            dim=1
        )
        
        soft_predictions = nn.functional.log_softmax(
            student_logits / self.temperature,
            dim=1
        )
        
        # KL divergence loss
        soft_loss = nn.functional.kl_div(
            soft_predictions,
            soft_targets,
            reduction='batchmean'
        ) * (self.temperature ** 2)
        
        # Hard targets loss
        hard_loss = nn.functional.cross_entropy(student_logits, labels)
        
        # Combined loss (here 90% soft, 10% hard; tune the weighting per task)
        return 0.9 * soft_loss + 0.1 * hard_loss
    
    def train_step(self, batch, optimizer):
        inputs, labels = batch
        
        # Teacher predictions (no grad)
        with torch.no_grad():
            teacher_logits = self.teacher(inputs)
        
        # Student predictions
        student_logits = self.student(inputs)
        
        # Compute loss
        loss = self.distillation_loss(student_logits, teacher_logits, labels)
        
        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        return loss.item()

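# Example: Distill BERT-large → BERT-small
# BertLargeModel / BertSmallModel are placeholders for your own model classes
teacher = BertLargeModel()  # 340M parameters
student = BertSmallModel()  # 67M parameters (5x smaller)

distiller = Distiller(teacher, student)
optimizer = torch.optim.AdamW(student.parameters(), lr=5e-5)  # illustrative learning rate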

for epoch in range(num_epochs):
    for batch in dataloader:
        loss = distiller.train_step(batch, optimizer)

# Illustrative result (actual numbers depend on task and architecture):
# - Student: 5x smaller, up to ~5x faster
# - Accuracy: ~95% of teacher (vs 100%)
# - Cost: ~1/5 of teacher
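
The role of the temperature is easier to see on a tiny example. Dividing the logits by a temperature above 1 flattens the teacher's distribution, so the student also learns how the teacher ranks the wrong classes. The sketch below is plain Python with made-up logits; `softmax_with_temperature` is a hypothetical helper name.

```python
import math


def softmax_with_temperature(logits, temperature=1.0):
    """Softmax over logits divided by `temperature` (numerically stabilized)."""
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    scaled = [x / temperature for x in logits]
    peak = max(scaled)
    exps = [math.exp(s - peak) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]


teacher_logits = [6.0, 2.0, 1.0]  # made-up values for illustration
hard = softmax_with_temperature(teacher_logits, temperature=1.0)
soft = softmax_with_temperature(teacher_logits, temperature=3.0)

print("T=1:", [round(p, 3) for p in hard])
print("T=3:", [round(p, 3) for p in soft])
```

At T=3 the top class keeps the largest probability, but the other classes receive noticeably more mass than at T=1, which is the extra signal the soft loss passes to the student.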

DistilBERT Example (Pre-distilled)

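import torch
from transformers import DistilBertModel, DistilBertTokenizer

# Instead of BERT-base (110M params)
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
model = DistilBertModel.from_pretrained('distilbert-base-uncased')
model.eval()
# 66M params, 60% faster, 97% of BERT-base accuracy

# Use for inference
inputs = tokenizer("Fast, cheap embeddings", return_tensors='pt')
with torch.no_grad():
    embeddings = model(**inputs).last_hidden_state  # (batch, seq_len, 768)

# Latency impact (illustrative):
# BERT-base: 100ms latency
# DistilBERT: ~62ms latency (1.6x faster, i.e. the "60% faster" above)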

Quantization for Cost Reduction

import torch

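# Original model (FP32)
# weights_only=False loads a full pickled model; only do this for trusted files
model = torch.load('model.pt', weights_only=False)
model.eval()

# Quantize Linear layers to INT8 (dynamic quantization runs on CPU)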
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8
)

# Benchmark
def benchmark(model, inputs, num_runs=100):
    import time
    
    start = time.time()
    for _ in range(num_runs):
        with torch.no_grad():
            model(inputs)
    
    elapsed = time.time() - start
    avg_latency = (elapsed / num_runs) * 1000
    
    return avg_latency

# Results:
fp32_latency = benchmark(model, test_input)
int8_latency = benchmark(quantized_model, test_input)

print(f"FP32: {fp32_latency:.2f}ms")
print(f"INT8: {int8_latency:.2f}ms")
print(f"Speedup: {fp32_latency/int8_latency:.2f}x")

# Typical output:
# FP32: 45.2ms
# INT8: 18.7ms
# Speedup: 2.4x

# Cost impact:
# Dynamic quantization targets CPU inference, so at the same traffic you need
# roughly 2.4x fewer CPU instance-hours
# Savings: $1000/month → $417/month
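
What INT8 quantization does to the weights can be shown without PyTorch. The sketch below uses symmetric per-tensor quantization, a simplified scheme chosen for illustration; PyTorch's own implementation differs in details such as zero points and per-channel scales. The weights are made-up values, and `quantize_int8` and `dequantize` are hypothetical helper names.

```python
def quantize_int8(values):
    """Symmetric per-tensor quantization: map floats to integers in [-127, 127]."""
    max_abs = max(abs(v) for v in values)
    scale = max_abs / 127 if max_abs > 0 else 1.0
    quantized = [max(-127, min(127, round(v / scale))) for v in values]
    return quantized, scale


def dequantize(quantized, scale):
    """Map INT8 codes back to approximate float values."""
    return [q * scale for q in quantized]


weights = [0.82, -1.27, 0.003, 0.45, -0.6]  # made-up FP32 weights
codes, scale = quantize_int8(weights)
restored = dequantize(codes, scale)
max_error = max(abs(a - b) for a, b in zip(weights, restored))

print("INT8 codes:", codes)
print(f"scale={scale:.4f}, max round-trip error={max_error:.4f}")
```

Each weight now fits in one byte instead of four, and the round-trip error is bounded by half the scale, which is why accuracy usually drops only slightly.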

Monitoring Cost & Performance

from prometheus_client import Counter, Histogram, Gauge
import time

# Metrics
api_calls = Counter('llm_api_calls_total', 'Total API calls', ['model'])
api_costs = Counter('llm_api_costs_dollars', 'Total API costs', ['model'])
api_latency = Histogram('llm_api_latency_seconds', 'API latency', ['model'])
cache_hits = Counter('llm_cache_hits_total', 'Cache hits')
cache_misses = Counter('llm_cache_misses_total', 'Cache misses')

def monitored_llm_call(prompt, model='gpt-3.5-turbo'):
    start = time.time()
    
    # Check cache
    cached = cache.get(prompt, model)
    if cached is not None:
        cache_hits.inc()
        return cached
    
    cache_misses.inc()
    
    # API call
    response = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    
    # Track metrics
    latency = time.time() - start
    api_latency.labels(model=model).observe(latency)
    api_calls.labels(model=model).inc()
    
    # Calculate cost
    usage = response.usage
    cost = calculate_cost(model, usage.prompt_tokens, usage.completion_tokens)
    api_costs.labels(model=model).inc(cost)
    
    result = response.choices[0].message.content
    cache.set(prompt, model, result)
    
    return result

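# Dashboard queries (Prometheus; prometheus_client exposes counters with a _total suffix):
# - sum(increase(llm_api_costs_dollars_total[1h])) → Spend over the last hour
# - histogram_quantile(0.95, sum by (le) (rate(llm_api_latency_seconds_bucket[5m]))) → P95 latency
# - sum(rate(llm_cache_hits_total[5m])) / (sum(rate(llm_cache_hits_total[5m])) + sum(rate(llm_cache_misses_total[5m]))) → Cache hit rate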
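
For local experiments without a Prometheus server, the same three numbers can be computed from raw samples. This is a minimal sketch using the nearest-rank percentile method; the sample data is made up, and `percentile` and `summarize` are hypothetical helper names.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile; pct is in (0, 100]."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def summarize(latencies_s, hits, misses, total_cost):
    """Compute P95 latency, cache hit rate, and average cost per request."""
    requests = hits + misses
    return {
        "p95_latency_s": percentile(latencies_s, 95),
        "cache_hit_rate": hits / requests if requests else 0.0,
        "cost_per_request": total_cost / requests if requests else 0.0,
    }


# Made-up samples: 19 fast calls and one slow outlier.
latencies = [0.1] * 19 + [2.0]
summary = summarize(latencies, hits=7, misses=3, total_cost=0.006)
print(summary)
```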

Cost Optimization Checklist

# ✅ Implemented optimizations
OPTIMIZATIONS = {
    'prompt_compression': {
        'enabled': True,
        'savings': '30-50% token reduction'
    },
    'caching': {
        'enabled': True,
        'hit_rate': 0.70,
        'savings': '$1,400/day'
    },
    'model_selection': {
        'enabled': True,
        'routing': 'simple → gpt-3.5, complex → gpt-4',
        'savings': '68% cost reduction'
    },
    'batching': {
        'enabled': True,
        'batch_size': 16,
        'throughput_gain': '10x'
    },
    'quantization': {
        'enabled': True,
        'precision': 'INT8',
        'speedup': '2.4x'
    },
    'distillation': {
        'enabled': False,  # Future work
        'potential_savings': '80% cost reduction'
    }
}

def generate_cost_report():
    """Generate monthly cost projection."""
    
    baseline_monthly = 1_000_000  # $1M/month baseline
    
    # Apply optimizations (assumes the savings are independent and compound)
    cost = baseline_monthly
    cost *= (1 - 0.5)   # Prompt compression: -50%
    cost *= (1 - 0.7)   # Caching: -70%
    cost *= (1 - 0.68)  # Model selection: -68%
    
    print(f"Baseline: ${baseline_monthly:,}")
    print(f"Optimized: ${cost:,.2f}")
    print(f"Savings: ${baseline_monthly - cost:,.2f} ({(1-cost/baseline_monthly)*100:.1f}%)")

# Example output:
# Baseline: $1,000,000
# Optimized: $48,000.00
# Savings: $952,000.00 (95.2%)
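
The report multiplies the remaining fractions rather than adding the percentages, and that distinction matters: 50% + 70% + 68% would exceed 100%. The sketch below makes the compounding explicit, under the assumption that the three optimizations are independent of each other. `remaining_cost_fraction` is a hypothetical helper name.

```python
import math


def remaining_cost_fraction(savings):
    """Fraction of the baseline cost left after applying each saving in turn."""
    return math.prod(1 - s for s in savings)


savings = [0.5, 0.7, 0.68]  # prompt compression, caching, model selection
fraction = remaining_cost_fraction(savings)
baseline = 1_000_000

print(f"Remaining fraction: {fraction:.3f}")
print(f"Optimized cost: ${baseline * fraction:,.2f}")
print(f"Naive sum of savings: {sum(savings):.0%} (not meaningful)")
```

In practice the savings overlap, for example cached requests never reach the router, so the real combined figure should be measured rather than projected.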

Key Takeaways

  • Token costs: Input/output priced separately; in the table above, output costs 1.3-3x more than input
  • Prompt compression: 30-50% fewer prompt tokens, usually with little quality loss (verify on your own evals)
  • Caching: 70% hit rate → roughly 70% lower API cost (huge!)
  • Model routing: Use cheapest sufficient model (68% savings)
  • Batching: Trades latency for throughput (10x improvement possible)
  • Streaming: Reduces perceived latency without cost increase
  • Distillation: 5x smaller models, ~95% of teacher accuracy, ~1/5 cost (illustrative)
  • Quantization: INT8 → 2-4x speedup, 4x smaller
  • Monitor everything: Track costs, latency, cache hits in real-time

In the next article, we will explore System Observability: logging, metrics, distributed tracing, and incident management for AI systems.


This article is part of the series "From Zero to AI Engineer" - Module 10: Scalability & Observability