Guide · Advanced

Deploying RAG Systems to Production

February 20, 2025
14 min read
Ailog Research Team

Production-ready RAG: architecture, scaling, monitoring, error handling, and operational best practices for reliable deployments.

Production vs Prototype

Aspect            Prototype        Production
Availability      Best effort      99.9%+ SLA
Latency           Variable         p95 < 2s
Error handling    Crashes          Graceful degradation
Monitoring        None             Comprehensive
Cost              Don't care       Optimized
Scale             Single user      Thousands concurrent
Data              Sample           Full corpus

Production Architecture

Basic Architecture

┌─────────┐     ┌──────────┐     ┌──────────┐     ┌─────┐
│ User    │────>│ API      │────>│ Vector   │────>│ LLM │
│ Query   │     │ Gateway  │     │ DB       │     │     │
└─────────┘     └──────────┘     └──────────┘     └─────┘
                     │                 │              │
                     v                 v              v
                ┌──────────┐     ┌──────────┐   ┌──────────┐
                │ Cache    │     │ Embedding│   │ Monitor  │
                │ Layer    │     │ Service  │   │          │
                └──────────┘     └──────────┘   └──────────┘

Components

API Gateway

  • Rate limiting
  • Authentication
  • Request routing
  • Load balancing

Vector Database

  • Managed (Pinecone) or self-hosted (Qdrant)
  • Replicated for HA
  • Backups configured
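
To make the backup bullet concrete, here is a minimal sketch assuming a self-hosted Qdrant instance and the qdrant-client library; the URL, collection name, and the external copy step are placeholders for your own setup.

python
# Minimal snapshot-based backup sketch for a self-hosted Qdrant instance.
# The URL and collection name below are placeholders.
from qdrant_client import QdrantClient

client = QdrantClient(url="http://qdrant.internal:6333")

def backup_collection(collection_name: str):
    # Create a point-in-time snapshot on the Qdrant node;
    # a separate job would copy the snapshot file to object storage.
    return client.create_snapshot(collection_name=collection_name)

backup_collection("rag_documents")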

Embedding Service

  • Separate service for embeddings
  • Batch processing
  • Model caching
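
As a sketch of what a dedicated embedding service can look like (assuming FastAPI and sentence-transformers; the model name and endpoint are illustrative): the model is loaded once at startup (model caching) and each request is encoded as a batch.

python
# Hypothetical standalone embedding service: model loaded once, requests
# encoded in batches. Model name and route are illustrative choices.
from fastapi import FastAPI
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer

app = FastAPI()
model = SentenceTransformer("all-MiniLM-L6-v2")  # loaded once at startup

class EmbedRequest(BaseModel):
    texts: list[str]

@app.post("/embed")
def embed(request: EmbedRequest):
    # encode() processes the list in batches; batch_size is tunable
    vectors = model.encode(request.texts, batch_size=64)
    return {"embeddings": [v.tolist() for v in vectors]}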

LLM Service

  • API provider (OpenAI) or self-hosted
  • Fallback providers
  • Response streaming
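
For the response-streaming bullet, a minimal sketch using the OpenAI Python SDK (v1-style client assumed; the model name is illustrative). The fallback logic covered under Error Handling below would wrap a call like this.

python
# Stream tokens to the caller as they arrive instead of waiting for the
# full completion. Assumes OPENAI_API_KEY is set; model name is illustrative.
from openai import OpenAI

client = OpenAI()

def stream_answer(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta  # forward each token fragment as it is generated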

Cache Layer

  • Redis or Memcached
  • Cache frequent queries
  • Cache embeddings

Monitoring

  • Prometheus + Grafana
  • Error tracking (Sentry)
  • Logging (ELK stack)

Scaling Strategies

Vertical Scaling

Vector Database

  • More CPU: Faster search
  • More RAM: Larger indexes in memory
  • GPU: Accelerated similarity search

API Servers

  • More CPU: Handle more concurrent requests
  • More RAM: Larger caches

Limits:

  • Single machine maximum
  • Expensive at high end
  • No redundancy

Horizontal Scaling

Load Balancing

nginx
# Multiple API servers behind a load balancer (NGINX config)
upstream rag_api {
    least_conn;  # Route to least busy server
    server api1.example.com:8000;
    server api2.example.com:8000;
    server api3.example.com:8000;
}

server {
    listen 80;
    location / {
        proxy_pass http://rag_api;
        proxy_set_header Host $host;
    }
}

Vector DB Sharding

python
import asyncio
import hashlib

# Shard by document ID. Use a stable hash: Python's built-in hash() is
# randomized per process, which would route the same doc to different shards.
def get_shard(doc_id, num_shards=4):
    digest = hashlib.md5(str(doc_id).encode()).hexdigest()
    return int(digest, 16) % num_shards

# Query all shards, merge results
async def distributed_search(query_vector, k=5, num_shards=4):
    tasks = [
        search_shard(shard_id, query_vector, k=k)
        for shard_id in range(num_shards)
    ]
    all_results = await asyncio.gather(*tasks)

    # Merge and re-rank
    merged = merge_results(all_results)
    return merged[:k]

Read Replicas

python
# Write to primary, read from replicas
class VectorDBCluster:
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas
        self.replica_index = 0

    def write(self, vector):
        # All writes go to primary
        self.primary.upsert(vector)

    def search(self, query_vector, k=5):
        # Read from a replica (round-robin)
        replica = self.replicas[self.replica_index]
        self.replica_index = (self.replica_index + 1) % len(self.replicas)
        return replica.search(query_vector, k=k)

Auto-Scaling

yaml
# Kubernetes HPA (Horizontal Pod Autoscaler)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rag-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rag-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

Caching

Query Caching

python
import redis
import hashlib
import json

class QueryCache:
    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl

    def get_cache_key(self, query, k):
        # Hash query for cache key
        query_hash = hashlib.md5(f"{query}:{k}".encode()).hexdigest()
        return f"rag:query:{query_hash}"

    def get(self, query, k):
        key = self.get_cache_key(query, k)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, query, k, result):
        key = self.get_cache_key(query, k)
        self.redis.setex(key, self.ttl, json.dumps(result))

# Usage
cache = QueryCache(redis.Redis(host='localhost'))

def rag_query(query, k=5):
    # Check cache
    cached = cache.get(query, k)
    if cached:
        return cached

    # Execute RAG
    result = execute_rag_pipeline(query, k)

    # Cache result
    cache.set(query, k, result)
    return result

Embedding Caching

python
import hashlib
import numpy as np

class EmbeddingCache:
    def __init__(self, redis_client):
        self.redis = redis_client

    def get_embedding(self, text):
        key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"

        # Check cache
        cached = self.redis.get(key)
        if cached:
            return np.frombuffer(cached, dtype=np.float32)

        # Generate embedding
        embedding = embed_model.encode(text)

        # Cache (no TTL - embeddings don't change for a fixed model)
        self.redis.set(key, embedding.tobytes())
        return embedding

Cache Invalidation

python
import json

def update_document(doc_id, new_content):
    # Update vector DB
    embedding = embed(new_content)
    vector_db.upsert(doc_id, embedding, metadata={'content': new_content})

    # Invalidate related cache entries
    invalidate_cache_for_document(doc_id)

def invalidate_cache_for_document(doc_id):
    # redis_client: the same redis.Redis instance used by QueryCache above
    # Find all cached queries that retrieved this document
    pattern = "rag:query:*"
    for key in redis_client.scan_iter(match=pattern):
        cached_result = json.loads(redis_client.get(key))
        # If this document was in the results, invalidate
        if any(doc['id'] == doc_id for doc in cached_result.get('documents', [])):
            redis_client.delete(key)

Error Handling

Graceful Degradation

python
class RobustRAG:
    def __init__(self, primary_llm, fallback_llm, vector_db):
        self.primary_llm = primary_llm
        self.fallback_llm = fallback_llm
        self.vector_db = vector_db

    async def query(self, user_query, k=5):
        fallback_used = False

        try:
            # Try primary retrieval
            contexts = await self.vector_db.search(user_query, k=k)
        except Exception as e:
            logger.error(f"Vector DB error: {e}")
            # Fallback: return empty contexts
            contexts = []
            # Or: use keyword search fallback
            # contexts = await self.keyword_search_fallback(user_query)

        try:
            # Try primary LLM
            answer = await self.primary_llm.generate(
                query=user_query,
                contexts=contexts
            )
        except Exception as e:
            logger.error(f"Primary LLM error: {e}")
            fallback_used = True
            try:
                # Fallback to secondary LLM
                answer = await self.fallback_llm.generate(
                    query=user_query,
                    contexts=contexts
                )
            except Exception as e2:
                logger.error(f"Fallback LLM error: {e2}")
                # Ultimate fallback
                answer = "I'm experiencing technical difficulties. Please try again later."

        return {
            'answer': answer,
            'contexts': contexts,
            'fallback_used': fallback_used  # Track for monitoring
        }

Retry Logic

python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    reraise=True
)
async def resilient_llm_call(llm, prompt):
    """
    Retry with exponential backoff:
    - Attempt 1: immediate
    - Later attempts: wait grows exponentially, between 1s and 10s
    """
    return await llm.generate(prompt)

Circuit Breaker

python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        self.last_failure_time = None

    async def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            # Success: reset
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
            raise e

# Usage
llm_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

async def safe_llm_call(prompt):
    return await llm_breaker.call(llm.generate, prompt)

Monitoring

Metrics

python
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
query_counter = Counter('rag_queries_total', 'Total RAG queries')
query_duration = Histogram('rag_query_duration_seconds', 'Query duration')
error_counter = Counter('rag_errors_total', 'Total errors', ['type'])

# Component metrics
retrieval_latency = Histogram('rag_retrieval_latency_seconds', 'Retrieval latency')
llm_latency = Histogram('rag_llm_latency_seconds', 'LLM latency')

# Quality metrics
avg_precision = Gauge('rag_precision_at_5', 'Average precision@5')
thumbs_up_rate = Gauge('rag_thumbs_up_rate', 'Thumbs up rate')

# Usage
@query_duration.time()
async def handle_query(query):
    query_counter.inc()
    try:
        # Retrieval
        with retrieval_latency.time():
            contexts = await retrieve(query)

        # Generation
        with llm_latency.time():
            answer = await generate(query, contexts)

        return answer
    except Exception as e:
        error_counter.labels(type=type(e).__name__).inc()
        raise

Logging

python
import logging
import json

# Structured logging
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
        }
        if hasattr(record, 'query'):
            log_data['query'] = record.query
        if hasattr(record, 'latency'):
            log_data['latency'] = record.latency
        return json.dumps(log_data)

# Configure logger
logger = logging.getLogger('rag')
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage
logger.info('Query processed', extra={
    'query': user_query,
    'latency': latency_ms,
    'num_contexts': len(contexts)
})

Alerting

yaml
# Prometheus alerting rules
groups:
  - name: rag_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(rag_errors_total[5m]) > 0.1
        for: 5m
        annotations:
          summary: "High error rate in RAG system"
          description: "Error rate is {{ $value }} errors/sec"

      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5
        for: 10m
        annotations:
          summary: "High query latency"
          description: "p95 latency is {{ $value }}s"

      - alert: LowPrecision
        expr: rag_precision_at_5 < 0.6
        for: 30m
        annotations:
          summary: "Retrieval precision degraded"
          description: "Precision@5 is {{ $value }}"

Data Pipeline

Document Ingestion

python
import asyncio

class DocumentIngestionPipeline:
    def __init__(self, vector_db, embedding_service):
        self.vector_db = vector_db
        self.embedding_service = embedding_service

    async def ingest_document(self, document):
        try:
            # 1. Extract text
            text = extract_text(document)

            # 2. Chunk
            chunks = chunk_document(text, chunk_size=512, overlap=50)

            # 3. Embed (batched)
            embeddings = await self.embedding_service.embed_batch(chunks)

            # 4. Upload to vector DB (batched)
            await self.vector_db.upsert_batch(
                ids=[f"{document.id}_{i}" for i in range(len(chunks))],
                embeddings=embeddings,
                metadatas=[{
                    'doc_id': document.id,
                    'chunk_index': i,
                    'content': chunk
                } for i, chunk in enumerate(chunks)]
            )

            logger.info(f"Ingested document {document.id}: {len(chunks)} chunks")
        except Exception as e:
            logger.error(f"Failed to ingest document {document.id}: {e}")
            raise

# Background job
async def batch_ingestion():
    pipeline = DocumentIngestionPipeline(vector_db, embedding_service)

    # Get new documents
    new_docs = await get_new_documents()

    # Process in parallel
    tasks = [pipeline.ingest_document(doc) for doc in new_docs]
    await asyncio.gather(*tasks, return_exceptions=True)

Incremental Updates

python
async def update_document(doc_id, new_content):
    # Delete old chunks
    await vector_db.delete(filter={'doc_id': doc_id})

    # Ingest new version
    await ingestion_pipeline.ingest_document({
        'id': doc_id,
        'content': new_content
    })

    # Invalidate caches
    invalidate_cache_for_document(doc_id)

Cost Optimization

Embeddings

python
import asyncio

# Batch embedding for better throughput/cost
async def cost_optimized_embedding(texts, batch_size=100):
    embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]

        # Single API call for the whole batch
        batch_embeddings = await embedding_api.embed_batch(batch)
        embeddings.extend(batch_embeddings)

        # Rate limiting
        await asyncio.sleep(0.1)

    return embeddings

LLM Calls

python
# Reduce token usage
def optimize_context(query, chunks, max_tokens=2000):
    """
    Fit as much relevant context as possible within the token budget.
    """
    selected_chunks = []
    total_tokens = 0

    for chunk in chunks:
        chunk_tokens = count_tokens(chunk)
        if total_tokens + chunk_tokens <= max_tokens:
            selected_chunks.append(chunk)
            total_tokens += chunk_tokens
        else:
            break

    return selected_chunks

Caching Strategy

  • Cache embeddings indefinitely (deterministic)
  • Cache frequent queries (1 hour TTL)
  • Cache popular query patterns (24 hour TTL)
  • Invalidate on content updates
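
One way to keep these TTLs consistent across services is to define the policy in a single place; a minimal sketch using the values from the list above.

python
# Central cache TTL policy matching the strategy above (seconds).
# None means no expiry: embeddings are deterministic for a fixed model.
CACHE_TTLS = {
    "embedding": None,
    "query": 3600,             # frequent queries: 1 hour
    "popular_pattern": 86400,  # popular query patterns: 24 hours
}

def cache_set(redis_client, kind: str, key: str, value: bytes):
    ttl = CACHE_TTLS[kind]
    if ttl is None:
        redis_client.set(key, value)
    else:
        redis_client.setex(key, ttl, value)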

Security

API Authentication

python
import jwt
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()
SECRET_KEY = "change-me"  # load from a secret manager or env var in production

async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    token = credentials.credentials

    # Verify JWT
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/query")
async def query(request: QueryRequest, user = Depends(verify_token)):
    # Process query
    return await rag_system.query(request.query)

Rate Limiting

python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/query")
@limiter.limit("100/hour")
async def query(request: Request, body: QueryRequest):
    # slowapi needs the raw Request in the signature to apply the limit
    return await rag_system.query(body.query)

Input Sanitization

python
import re

def sanitize_query(query: str) -> str:
    # Limit length
    max_length = 1000
    query = query[:max_length]

    # Remove potentially harmful characters
    query = re.sub(r'[^\w\s\?.,!-]', '', query)

    # Prevent prompt injection
    injection_patterns = [
        r'ignore previous instructions',
        r'disregard above',
        r'system:',
    ]
    for pattern in injection_patterns:
        if re.search(pattern, query, re.IGNORECASE):
            raise ValueError("Potentially harmful query detected")

    return query

Deployment Checklist

  • Vector DB: Replicated, backed up
  • API: Auto-scaling, health checks
  • Caching: Redis cluster, eviction policy
  • Monitoring: Metrics, logs, alerts
  • Error handling: Retries, fallbacks, circuit breakers
  • Security: Authentication, rate limiting, input validation
  • Documentation: API docs, runbooks
  • Testing: Load tests, chaos testing
  • Rollback plan: Blue-green deployment (see the sketch after this list)
  • On-call: Runbooks, escalation procedures
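
For the rollback item, one common pattern is blue-green at the Kubernetes Service level: two Deployments run side by side and traffic follows the Service selector. A minimal sketch (names and ports are illustrative); rolling back means switching the selector back.

yaml
# Blue-green cutover sketch: rag-api-blue and rag-api-green Deployments run
# side by side; switching the "version" label in the selector moves traffic.
apiVersion: v1
kind: Service
metadata:
  name: rag-api
spec:
  selector:
    app: rag-api
    version: blue   # change to "green" to cut over; revert to roll back
  ports:
    - port: 80
      targetPort: 8000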

Next Steps

Optimizing query processing and managing context windows effectively are critical for cost and quality. The next guides cover query optimization techniques and context window management strategies.

Tags

production, deployment, scaling, monitoring
