Bereitstellung von RAG-Systemen in der Produktion
RAG für den Produktionseinsatz: Architektur, Skalierung, Überwachung, Fehlerbehandlung und betriebliche Best Practices für zuverlässige Bereitstellungen.
Production vs Prototype
| Aspect | Prototype | Production |
|---|---|---|
| Disponibilité | Meilleur effort | SLA 99.9%+ |
| Latence | Variable | p95 < 2s |
| Gestion des erreurs | Plantages | Dégradation gracieuse |
| Surveillance | Aucune | Complète |
| Coût | Peu importe | Optimisé |
| Échelle | Utilisateur unique | Milliers concurrents |
| Données | Échantillon | Corpus complet |
Architecture de Production
Architecture de Base
┌─────────┐ ┌──────────┐ ┌──────────┐ ┌─────┐
│ Requête │────>│ API │────>│ Base de │────>│ LLM │
│ Util. │ │ Gateway │ │ Données │ │ │
└─────────┘ └──────────┘ └──────────┘ └─────┘
│ │ │
v v v
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Couche │ │ Service │ │ Surveillance│
│ Cache │ │ Embedding│ │ │
└──────────┘ └──────────┘ └──────────┘
Composants
API Gateway
- Limitation de débit
- Authentification
- Routage des requêtes
- Équilibrage de charge
Base de Données Vectorielle
- Gérée (Pinecone) ou auto-hébergée (Qdrant)
- Répliquée pour HA
- Sauvegardes configurées
Service d'Embedding
- Service séparé pour les embeddings
- Traitement par lots
- Mise en cache du modèle
Service LLM
- Fournisseur API (OpenAI) ou auto-hébergé
- Fournisseurs de secours
- Streaming des réponses
Couche de Cache
- Redis ou Memcached
- Cache des requêtes fréquentes
- Cache des embeddings
Surveillance
- Prometheus + Grafana
- Suivi des erreurs (Sentry)
- Journalisation (stack ELK)
Stratégies de Mise à l'Échelle
Mise à l'Échelle Verticale
Base de Données Vectorielle
- Plus de CPU : Recherche plus rapide
- Plus de RAM : Index plus grands en mémoire
- GPU : Recherche de similarité accélérée
Serveurs API
- Plus de CPU : Gérer plus de requêtes concurrentes
- Plus de RAM : Caches plus grands
Limites :
- Maximum d'une seule machine
- Coûteux à haut niveau
- Pas de redondance
Mise à l'Échelle Horizontale
Équilibrage de Charge
DEVELOPERpython# Mehrere API-Server hinter dem Load Balancer # NGINX-Konfiguration upstream rag_api { least_conn; # Zum am wenigsten ausgelasteten Server routen server api1.example.com:8000; server api2.example.com:8000; server api3.example.com:8000; } server { listen 80; location / { proxy_pass http://rag_api; proxy_set_header Host $host; } }
Sharding de la Base de Données Vectorielle
DEVELOPERpython# Shard nach Dokument-ID def get_shard(doc_id, num_shards=4): return hash(doc_id) % num_shards # Alle Shards abfragen, Ergebnisse zusammenführen async def distributed_search(query_vector, k=5): tasks = [ search_shard(shard_id, query_vector, k=k) for shard_id in range(num_shards) ] all_results = await asyncio.gather(*tasks) # Zusammenführen und neu sortieren merged = merge_results(all_results) return merged[:k]
Réplicas de Lecture
DEVELOPERpython# Auf dem Primärknoten schreiben, von Replikas lesen class VectorDBCluster: def __init__(self, primary, replicas): self.primary = primary self.replicas = replicas self.replica_index = 0 def write(self, vector): # Alle Schreibvorgänge gehen an den Primärknoten self.primary.upsert(vector) def search(self, query_vector, k=5): # Vom Replica lesen (Round-Robin) replica = self.replicas[self.replica_index] self.replica_index = (self.replica_index + 1) % len(self.replicas) return replica.search(query_vector, k=k)
Auto-Scaling
DEVELOPERyaml# Kubernetes HPA (Horizontal Pod Autoscaler) apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata: name: rag-api-hpa spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: rag-api minReplicas: 2 maxReplicas: 10 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 70 - type: Resource resource: name: memory target: type: Utilization averageUtilization: 80
Mise en Cache
Cache de Requêtes
DEVELOPERpythonimport redis import hashlib import json class QueryCache: def __init__(self, redis_client, ttl=3600): self.redis = redis_client self.ttl = ttl def get_cache_key(self, query, k): # Die Anfrage für den Cache-Schlüssel hashen query_hash = hashlib.md5(f"{query}:{k}".encode()).hexdigest() return f"rag:query:{query_hash}" def get(self, query, k): key = self.get_cache_key(query, k) cached = self.redis.get(key) if cached: return json.loads(cached) return None def set(self, query, k, result): key = self.get_cache_key(query, k) self.redis.setex(key, self.ttl, json.dumps(result)) # Verwendung cache = QueryCache(redis.Redis(host='localhost')) def rag_query(query, k=5): # Cache prüfen cached = cache.get(query, k) if cached: return cached # RAG-Pipeline ausführen result = execute_rag_pipeline(query, k) # Ergebnis cachen cache.set(query, k, result) return result
Cache d'Embeddings
DEVELOPERpythonclass EmbeddingCache: def __init__(self, redis_client): self.redis = redis_client def get_embedding(self, text): key = f"emb:{hashlib.md5(text.encode()).hexdigest()}" # Cache prüfen cached = self.redis.get(key) if cached: return np.frombuffer(cached, dtype=np.float32) # Embedding erzeugen embedding = embed_model.encode(text) # Cachen (kein TTL - Embeddings ändern sich nicht) self.redis.set(key, embedding.tobytes()) return embedding
Invalidation du Cache
DEVELOPERpythondef update_document(doc_id, new_content): # Vector-Datenbank aktualisieren embedding = embed(new_content) vector_db.upsert(doc_id, embedding, metadata={'content': new_content}) # Zugehörige Cache-Einträge invalidieren invalidate_cache_for_document(doc_id) def invalidate_cache_for_document(doc_id): # Alle gecachten Anfragen finden, die dieses Dokument enthalten haben pattern = f"rag:query:*" for key in redis.scan_iter(match=pattern): cached_result = json.loads(redis.get(key)) # Wenn dieses Dokument in den Ergebnissen war, invalidieren if any(doc['id'] == doc_id for doc in cached_result.get('documents', [])): redis.delete(key)
Gestion des Erreurs
Dégradation Gracieuse
DEVELOPERpythonclass RobustRAG: def __init__(self, primary_llm, fallback_llm, vector_db): self.primary_llm = primary_llm self.fallback_llm = fallback_llm self.vector_db = vector_db async def query(self, user_query, k=5): try: # Primäre Retrieval versuchen contexts = await self.vector_db.search(user_query, k=k) except Exception as e: logger.error(f"Vector DB error: {e}") # Fallback: leere Kontexte zurückgeben contexts = [] # Oder: eine keyword-basierte Fallback-Suche verwenden # contexts = await self.keyword_search_fallback(user_query) try: # Primäres LLM versuchen answer = await self.primary_llm.generate( query=user_query, contexts=contexts ) except Exception as e: logger.error(f"Primary LLM error: {e}") try: # Auf das sekundäre LLM umschalten answer = await self.fallback_llm.generate( query=user_query, contexts=contexts ) except Exception as e2: logger.error(f"Fallback LLM error: {e2}") # Letzter Fallback answer = "I'm experiencing technical difficulties. Please try again later." return { 'answer': answer, 'contexts': contexts, 'fallback_used': False # Zur Überwachung nachverfolgen }
Logique de Retry
DEVELOPERpythonfrom tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10), reraise=True ) async def resilient_llm_call(llm, prompt): """ Retry mit exponentiellem Backoff - Versuch 1: sofort - Versuch 2: 1s warten - Versuch 3: 2s warten """ return await llm.generate(prompt)
Circuit Breaker
DEVELOPERpythonclass CircuitBreaker: def __init__(self, failure_threshold=5, timeout=60): self.failure_count = 0 self.failure_threshold = failure_threshold self.timeout = timeout self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN self.last_failure_time = None async def call(self, func, *args, **kwargs): if self.state == 'OPEN': if time.time() - self.last_failure_time > self.timeout: self.state = 'HALF_OPEN' else: raise Exception("Circuit breaker is OPEN") try: result = await func(*args, **kwargs) # Erfolg: zurücksetzen if self.state == 'HALF_OPEN': self.state = 'CLOSED' self.failure_count = 0 return result except Exception as e: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = 'OPEN' raise e # Verwendung llm_breaker = CircuitBreaker(failure_threshold=5, timeout=60) async def safe_llm_call(prompt): return await llm_breaker.call(llm.generate, prompt)
Surveillance
Métriques
DEVELOPERpythonfrom prometheus_client import Counter, Histogram, Gauge # Anfrage-Metriken query_counter = Counter('rag_queries_total', 'Total RAG queries') query_duration = Histogram('rag_query_duration_seconds', 'Query duration') error_counter = Counter('rag_errors_total', 'Total errors', ['type']) # Komponenten-Metriken retrieval_latency = Histogram('rag_retrieval_latency_seconds', 'Retrieval latency') llm_latency = Histogram('rag_llm_latency_seconds', 'LLM latency') # Qualitätsmetriken avg_precision = Gauge('rag_precision_at_5', 'Average precision@5') thumbs_up_rate = Gauge('rag_thumbs_up_rate', 'Thumbs up rate') # Verwendung @query_duration.time() async def handle_query(query): query_counter.inc() try: # Retrieval with retrieval_latency.time(): contexts = await retrieve(query) # Generierung with llm_latency.time(): answer = await generate(query, contexts) return answer except Exception as e: error_counter.labels(type=type(e).__name__).inc() raise
Journalisation
DEVELOPERpythonimport logging import json # Strukturierte Protokollierung class JSONFormatter(logging.Formatter): def format(self, record): log_data = { 'timestamp': self.formatTime(record), 'level': record.levelname, 'message': record.getMessage(), 'module': record.module, } if hasattr(record, 'query'): log_data['query'] = record.query if hasattr(record, 'latency'): log_data['latency'] = record.latency return json.dumps(log_data) # Logger konfigurieren logger = logging.getLogger('rag') handler = logging.StreamHandler() handler.setFormatter(JSONFormatter()) logger.addHandler(handler) logger.setLevel(logging.INFO) # Verwendung logger.info('Query processed', extra={ 'query': user_query, 'latency': latency_ms, 'num_contexts': len(contexts) })
Alertes
DEVELOPERyaml# Prometheus-Alertregeln groups: - name: rag_alerts rules: - alert: HighErrorRate expr: rate(rag_errors_total[5m]) > 0.1 for: 5m annotations: summary: "High error rate in RAG system" description: "Error rate is {{ $value }} errors/sec" - alert: HighLatency expr: histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5 for: 10m annotations: summary: "High query latency" description: "p95 latency is {{ $value }}s" - alert: LowPrecision expr: rag_precision_at_5 < 0.6 for: 30m annotations: summary: "Retrieval precision degraded" description: "Precision@5 is {{ $value }}"
Pipeline de Données
Ingestion de Documents
DEVELOPERpythonclass DocumentIngestionPipeline: def __init__(self, vector_db, embedding_service): self.vector_db = vector_db self.embedding_service = embedding_service async def ingest_document(self, document): try: # 1. Text extrahieren text = extract_text(document) # 2. Zerteilen chunks = chunk_document(text, chunk_size=512, overlap=50) # 3. Embeddings erzeugen (Batch) embeddings = await self.embedding_service.embed_batch(chunks) # 4. In die Vector-Datenbank hochladen (Batch) await self.vector_db.upsert_batch( ids=[f"{document.id}_{i}" for i in range(len(chunks))], embeddings=embeddings, metadatas=[{ 'doc_id': document.id, 'chunk_index': i, 'content': chunk } for i, chunk in enumerate(chunks)] ) logger.info(f"Ingested document {document.id}: {len(chunks)} chunks") except Exception as e: logger.error(f"Failed to ingest document {document.id}: {e}") raise # Hintergrund-Job async def batch_ingestion(): pipeline = DocumentIngestionPipeline(vector_db, embedding_service) # Neue Dokumente holen new_docs = await get_new_documents() # Parallel verarbeiten tasks = [pipeline.ingest_document(doc) for doc in new_docs] await asyncio.gather(*tasks, return_exceptions=True)
Mises à Jour Incrémentales
DEVELOPERpythonasync def update_document(doc_id, new_content): # Alte Chunks löschen await vector_db.delete(filter={'doc_id': doc_id}) # Neue Version ingestieren await ingestion_pipeline.ingest_document({ 'id': doc_id, 'content': new_content }) # Caches invalidieren invalidate_cache_for_document(doc_id)
Optimisation des Coûts
Embeddings
DEVELOPERpython# Embeddings batchweise für besseren Durchsatz/Kosten async def cost_optimized_embedding(texts, batch_size=100): embeddings = [] for i in range(0, len(texts), batch_size): batch = texts[i:i+batch_size] # Einziger API-Aufruf für das Batch batch_embeddings = await embedding_api.embed_batch(batch) embeddings.extend(batch_embeddings) # Rate-Limiting await asyncio.sleep(0.1) return embeddings
Appels LLM
DEVELOPERpython# Token-Nutzung reduzieren def optimize_context(query, chunks, max_tokens=2000): """ So viel relevanten Kontext wie möglich in das Token-Budget einpassen """ selected_chunks = [] total_tokens = 0 for chunk in chunks: chunk_tokens = count_tokens(chunk) if total_tokens + chunk_tokens <= max_tokens: selected_chunks.append(chunk) total_tokens += chunk_tokens else: break return selected_chunks
Stratégie de Cache
- Mettre en cache les embeddings indéfiniment (déterministe)
- Mettre en cache les requêtes fréquentes (TTL 1 heure)
- Mettre en cache les patterns de requêtes populaires (TTL 24 heures)
- Invalider lors des mises à jour de contenu
Sécurité
Authentification API
DEVELOPERpythonfrom fastapi import Depends, HTTPException, Security from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials security = HTTPBearer() async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): token = credentials.credentials # JWT prüfen try: payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="Invalid token") @app.post("/query") async def query(request: QueryRequest, user = Depends(verify_token)): # Traiter la requête return await rag_system.query(request.query)
Limitation de Débit
DEVELOPERpythonfrom slowapi import Limiter from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address) @app.post("/query") @limiter.limit("100/hour") async def query(request: QueryRequest): return await rag_system.query(request.query)
Sanitisation des Entrées
DEVELOPERpythondef sanitize_query(query: str) -> str: # Länge begrenzen max_length = 1000 query = query[:max_length] # Potenziell gefährliche Zeichen entfernen query = re.sub(r'[^\w\s\?.,!-]', '', query) # Prompt-Injection verhindern injection_patterns = [ r'ignore previous instructions', r'disregard above', r'system:', ] for pattern in injection_patterns: if re.search(pattern, query, re.IGNORECASE): raise ValueError("Potentially harmful query detected") return query
Checklist de Déploiement
- Base de données vectorielle : Répliquée, sauvegardée
- API : Auto-scaling, health checks
- Mise en cache : Cluster Redis, politique d'éviction
- Surveillance : Métriques, logs, alertes
- Gestion des erreurs : Retries, secours, circuit breakers
- Sécurité : Authentification, limitation de débit, validation des entrées
- Documentation : Docs API, runbooks
- Tests : Tests de charge, tests de chaos
- Plan de rollback : Déploiement blue-green
- Astreinte : Runbooks, procédures d'escalade
Prochaines Étapes
Die Optimierung der Anfrageverarbeitung und das effiziente Management von Kontextfenstern sind entscheidend für Kosten und Qualität. Die folgenden Leitfäden behandeln Techniken zur Anfrageoptimierung und Strategien zur Verwaltung von Kontextfenstern.
Tags
Verwandte Artikel
Überwachung und Observability von RAG-Systemen
Überwachen Sie RAG-Systeme im Produktivbetrieb: verfolgen Sie Latenz, Kosten, Genauigkeit und Benutzerzufriedenheit mit Metriken und Dashboards.
RAG as a Service: Der umfassende Leitfaden zu RAG-Plattformen in der Produktion
Erfahren Sie, was RAG as a Service (RAG-as-a-Service) ist, warum es die schnellste Lösung ist, um RAG-Anwendungen produktiv einzusetzen, und wie Sie die passende Plattform auswählen.
Guardrails für RAG: Ihre KI-Assistenten absichern
Implementieren Sie robuste Guardrails, um gefährliche, themenfremde oder unangemessene Antworten in Ihren produktiven RAG-Systemen zu vermeiden.