Deploying RAG Systems to Production
Production-ready RAG: architecture, scaling, monitoring, error handling, and operational best practices for reliable deployments.
Production vs Prototype
| Aspect | Prototype | Production |
|---|---|---|
| Availability | Best effort | 99.9%+ SLA |
| Latency | Variable | p95 < 2s |
| Error handling | Crashes | Graceful degradation |
| Monitoring | None | Comprehensive |
| Cost | Don't care | Optimized |
| Scale | Single user | Thousands concurrent |
| Data | Sample | Full corpus |
Production Architecture
Basic Architecture
```
┌─────────┐     ┌──────────┐     ┌──────────┐     ┌─────┐
│  User   │────>│   API    │────>│  Vector  │────>│ LLM │
│  Query  │     │ Gateway  │     │    DB    │     │     │
└─────────┘     └──────────┘     └──────────┘     └─────┘
                     │                │                │
                     v                v                v
                ┌──────────┐    ┌──────────┐    ┌──────────┐
                │  Cache   │    │ Embedding│    │ Monitor  │
                │  Layer   │    │ Service  │    │          │
                └──────────┘    └──────────┘    └──────────┘
```
Components
API Gateway
- Rate limiting
- Authentication
- Request routing
- Load balancing
Vector Database
- Managed (Pinecone) or self-hosted (Qdrant)
- Replicated for HA
- Backups configured
Embedding Service
- Separate service for embeddings
- Batch processing
- Model caching
LLM Service
- API provider (OpenAI) or self-hosted
- Fallback providers
- Response streaming
Cache Layer
- Redis or Memcached
- Cache frequent queries
- Cache embeddings
Monitoring
- Prometheus + Grafana
- Error tracking (Sentry)
- Logging (ELK stack)
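To show how these components sit on the request path, here is a minimal sketch of a query handler: cache check, embedding, retrieval, generation, then metrics. The `cache`, `embedding_service`, `vector_db`, `llm`, and `metrics` objects are placeholders for whichever concrete clients you deploy, not a fixed API.

```python
# Request-path sketch tying the components together.
# cache, embedding_service, vector_db, llm, and metrics are placeholder
# clients (e.g. Redis, an embedding API, Pinecone/Qdrant, an LLM provider).
import time

async def handle_rag_request(query: str, k: int = 5) -> dict:
    start = time.perf_counter()

    # 1. Cache layer: return immediately on a hit
    cached = await cache.get(query, k)
    if cached is not None:
        metrics.record_cache_hit()
        return cached

    # 2. Embedding service: turn the query into a vector
    query_vector = await embedding_service.embed(query)

    # 3. Vector DB: retrieve the top-k chunks
    contexts = await vector_db.search(query_vector, k=k)

    # 4. LLM service: generate a grounded answer
    answer = await llm.generate(query=query, contexts=contexts)

    result = {"answer": answer, "contexts": contexts}

    # 5. Cache the result and record latency for monitoring
    await cache.set(query, k, result)
    metrics.record_latency(time.perf_counter() - start)
    return result
```

Keeping each dependency behind a thin interface like this makes it easier to swap providers and to wrap individual calls with the retry and circuit-breaker logic shown later.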
Scaling Strategies
Vertical Scaling
Vector Database
- More CPU: Faster search
- More RAM: Larger indexes in memory
- GPU: Accelerated similarity search
API Servers
- More CPU: Handle more concurrent requests
- More RAM: Larger caches
Limits:
- Single machine maximum
- Expensive at high end
- No redundancy
Horizontal Scaling
Load Balancing
```nginx
# Multiple API servers behind a load balancer (NGINX config)
upstream rag_api {
    least_conn;  # Route to least busy server
    server api1.example.com:8000;
    server api2.example.com:8000;
    server api3.example.com:8000;
}

server {
    listen 80;
    location / {
        proxy_pass http://rag_api;
        proxy_set_header Host $host;
    }
}
```
Vector DB Sharding
```python
import asyncio

NUM_SHARDS = 4

# Shard by document ID
def get_shard(doc_id, num_shards=NUM_SHARDS):
    return hash(doc_id) % num_shards

# Query all shards, merge results
async def distributed_search(query_vector, k=5, num_shards=NUM_SHARDS):
    tasks = [
        search_shard(shard_id, query_vector, k=k)
        for shard_id in range(num_shards)
    ]
    all_results = await asyncio.gather(*tasks)

    # Merge and re-rank
    merged = merge_results(all_results)
    return merged[:k]
```
Read Replicas
```python
# Write to primary, read from replicas
class VectorDBCluster:
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas
        self.replica_index = 0

    def write(self, vector):
        # All writes go to the primary
        self.primary.upsert(vector)

    def search(self, query_vector, k=5):
        # Read from a replica (round-robin)
        replica = self.replicas[self.replica_index]
        self.replica_index = (self.replica_index + 1) % len(self.replicas)
        return replica.search(query_vector, k=k)
```
Auto-Scaling
```yaml
# Kubernetes HPA (Horizontal Pod Autoscaler)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rag-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rag-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
```
Caching
Query Caching
```python
import redis
import hashlib
import json

class QueryCache:
    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl

    def get_cache_key(self, query, k):
        # Hash the query for the cache key
        query_hash = hashlib.md5(f"{query}:{k}".encode()).hexdigest()
        return f"rag:query:{query_hash}"

    def get(self, query, k):
        key = self.get_cache_key(query, k)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, query, k, result):
        key = self.get_cache_key(query, k)
        self.redis.setex(key, self.ttl, json.dumps(result))

# Usage
cache = QueryCache(redis.Redis(host='localhost'))

def rag_query(query, k=5):
    # Check cache
    cached = cache.get(query, k)
    if cached:
        return cached

    # Execute RAG
    result = execute_rag_pipeline(query, k)

    # Cache result
    cache.set(query, k, result)
    return result
```
Embedding Caching
```python
import hashlib

import numpy as np

class EmbeddingCache:
    def __init__(self, redis_client):
        self.redis = redis_client

    def get_embedding(self, text):
        key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"

        # Check cache
        cached = self.redis.get(key)
        if cached:
            return np.frombuffer(cached, dtype=np.float32)

        # Generate embedding (embed_model is the shared embedding model)
        embedding = embed_model.encode(text)

        # Cache (no TTL - embeddings don't change for the same model and text)
        self.redis.set(key, embedding.tobytes())
        return embedding
```
Cache Invalidation
```python
def update_document(doc_id, new_content):
    # Update vector DB
    embedding = embed(new_content)
    vector_db.upsert(doc_id, embedding, metadata={'content': new_content})

    # Invalidate related cache entries
    invalidate_cache_for_document(doc_id)

def invalidate_cache_for_document(doc_id):
    # Find all cached queries that retrieved this document
    pattern = "rag:query:*"
    for key in redis.scan_iter(match=pattern):
        cached_result = json.loads(redis.get(key))

        # If this document was in the results, invalidate
        if any(doc['id'] == doc_id for doc in cached_result.get('documents', [])):
            redis.delete(key)
```
Error Handling
Graceful Degradation
```python
class RobustRAG:
    def __init__(self, primary_llm, fallback_llm, vector_db):
        self.primary_llm = primary_llm
        self.fallback_llm = fallback_llm
        self.vector_db = vector_db

    async def query(self, user_query, k=5):
        fallback_used = False

        try:
            # Try primary retrieval
            contexts = await self.vector_db.search(user_query, k=k)
        except Exception as e:
            logger.error(f"Vector DB error: {e}")
            # Fallback: return empty contexts
            contexts = []
            fallback_used = True
            # Or: use keyword search fallback
            # contexts = await self.keyword_search_fallback(user_query)

        try:
            # Try primary LLM
            answer = await self.primary_llm.generate(
                query=user_query,
                contexts=contexts
            )
        except Exception as e:
            logger.error(f"Primary LLM error: {e}")
            fallback_used = True
            try:
                # Fall back to secondary LLM
                answer = await self.fallback_llm.generate(
                    query=user_query,
                    contexts=contexts
                )
            except Exception as e2:
                logger.error(f"Fallback LLM error: {e2}")
                # Ultimate fallback
                answer = "I'm experiencing technical difficulties. Please try again later."

        return {
            'answer': answer,
            'contexts': contexts,
            'fallback_used': fallback_used  # Track for monitoring
        }
```
Retry Logic
```python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    reraise=True
)
async def resilient_llm_call(llm, prompt):
    """
    Retry with exponential backoff:
    up to 3 attempts, with waits that roughly double
    between attempts (bounded between 1s and 10s).
    """
    return await llm.generate(prompt)
```
Circuit Breaker
```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        self.last_failure_time = None

    async def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            # Success: reset
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
            raise e

# Usage
llm_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

async def safe_llm_call(prompt):
    return await llm_breaker.call(llm.generate, prompt)
```
Monitoring
Metrics
```python
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
query_counter = Counter('rag_queries_total', 'Total RAG queries')
query_duration = Histogram('rag_query_duration_seconds', 'Query duration')
error_counter = Counter('rag_errors_total', 'Total errors', ['type'])

# Component metrics
retrieval_latency = Histogram('rag_retrieval_latency_seconds', 'Retrieval latency')
llm_latency = Histogram('rag_llm_latency_seconds', 'LLM latency')

# Quality metrics
avg_precision = Gauge('rag_precision_at_5', 'Average precision@5')
thumbs_up_rate = Gauge('rag_thumbs_up_rate', 'Thumbs up rate')

# Usage
@query_duration.time()
async def handle_query(query):
    query_counter.inc()
    try:
        # Retrieval
        with retrieval_latency.time():
            contexts = await retrieve(query)

        # Generation
        with llm_latency.time():
            answer = await generate(query, contexts)

        return answer
    except Exception as e:
        error_counter.labels(type=type(e).__name__).inc()
        raise
```
Logging
```python
import logging
import json

# Structured logging
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
        }
        if hasattr(record, 'query'):
            log_data['query'] = record.query
        if hasattr(record, 'latency'):
            log_data['latency'] = record.latency
        return json.dumps(log_data)

# Configure logger
logger = logging.getLogger('rag')
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage
logger.info('Query processed', extra={
    'query': user_query,
    'latency': latency_ms,
    'num_contexts': len(contexts)
})
```
Alerting
```yaml
# Prometheus alerting rules
groups:
- name: rag_alerts
  rules:
  - alert: HighErrorRate
    expr: rate(rag_errors_total[5m]) > 0.1
    for: 5m
    annotations:
      summary: "High error rate in RAG system"
      description: "Error rate is {{ $value }} errors/sec"

  - alert: HighLatency
    expr: histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5
    for: 10m
    annotations:
      summary: "High query latency"
      description: "p95 latency is {{ $value }}s"

  - alert: LowPrecision
    expr: rag_precision_at_5 < 0.6
    for: 30m
    annotations:
      summary: "Retrieval precision degraded"
      description: "Precision@5 is {{ $value }}"
```
Data Pipeline
Document Ingestion
```python
class DocumentIngestionPipeline:
    def __init__(self, vector_db, embedding_service):
        self.vector_db = vector_db
        self.embedding_service = embedding_service

    async def ingest_document(self, document):
        try:
            # 1. Extract text
            text = extract_text(document)

            # 2. Chunk
            chunks = chunk_document(text, chunk_size=512, overlap=50)

            # 3. Embed (batched)
            embeddings = await self.embedding_service.embed_batch(chunks)

            # 4. Upload to vector DB (batched)
            await self.vector_db.upsert_batch(
                ids=[f"{document.id}_{i}" for i in range(len(chunks))],
                embeddings=embeddings,
                metadatas=[{
                    'doc_id': document.id,
                    'chunk_index': i,
                    'content': chunk
                } for i, chunk in enumerate(chunks)]
            )

            logger.info(f"Ingested document {document.id}: {len(chunks)} chunks")
        except Exception as e:
            logger.error(f"Failed to ingest document {document.id}: {e}")
            raise

# Background job
async def batch_ingestion():
    pipeline = DocumentIngestionPipeline(vector_db, embedding_service)

    # Get new documents
    new_docs = await get_new_documents()

    # Process in parallel
    tasks = [pipeline.ingest_document(doc) for doc in new_docs]
    await asyncio.gather(*tasks, return_exceptions=True)
```
Incremental Updates
```python
async def update_document(doc_id, new_content):
    # Delete old chunks
    await vector_db.delete(filter={'doc_id': doc_id})

    # Ingest the new version
    await ingestion_pipeline.ingest_document({
        'id': doc_id,
        'content': new_content
    })

    # Invalidate caches
    invalidate_cache_for_document(doc_id)
```
Cost Optimization
Embeddings
```python
# Batch embedding for better throughput/cost
async def cost_optimized_embedding(texts, batch_size=100):
    embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        # Single API call for the batch
        batch_embeddings = await embedding_api.embed_batch(batch)
        embeddings.extend(batch_embeddings)

        # Rate limiting
        await asyncio.sleep(0.1)

    return embeddings
```
LLM Calls
```python
# Reduce token usage
def optimize_context(query, chunks, max_tokens=2000):
    """
    Fit as much relevant context as possible within the token budget.
    """
    selected_chunks = []
    total_tokens = 0

    for chunk in chunks:
        chunk_tokens = count_tokens(chunk)
        if total_tokens + chunk_tokens <= max_tokens:
            selected_chunks.append(chunk)
            total_tokens += chunk_tokens
        else:
            break

    return selected_chunks
```
Caching Strategy
- Cache embeddings indefinitely (deterministic)
- Cache frequent queries (1 hour TTL)
- Cache popular query patterns (24 hour TTL)
- Invalidate on content updates
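A small TTL policy helper is one way to encode this tiering. The sketch below assumes query popularity is tracked with a Redis counter; the key names and thresholds are illustrative, not a fixed API.

```python
# Tiered TTL sketch. The hit counter, key names, and thresholds
# are illustrative assumptions layered on top of redis-py.
EMBEDDING_TTL = None          # embeddings are cached indefinitely (no expiry)
DEFAULT_QUERY_TTL = 3600      # 1 hour for ordinary queries
POPULAR_QUERY_TTL = 86400     # 24 hours for popular query patterns

def choose_query_ttl(redis_client, cache_key: str, popular_threshold: int = 50) -> int:
    # Count how often this cache key has been requested
    hits = redis_client.incr(f"hits:{cache_key}")
    return POPULAR_QUERY_TTL if hits >= popular_threshold else DEFAULT_QUERY_TTL

def cache_query_result(redis_client, cache_key: str, payload: str) -> None:
    # Store with a TTL that reflects how popular the query is
    ttl = choose_query_ttl(redis_client, cache_key)
    redis_client.setex(cache_key, ttl, payload)
```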
Security
API Authentication
```python
import jwt  # PyJWT
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

app = FastAPI()
security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    token = credentials.credentials

    # Verify JWT (SECRET_KEY comes from your configuration/secret store)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/query")
async def query(request: QueryRequest, user = Depends(verify_token)):
    # Process query
    return await rag_system.query(request.query)
```
Rate Limiting
```python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter  # app is the FastAPI instance from above
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/query")
@limiter.limit("100/hour")
async def query(request: Request, body: QueryRequest):
    # slowapi needs the raw Request object; the JSON payload arrives as `body`
    return await rag_system.query(body.query)
```
Input Sanitization
```python
import re

def sanitize_query(query: str) -> str:
    # Limit length
    max_length = 1000
    query = query[:max_length]

    # Remove potentially harmful characters
    query = re.sub(r'[^\w\s\?.,!-]', '', query)

    # Prevent prompt injection
    injection_patterns = [
        r'ignore previous instructions',
        r'disregard above',
        r'system:',
    ]
    for pattern in injection_patterns:
        if re.search(pattern, query, re.IGNORECASE):
            raise ValueError("Potentially harmful query detected")

    return query
```
Deployment Checklist
- Vector DB: Replicated, backed up
- API: Auto-scaling, health checks
- Caching: Redis cluster, eviction policy
- Monitoring: Metrics, logs, alerts
- Error handling: Retries, fallbacks, circuit breakers
- Security: Authentication, rate limiting, input validation
- Documentation: API docs, runbooks
- Testing: Load tests, chaos testing
- Rollback plan: Blue-green deployment
- On-call: Runbooks, escalation procedures
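To make the health-check item concrete, here is a minimal sketch of liveness and readiness endpoints. `vector_db.ping()` stands in for whatever probe your vector DB client exposes; `redis_client.ping()` is the standard redis-py call.

```python
# Health-check sketch for the "API: Auto-scaling, health checks" item.
# vector_db.ping() is a placeholder probe; redis_client.ping() is redis-py.
from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/healthz")
async def liveness() -> dict:
    # Liveness: the process is up and serving requests
    return {"status": "ok"}

@app.get("/readyz")
async def readiness(response: Response) -> dict:
    # Readiness: dependencies are reachable; return 503 so the load
    # balancer stops routing traffic here until they recover
    checks = {}
    try:
        await vector_db.ping()
        checks["vector_db"] = "ok"
    except Exception:
        checks["vector_db"] = "down"
    try:
        redis_client.ping()
        checks["cache"] = "ok"
    except Exception:
        checks["cache"] = "down"

    if any(status != "ok" for status in checks.values()):
        response.status_code = 503
    return checks
```

Wire `/healthz` to the liveness probe and `/readyz` to the readiness probe (and to the load balancer's health check) so unhealthy pods are restarted or taken out of rotation automatically.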
Next Steps
Optimizing query processing and managing context windows effectively are critical for cost and quality. The next guides cover query optimization techniques and context window management strategies.
Related Guides
RAG Monitoring and Observability
Monitor RAG systems in production: track latency, costs, accuracy, and user satisfaction with metrics and dashboards.
Pinecone for Production RAG at Scale
Deploy production-ready vector search: Pinecone setup, indexing strategies, and scaling to billions of vectors.
Cohere Rerank API for Production RAG
Boost RAG accuracy by 40% with Cohere's Rerank API: simple integration, multilingual support, production-ready.