Guide · Expert

Deploying RAG Systems in Production

February 20, 2025
14 minute read
Ailog Research Team

RAG for production use: architecture, scaling, monitoring, error handling, and operational best practices for reliable deployments.

Production vs Prototype

Aspect            Prototype        Production
Availability      Best effort      99.9%+ SLA
Latency           Variable         p95 < 2s
Error handling    Crashes          Graceful degradation
Monitoring        None             Comprehensive
Cost              Not a concern    Optimized
Scale             Single user      Thousands of concurrent users
Data              Sample           Full corpus

Production Architecture

Core Architecture

┌─────────┐     ┌──────────┐     ┌──────────┐     ┌─────┐
│ User    │────>│ API      │────>│ Vector   │────>│ LLM │
│ Query   │     │ Gateway  │     │ Database │     │     │
└─────────┘     └──────────┘     └──────────┘     └─────┘
                     │                 │              │
                     v                 v              v
                ┌──────────┐     ┌──────────┐   ┌────────────┐
                │ Cache    │     │ Embedding│   │ Monitoring │
                │ Layer    │     │ Service  │   │            │
                └──────────┘     └──────────┘   └────────────┘

Components

API Gateway

  • Rate limiting
  • Authentication
  • Request routing
  • Load balancing

Vector Database

  • Managed (Pinecone) or self-hosted (Qdrant); a minimal self-hosted setup is sketched below
  • Replicated for HA
  • Backups configured
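
As an illustration, here is a minimal sketch of creating a replicated collection on a distributed, self-hosted Qdrant deployment; the endpoint URL, collection name, and vector size are assumptions, not values from this guide.

python
# Minimal sketch, assuming a multi-node Qdrant deployment;
# the URL, collection name and vector size are illustrative
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

client = QdrantClient(url="http://qdrant.internal:6333")  # hypothetical cluster endpoint

client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=768, distance=Distance.COSINE),
    replication_factor=2,  # keep each shard on two nodes for HA
)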

Embedding Service

  • Separate service for embeddings (sketched below)
  • Batch processing
  • Model caching
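
A minimal sketch of such a standalone service, assuming FastAPI and sentence-transformers; the model name and route are illustrative, not prescribed by this guide.

python
# Minimal sketch of a separate embedding service; model name and route are illustrative
from fastapi import FastAPI
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer

app = FastAPI()
model = SentenceTransformer("all-MiniLM-L6-v2")  # loaded once at startup, kept in memory

class EmbedRequest(BaseModel):
    texts: list[str]

@app.post("/embed")
def embed(request: EmbedRequest):
    # Encode the whole batch in a single call rather than one request per text
    vectors = model.encode(request.texts, batch_size=32)
    return {"embeddings": vectors.tolist()}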

LLM Service

  • API provider (OpenAI) or self-hosted
  • Fallback providers
  • Response streaming (see the sketch below)
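
For response streaming, a minimal sketch assuming the OpenAI Python client (v1.x); the model name is illustrative.

python
# Minimal streaming sketch, assuming the OpenAI Python client; model name is illustrative
from openai import OpenAI

client = OpenAI()

def stream_answer(prompt: str):
    # Yield tokens as they arrive instead of waiting for the full completion
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta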

Cache Layer

  • Redis or Memcached
  • Cache for frequent queries
  • Cache for embeddings

Monitoring

  • Prometheus + Grafana
  • Error tracking (Sentry)
  • Logging (ELK stack)

Scaling Strategies

Vertical Scaling

Vector Database

  • More CPU: faster search
  • More RAM: larger indexes held in memory
  • GPU: accelerated similarity search

API Servers

  • More CPU: handle more concurrent requests
  • More RAM: larger caches

Limits:

  • Capped by a single machine
  • Expensive at the high end
  • No redundancy

Horizontal Scaling

Load Balancing

nginx
# Multiple API servers behind the load balancer
# NGINX configuration
upstream rag_api {
    least_conn;  # route to the least loaded server
    server api1.example.com:8000;
    server api2.example.com:8000;
    server api3.example.com:8000;
}

server {
    listen 80;
    location / {
        proxy_pass http://rag_api;
        proxy_set_header Host $host;
    }
}

Vector Database Sharding

python
import asyncio

num_shards = 4  # module-level shard count used by distributed_search

# Shard by document ID
def get_shard(doc_id, num_shards=4):
    return hash(doc_id) % num_shards

# Query all shards and merge the results
async def distributed_search(query_vector, k=5):
    tasks = [
        search_shard(shard_id, query_vector, k=k)
        for shard_id in range(num_shards)
    ]
    all_results = await asyncio.gather(*tasks)

    # Merge and re-rank
    merged = merge_results(all_results)
    return merged[:k]

Read Replicas

python
# Write to the primary node, read from replicas
class VectorDBCluster:
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas
        self.replica_index = 0

    def write(self, vector):
        # All writes go to the primary
        self.primary.upsert(vector)

    def search(self, query_vector, k=5):
        # Read from a replica (round-robin)
        replica = self.replicas[self.replica_index]
        self.replica_index = (self.replica_index + 1) % len(self.replicas)
        return replica.search(query_vector, k=k)

Auto-Scaling

yaml
# Kubernetes HPA (Horizontal Pod Autoscaler)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rag-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rag-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

Caching

Query Cache

python
import redis
import hashlib
import json

class QueryCache:
    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl

    def get_cache_key(self, query, k):
        # Hash the query to build the cache key
        query_hash = hashlib.md5(f"{query}:{k}".encode()).hexdigest()
        return f"rag:query:{query_hash}"

    def get(self, query, k):
        key = self.get_cache_key(query, k)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, query, k, result):
        key = self.get_cache_key(query, k)
        self.redis.setex(key, self.ttl, json.dumps(result))

# Usage
cache = QueryCache(redis.Redis(host='localhost'))

def rag_query(query, k=5):
    # Check the cache
    cached = cache.get(query, k)
    if cached:
        return cached

    # Run the RAG pipeline
    result = execute_rag_pipeline(query, k)

    # Cache the result
    cache.set(query, k, result)
    return result

Embedding Cache

python
import hashlib
import numpy as np

class EmbeddingCache:
    def __init__(self, redis_client):
        self.redis = redis_client

    def get_embedding(self, text):
        key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"

        # Check the cache
        cached = self.redis.get(key)
        if cached:
            return np.frombuffer(cached, dtype=np.float32)

        # Generate the embedding
        embedding = embed_model.encode(text)

        # Cache it (no TTL: embeddings do not change)
        self.redis.set(key, embedding.tobytes())
        return embedding

Cache Invalidation

python
def update_document(doc_id, new_content):
    # Update the vector database
    embedding = embed(new_content)
    vector_db.upsert(doc_id, embedding, metadata={'content': new_content})

    # Invalidate the related cache entries
    invalidate_cache_for_document(doc_id)

def invalidate_cache_for_document(doc_id):
    # Find every cached query whose results contained this document
    pattern = "rag:query:*"
    for key in redis.scan_iter(match=pattern):
        cached_result = json.loads(redis.get(key))
        # If this document appeared in the results, drop the entry
        if any(doc['id'] == doc_id for doc in cached_result.get('documents', [])):
            redis.delete(key)

Error Handling

Graceful Degradation

python
class RobustRAG:
    def __init__(self, primary_llm, fallback_llm, vector_db):
        self.primary_llm = primary_llm
        self.fallback_llm = fallback_llm
        self.vector_db = vector_db

    async def query(self, user_query, k=5):
        fallback_used = False

        try:
            # Try the primary retrieval
            contexts = await self.vector_db.search(user_query, k=k)
        except Exception as e:
            logger.error(f"Vector DB error: {e}")
            # Fallback: return empty contexts
            contexts = []
            # Or: use a keyword-based fallback search
            # contexts = await self.keyword_search_fallback(user_query)

        try:
            # Try the primary LLM
            answer = await self.primary_llm.generate(
                query=user_query,
                contexts=contexts
            )
        except Exception as e:
            logger.error(f"Primary LLM error: {e}")
            fallback_used = True
            try:
                # Switch to the secondary LLM
                answer = await self.fallback_llm.generate(
                    query=user_query,
                    contexts=contexts
                )
            except Exception as e2:
                logger.error(f"Fallback LLM error: {e2}")
                # Last-resort fallback
                answer = "I'm experiencing technical difficulties. Please try again later."

        return {
            'answer': answer,
            'contexts': contexts,
            'fallback_used': fallback_used  # track for monitoring
        }

Retry Logic

python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    reraise=True
)
async def resilient_llm_call(llm, prompt):
    """
    Retry with exponential backoff
    - attempt 1: immediately
    - attempt 2: wait 1s
    - attempt 3: wait 2s
    """
    return await llm.generate(prompt)

Circuit Breaker

python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        self.last_failure_time = None

    async def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            # Success: reset
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
            raise e

# Usage
llm_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

async def safe_llm_call(prompt):
    return await llm_breaker.call(llm.generate, prompt)

Monitoring

Metrics

python
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
query_counter = Counter('rag_queries_total', 'Total RAG queries')
query_duration = Histogram('rag_query_duration_seconds', 'Query duration')
error_counter = Counter('rag_errors_total', 'Total errors', ['type'])

# Component metrics
retrieval_latency = Histogram('rag_retrieval_latency_seconds', 'Retrieval latency')
llm_latency = Histogram('rag_llm_latency_seconds', 'LLM latency')

# Quality metrics
avg_precision = Gauge('rag_precision_at_5', 'Average precision@5')
thumbs_up_rate = Gauge('rag_thumbs_up_rate', 'Thumbs up rate')

# Usage
@query_duration.time()
async def handle_query(query):
    query_counter.inc()
    try:
        # Retrieval
        with retrieval_latency.time():
            contexts = await retrieve(query)

        # Generation
        with llm_latency.time():
            answer = await generate(query, contexts)

        return answer
    except Exception as e:
        error_counter.labels(type=type(e).__name__).inc()
        raise

Logging

python
import logging
import json

# Structured logging
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
        }
        if hasattr(record, 'query'):
            log_data['query'] = record.query
        if hasattr(record, 'latency'):
            log_data['latency'] = record.latency
        return json.dumps(log_data)

# Configure the logger
logger = logging.getLogger('rag')
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage
logger.info('Query processed', extra={
    'query': user_query,
    'latency': latency_ms,
    'num_contexts': len(contexts)
})

Alerts

yaml
# Prometheus alert rules
groups:
  - name: rag_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(rag_errors_total[5m]) > 0.1
        for: 5m
        annotations:
          summary: "High error rate in RAG system"
          description: "Error rate is {{ $value }} errors/sec"

      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5
        for: 10m
        annotations:
          summary: "High query latency"
          description: "p95 latency is {{ $value }}s"

      - alert: LowPrecision
        expr: rag_precision_at_5 < 0.6
        for: 30m
        annotations:
          summary: "Retrieval precision degraded"
          description: "Precision@5 is {{ $value }}"

Data Pipeline

Document Ingestion

python
import asyncio

class DocumentIngestionPipeline:
    def __init__(self, vector_db, embedding_service):
        self.vector_db = vector_db
        self.embedding_service = embedding_service

    async def ingest_document(self, document):
        try:
            # 1. Extract the text
            text = extract_text(document)

            # 2. Chunk it
            chunks = chunk_document(text, chunk_size=512, overlap=50)

            # 3. Generate embeddings (batch)
            embeddings = await self.embedding_service.embed_batch(chunks)

            # 4. Upsert into the vector database (batch)
            await self.vector_db.upsert_batch(
                ids=[f"{document.id}_{i}" for i in range(len(chunks))],
                embeddings=embeddings,
                metadatas=[{
                    'doc_id': document.id,
                    'chunk_index': i,
                    'content': chunk
                } for i, chunk in enumerate(chunks)]
            )

            logger.info(f"Ingested document {document.id}: {len(chunks)} chunks")
        except Exception as e:
            logger.error(f"Failed to ingest document {document.id}: {e}")
            raise

# Background job
async def batch_ingestion():
    pipeline = DocumentIngestionPipeline(vector_db, embedding_service)

    # Fetch the new documents
    new_docs = await get_new_documents()

    # Process them in parallel
    tasks = [pipeline.ingest_document(doc) for doc in new_docs]
    await asyncio.gather(*tasks, return_exceptions=True)

Incremental Updates

python
async def update_document(doc_id, new_content):
    # Delete the old chunks
    await vector_db.delete(filter={'doc_id': doc_id})

    # Ingest the new version
    await ingestion_pipeline.ingest_document({
        'id': doc_id,
        'content': new_content
    })

    # Invalidate the caches
    invalidate_cache_for_document(doc_id)

Cost Optimization

Embeddings

python
import asyncio

# Batch embeddings for better throughput and cost
async def cost_optimized_embedding(texts, batch_size=100):
    embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]

        # A single API call for the whole batch
        batch_embeddings = await embedding_api.embed_batch(batch)
        embeddings.extend(batch_embeddings)

        # Rate limiting
        await asyncio.sleep(0.1)

    return embeddings

LLM Calls

python
# Reduce token usage
def optimize_context(query, chunks, max_tokens=2000):
    """
    Fit as much relevant context as possible into the token budget
    """
    selected_chunks = []
    total_tokens = 0

    for chunk in chunks:
        chunk_tokens = count_tokens(chunk)
        if total_tokens + chunk_tokens <= max_tokens:
            selected_chunks.append(chunk)
            total_tokens += chunk_tokens
        else:
            break

    return selected_chunks

Cache Strategy

  • Cache embeddings indefinitely (deterministic)
  • Cache frequent queries (1-hour TTL)
  • Cache popular query patterns (24-hour TTL)
  • Invalidate on content updates (see the sketch below)
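
A minimal sketch mapping this policy onto the QueryCache and EmbeddingCache classes defined earlier; the separate 24-hour cache for popular query patterns is an assumption.

python
import redis

r = redis.Redis(host='localhost')

embedding_cache = EmbeddingCache(r)             # no TTL: embeddings are deterministic
query_cache = QueryCache(r, ttl=3600)           # frequent queries: 1-hour TTL
popular_query_cache = QueryCache(r, ttl=86400)  # assumed 24-hour TTL for popular patterns
# On content updates, call invalidate_cache_for_document(doc_id) (see Cache Invalidation above)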

Security

API Authentication

python
import jwt
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    token = credentials.credentials

    # Verify the JWT
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/query")
async def query(request: QueryRequest, user = Depends(verify_token)):
    # Process the request
    return await rag_system.query(request.query)

Rate Limiting

python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/query")
@limiter.limit("100/hour")
async def query(request: Request, body: QueryRequest):
    # slowapi needs the raw Request to identify the client
    return await rag_system.query(body.query)

Input Sanitization

python
import re

def sanitize_query(query: str) -> str:
    # Limit the length
    max_length = 1000
    query = query[:max_length]

    # Strip potentially dangerous characters
    query = re.sub(r'[^\w\s\?.,!-]', '', query)

    # Guard against prompt injection
    injection_patterns = [
        r'ignore previous instructions',
        r'disregard above',
        r'system:',
    ]
    for pattern in injection_patterns:
        if re.search(pattern, query, re.IGNORECASE):
            raise ValueError("Potentially harmful query detected")

    return query

Deployment Checklist

  • Vector database: replicated, backed up
  • API: auto-scaling, health checks (see the sketch after this list)
  • Caching: Redis cluster, eviction policy
  • Monitoring: metrics, logs, alerts
  • Error handling: retries, fallbacks, circuit breakers
  • Security: authentication, rate limiting, input validation
  • Documentation: API docs, runbooks
  • Testing: load tests, chaos tests
  • Rollback plan: blue-green deployment
  • On-call: runbooks, escalation procedures
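
For the health-check item, a minimal sketch of a readiness endpoint; vector_db.ping() is a hypothetical client method, and the Redis client is assumed to exist elsewhere in the application.

python
# Minimal health-check sketch; vector_db.ping() is a hypothetical client method
from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/health")
async def health(response: Response):
    checks = {
        "vector_db": await vector_db.ping(),   # hypothetical async ping on the vector DB client
        "cache": bool(redis_client.ping()),
    }
    if not all(checks.values()):
        response.status_code = 503  # signal the load balancer to pull this instance
    return checks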

Next Steps

Optimizing query processing and managing context windows efficiently are decisive for both cost and quality. The following guides cover query optimization techniques and strategies for managing context windows.

Tags

production, deployment, scaling, monitoring
