Deploying RAG Systems to Production
Production-ready RAG: architecture, scaling, monitoring, error handling, and operational best practices for reliable deployments.
- Author: Ailog Research Team
- Published
- Reading time: 14 min read
- Level: advanced
Production vs Prototype
| Aspect | Prototype | Production |
|--------|-----------|------------|
| Availability | Best effort | 99.9%+ SLA |
| Latency | Variable | p95 < 2s |
| Error handling | Crashes | Graceful degradation |
| Monitoring | None | Comprehensive |
| Cost | Don't care | Optimized |
| Scale | Single user | Thousands of concurrent users |
| Data | Sample | Full corpus |
Production Architecture
Basic Architecture
```
┌─────────┐     ┌──────────┐     ┌──────────┐     ┌─────┐
│  User   │────>│   API    │────>│  Vector  │────>│ LLM │
│  Query  │     │ Gateway  │     │    DB    │     │     │
└─────────┘     └──────────┘     └──────────┘     └─────┘
                      │                │              │
                      v                v              v
                ┌──────────┐    ┌──────────┐    ┌──────────┐
                │  Cache   │    │ Embedding│    │ Monitor  │
                │  Layer   │    │ Service  │    │          │
                └──────────┘    └──────────┘    └──────────┘
```
Components
API Gateway
- Rate limiting
- Authentication
- Request routing
- Load balancing

Vector Database
- Managed (Pinecone) or self-hosted (Qdrant)
- Replicated for high availability
- Backups configured

Embedding Service
- Separate service for embeddings
- Batch processing
- Model caching

LLM Service
- API provider (OpenAI) or self-hosted
- Fallback providers
- Response streaming

Cache Layer
- Redis or Memcached
- Cache frequent queries
- Cache embeddings

Monitoring
- Prometheus + Grafana
- Error tracking (Sentry)
- Logging (ELK stack)
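To make the architecture concrete, here is a minimal sketch of how the API layer might wire these components together. The `cache`, `embedding_service`, `vector_db`, and `llm` objects are placeholders for whatever clients your stack provides, not a prescribed implementation:

```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    query: str
    k: int = 5

@app.post("/query")
async def query(request: QueryRequest):
    # 1. Check the cache layer first
    cached = cache.get(request.query, request.k)
    if cached:
        return cached

    # 2. Embed the query and retrieve contexts from the vector DB
    query_vector = await embedding_service.embed(request.query)
    contexts = await vector_db.search(query_vector, k=request.k)

    # 3. Generate an answer with the LLM service
    answer = await llm.generate(query=request.query, contexts=contexts)

    result = {"answer": answer, "contexts": contexts}
    cache.set(request.query, request.k, result)
    return result
```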
Scaling Strategies
Vertical Scaling
Vector Database
- More CPU: faster search
- More RAM: larger indexes held in memory
- GPU: accelerated similarity search

API Servers
- More CPU: more concurrent requests handled
- More RAM: larger caches

Limits of vertical scaling:
- Single-machine maximum
- Expensive at the high end
- No redundancy
Horizontal Scaling
Load Balancing

```nginx
# Multiple API servers behind a load balancer
upstream rag_api {
    least_conn;  # Route to the least busy server
    server api1.example.com:8000;
    server api2.example.com:8000;
    server api3.example.com:8000;
}

server {
    listen 80;

    location / {
        proxy_pass http://rag_api;
        proxy_set_header Host $host;
    }
}
```
Vector DB Sharding

```python
import asyncio

# Shard by document ID
def get_shard(doc_id, num_shards=4):
    return hash(doc_id) % num_shards

# Query all shards, then merge the results
async def distributed_search(query_vector, k=5):
    tasks = [
        search_shard(shard_id, query_vector, k=k)
        for shard_id in range(num_shards)
    ]
    all_results = await asyncio.gather(*tasks)

    # Merge and re-rank
    merged = merge_results(all_results)
    return merged[:k]
```
Read Replicas

```python
# Write to the primary, read from replicas
class VectorDBCluster:
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas
        self.replica_index = 0

    def write(self, vector):
        # All writes go to the primary
        self.primary.upsert(vector)

    def search(self, query_vector, k=5):
        # Read from a replica (round-robin)
        replica = self.replicas[self.replica_index]
        self.replica_index = (self.replica_index + 1) % len(self.replicas)
        return replica.search(query_vector, k=k)
```
Auto-Scaling
```yaml
# Kubernetes HPA (Horizontal Pod Autoscaler)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rag-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rag-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
```
Caching
Query Caching
```python
import redis
import hashlib
import json

class QueryCache:
    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl

    def get_cache_key(self, query, k):
        # Hash the query to build the cache key
        query_hash = hashlib.md5(f"{query}:{k}".encode()).hexdigest()
        return f"rag:query:{query_hash}"

    def get(self, query, k):
        key = self.get_cache_key(query, k)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, query, k, result):
        key = self.get_cache_key(query, k)
        self.redis.setex(key, self.ttl, json.dumps(result))

# Usage
cache = QueryCache(redis.Redis(host='localhost'))

def rag_query(query, k=5):
    # Check the cache first
    cached = cache.get(query, k)
    if cached:
        return cached

    # Execute the RAG pipeline
    result = execute_rag_pipeline(query, k)

    # Cache the result
    cache.set(query, k, result)
    return result
```
Embedding Caching
```python
import hashlib
import numpy as np

class EmbeddingCache:
    def __init__(self, redis_client):
        self.redis = redis_client

    def get_embedding(self, text):
        key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"

        # Check the cache
        cached = self.redis.get(key)
        if cached:
            return np.frombuffer(cached, dtype=np.float32)

        # Generate the embedding
        embedding = embed_model.encode(text)

        # Cache it (no TTL - embeddings don't change for a fixed model and text)
        self.redis.set(key, embedding.tobytes())
        return embedding
```
Cache Invalidation
```python
def update_document(doc_id, new_content):
    # Update the vector DB
    embedding = embed(new_content)
    vector_db.upsert(doc_id, embedding, metadata={'content': new_content})

    # Invalidate related cache entries
    invalidate_cache_for_document(doc_id)

def invalidate_cache_for_document(doc_id):
    # Find all cached queries that retrieved this document
    pattern = "rag:query:*"
    for key in redis.scan_iter(match=pattern):
        cached_result = json.loads(redis.get(key))

        # If this document appears in the cached results, invalidate the entry
        if any(doc['id'] == doc_id for doc in cached_result.get('documents', [])):
            redis.delete(key)
```
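Scanning every cached query key on each update gets expensive as the cache grows. A minimal alternative sketch keeps a reverse index from document ID to the cache keys that referenced it; the `rag:doc:` key prefix and helper names here are assumptions, not part of the pipeline above:

```python
import json

def cache_result_with_index(redis_client, cache_key, result, ttl=3600):
    # Store the cached query result as before
    redis_client.setex(cache_key, ttl, json.dumps(result))
    # Record which documents this cached entry depends on
    for doc in result.get('documents', []):
        redis_client.sadd(f"rag:doc:{doc['id']}", cache_key)

def invalidate_cache_for_document_indexed(redis_client, doc_id):
    index_key = f"rag:doc:{doc_id}"
    cache_keys = redis_client.smembers(index_key)
    if cache_keys:
        # Delete every cached query that referenced this document
        redis_client.delete(*cache_keys)
    redis_client.delete(index_key)
```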
Error Handling
Graceful Degradation
```python
class RobustRAG:
    def __init__(self, primary_llm, fallback_llm, vector_db):
        self.primary_llm = primary_llm
        self.fallback_llm = fallback_llm
        self.vector_db = vector_db

    async def query(self, user_query, k=5):
        fallback_used = False

        try:
            # Try primary retrieval
            contexts = await self.vector_db.search(user_query, k=k)
        except Exception as e:
            logger.error(f"Vector DB error: {e}")
            # Fallback: keyword search (or simply return empty contexts)
            contexts = await self.keyword_search_fallback(user_query)
            fallback_used = True

        try:
            # Try the primary LLM
            answer = await self.primary_llm.generate(
                query=user_query,
                contexts=contexts
            )
        except Exception as e:
            logger.error(f"Primary LLM error: {e}")
            fallback_used = True
            try:
                # Fall back to the secondary LLM
                answer = await self.fallback_llm.generate(
                    query=user_query,
                    contexts=contexts
                )
            except Exception as e2:
                logger.error(f"Fallback LLM error: {e2}")
                # Ultimate fallback
                answer = "I'm experiencing technical difficulties. Please try again later."

        return {
            'answer': answer,
            'contexts': contexts,
            'fallback_used': fallback_used  # Track for monitoring
        }
```
Retry Logic
```python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    reraise=True
)
async def resilient_llm_call(llm, prompt):
    """
    Retry with exponential backoff:
    - Attempt 1: immediate
    - Attempt 2: wait ~1s
    - Attempt 3: wait ~2s
    """
    return await llm.generate(prompt)
```
Circuit Breaker
```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        self.last_failure_time = None

    async def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)

            # Success: reset the breaker
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0

            return result

        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'

            raise e

# Usage
llm_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

async def safe_llm_call(prompt):
    return await llm_breaker.call(llm.generate, prompt)
```
Monitoring
Metrics
```python
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
query_counter = Counter('rag_queries_total', 'Total RAG queries')
query_duration = Histogram('rag_query_duration_seconds', 'Query duration')
error_counter = Counter('rag_errors_total', 'Total errors', ['type'])

# Component metrics
retrieval_latency = Histogram('rag_retrieval_latency_seconds', 'Retrieval latency')
llm_latency = Histogram('rag_llm_latency_seconds', 'LLM latency')

# Quality metrics
avg_precision = Gauge('rag_precision_at_5', 'Average precision@5')
thumbs_up_rate = Gauge('rag_thumbs_up_rate', 'Thumbs up rate')

# Usage
@query_duration.time()
async def handle_query(query):
    query_counter.inc()

    try:
        # Retrieval
        with retrieval_latency.time():
            contexts = await retrieve(query)

        # Generation
        with llm_latency.time():
            answer = await generate(query, contexts)

        return answer

    except Exception as e:
        error_counter.labels(type=type(e).__name__).inc()
        raise
```
Logging
```python
import logging
import json

# Structured (JSON) logging
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
        }

        if hasattr(record, 'query'):
            log_data['query'] = record.query
        if hasattr(record, 'latency'):
            log_data['latency'] = record.latency

        return json.dumps(log_data)

# Configure the logger
logger = logging.getLogger('rag')
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage
logger.info('Query processed', extra={
    'query': user_query,
    'latency': latency_ms,
    'num_contexts': len(contexts)
})
```
Alerting
```yaml
# Prometheus alerting rules
groups:
  - name: rag_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(rag_errors_total[5m]) > 0.1
        for: 5m
        annotations:
          summary: "High error rate in RAG system"
          description: "Error rate is {{ $value }} errors/sec"

      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5
        for: 10m
        annotations:
          summary: "High query latency"
          description: "p95 latency is {{ $value }}s"

      - alert: LowPrecision
        expr: rag_precision_at_5 < 0.6
        for: 30m
        annotations:
          summary: "Retrieval precision degraded"
          description: "Precision@5 is {{ $value }}"
```
Data Pipeline
Document Ingestion
```python
class DocumentIngestionPipeline:
    def __init__(self, vector_db, embedding_service):
        self.vector_db = vector_db
        self.embedding_service = embedding_service

    async def ingest_document(self, document):
        try:
            # Extract text
            text = extract_text(document)

            # Chunk
            chunks = chunk_document(text, chunk_size=512, overlap=50)

            # Embed (batched)
            embeddings = await self.embedding_service.embed_batch(chunks)

            # Upload to the vector DB (batched)
            await self.vector_db.upsert_batch(
                ids=[f"{document.id}_{i}" for i in range(len(chunks))],
                embeddings=embeddings,
                metadatas=[{
                    'doc_id': document.id,
                    'chunk_index': i,
                    'content': chunk
                } for i, chunk in enumerate(chunks)]
            )

            logger.info(f"Ingested document {document.id}: {len(chunks)} chunks")

        except Exception as e:
            logger.error(f"Failed to ingest document {document.id}: {e}")
            raise

# Background job
async def batch_ingestion():
    pipeline = DocumentIngestionPipeline(vector_db, embedding_service)

    # Get new documents
    new_docs = await get_new_documents()

    # Process in parallel
    tasks = [pipeline.ingest_document(doc) for doc in new_docs]
    await asyncio.gather(*tasks, return_exceptions=True)
```
Incremental Updates
```python
async def update_document(doc_id, new_content):
    # Delete the old chunks
    await vector_db.delete(filter={'doc_id': doc_id})

    # Ingest the new version
    await ingestion_pipeline.ingest_document({
        'id': doc_id,
        'content': new_content
    })

    # Invalidate caches
    invalidate_cache_for_document(doc_id)
```
Cost Optimization
Embeddings
```python
# Batch embeddings for better throughput and cost
async def cost_optimized_embedding(texts, batch_size=100):
    embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        # Single API call for the whole batch
        batch_embeddings = await embedding_api.embed_batch(batch)
        embeddings.extend(batch_embeddings)

        # Simple rate limiting between batches
        await asyncio.sleep(0.1)

    return embeddings
```
LLM Calls
```python
# Reduce token usage by fitting context to a budget
def optimize_context(query, chunks, max_tokens=2000):
    """
    Fit as much relevant context as possible within the token budget.
    Assumes chunks arrive sorted by relevance.
    """
    selected_chunks = []
    total_tokens = 0

    for chunk in chunks:
        chunk_tokens = count_tokens(chunk)

        if total_tokens + chunk_tokens <= max_tokens:
            selected_chunks.append(chunk)
            total_tokens += chunk_tokens
        else:
            break

    return selected_chunks
```
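The `count_tokens` helper above isn't defined in this guide. A minimal sketch using the tiktoken library follows; the `cl100k_base` encoding is an assumption here, so swap in the tokenizer that matches your actual LLM:

```python
import tiktoken

# Rough token counting with an OpenAI-style encoding (assumption: cl100k_base)
_encoding = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(_encoding.encode(text))
```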
Caching Strategy
- Cache embeddings indefinitely (deterministic for a fixed model and input)
- Cache frequent queries (1-hour TTL)
- Cache popular query patterns (24-hour TTL; see the TTL sketch below)
- Invalidate on content updates
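One way to pick between these TTL tiers is to track how often each query is seen. The hit-count key name and thresholds below are illustrative assumptions, not prescriptions:

```python
import hashlib

def record_query_hit(redis_client, query: str) -> int:
    # Count how many times this query has been seen (key name is an assumption)
    key = f"rag:hits:{hashlib.md5(query.encode()).hexdigest()}"
    return redis_client.incr(key)

def ttl_for_query(hit_count: int) -> int:
    # Popular query patterns get a longer TTL; thresholds are illustrative
    if hit_count >= 100:
        return 24 * 3600   # 24 hours
    return 3600            # 1 hour
```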
Security
API Authentication
```python
import jwt
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    token = credentials.credentials

    # Verify the JWT
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/query")
async def query(request: QueryRequest, user = Depends(verify_token)):
    # Process the query
    return await rag_system.query(request.query)
```
Rate Limiting
```python
from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/query")
@limiter.limit("100/hour")
async def query(request: Request, body: QueryRequest):
    # slowapi needs the raw starlette Request to identify the client
    return await rag_system.query(body.query)
```
Input Sanitization
```python
import re

def sanitize_query(query: str) -> str:
    # Limit length
    max_length = 1000
    query = query[:max_length]

    # Remove potentially harmful characters
    query = re.sub(r'[^\w\s\?.,!-]', '', query)

    # Basic prompt-injection screening
    injection_patterns = [
        r'ignore previous instructions',
        r'disregard above',
        r'system:',
    ]

    for pattern in injection_patterns:
        if re.search(pattern, query, re.IGNORECASE):
            raise ValueError("Potentially harmful query detected")

    return query
```
Deployment Checklist
- [ ] Vector DB: Replicated, backed up
- [ ] API: Auto-scaling, health checks (see the sketch below)
- [ ] Caching: Redis cluster, eviction policy
- [ ] Monitoring: Metrics, logs, alerts
- [ ] Error handling: Retries, fallbacks, circuit breakers
- [ ] Security: Authentication, rate limiting, input validation
- [ ] Documentation: API docs, runbooks
- [ ] Testing: Load tests, chaos testing
- [ ] Rollback plan: Blue-green deployment
- [ ] On-call: Runbooks, escalation procedures
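Health checks are worth making concrete. Below is a minimal sketch of liveness and readiness endpoints on the FastAPI app used throughout this guide; the endpoint names and dependency-check helpers are assumptions, so adapt them to your stack:

```python
from fastapi import Response, status

@app.get("/healthz")
async def liveness():
    # Liveness: the process is up and able to serve requests
    return {"status": "ok"}

@app.get("/readyz")
async def readiness(response: Response):
    # Readiness: check the dependencies needed before taking traffic
    checks = {
        "vector_db": await vector_db_is_reachable(),  # hypothetical helper
        "cache": await cache_is_reachable(),          # hypothetical helper
    }
    if not all(checks.values()):
        response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
    return {"ready": all(checks.values()), "checks": checks}
```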
Next Steps
Optimizing query processing and managing context windows effectively are critical for cost and quality. The next guides cover query optimization techniques and context window management strategies.