Monitoring and Observability for RAG Systems

Monitor RAG systems in production: track latency, costs, accuracy, and user satisfaction with metrics and dashboards.

Author
Ailog Research Team
Publication date
November 11, 2025
Reading time
12 min read
Level
Advanced
RAG pipeline stage
Optimization

Key Metrics to Track

Performance:

  • Latency (p50, p95, p99)
  • Throughput (queries/second)
  • Error rate

Quality:

  • Retrieval accuracy
  • Answer quality
  • User feedback

Cost:

  • API costs per query
  • Storage costs
  • Compute costs

Prometheus + Grafana Setup

```yaml
# docker-compose.yml
version: '3.8'
services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
```
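
The compose file mounts a `prometheus.yml` that is not shown here. A minimal sketch, assuming the RAG service exposes its metrics on port 8000 as in the instrumentation section below (the job name and target are placeholders to adapt):

```yaml
# prometheus.yml (hypothetical scrape config)
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "rag_service"
    static_configs:
      # host.docker.internal lets the Prometheus container reach a metrics
      # server running on the host; adjust to your network setup
      - targets: ["host.docker.internal:8000"]
```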

Instrumenting the RAG Pipeline

```python
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Metrics
query_counter = Counter('rag_queries_total', 'Total RAG queries')
latency_histogram = Histogram('rag_latency_seconds', 'RAG latency', buckets=[0.1, 0.5, 1, 2, 5])
error_counter = Counter('rag_errors_total', 'Total RAG errors', ['error_type'])
cost_counter = Counter('rag_cost_usd', 'Total costs', ['component'])
relevance_gauge = Gauge('rag_relevance_score', 'Average relevance score')

# embed, vector_db, llm_call and logger are assumed to be defined elsewhere in the application
def instrumented_rag(query):
    query_counter.inc()
    start = time.time()

    try:
        # Embedding
        emb_start = time.time()
        embedding = embed(query)
        emb_time = time.time() - emb_start
        cost_counter.labels(component='embedding').inc(0.001)

        # Retrieval
        ret_start = time.time()
        docs = vector_db.search(embedding, limit=5)
        ret_time = time.time() - ret_start

        # LLM
        llm_start = time.time()
        response = llm_call(query, docs)
        llm_time = time.time() - llm_start
        cost_counter.labels(component='llm').inc(0.01)

        # Track latency
        total_time = time.time() - start
        latency_histogram.observe(total_time)

        # Log breakdown
        logger.info(f"Latency breakdown: emb={emb_time:.3f}s, ret={ret_time:.3f}s, llm={llm_time:.3f}s")

        return response

    except Exception as e:
        error_counter.labels(error_type=type(e).__name__).inc()
        raise

# Start metrics server
start_http_server(8000)
```
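
With `start_http_server(8000)`, the metrics are exposed in Prometheus text format at `http://localhost:8000/metrics`, which is the scrape target assumed in the `prometheus.yml` sketch above.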

Tracking Retrieval Quality

```python
from sklearn.metrics import ndcg_score
import time

def track_retrieval_quality(query, retrieved_docs, ground_truth_docs):
    # Calculate nDCG with binary relevance: 1 if the doc is in the labeled ground truth
    relevance_scores = []
    for doc in retrieved_docs:
        if doc.id in ground_truth_docs:
            relevance_scores.append(1)
        else:
            relevance_scores.append(0)

    # Predicted scores must decrease with rank so nDCG respects the retrieval order
    predicted_scores = list(range(len(relevance_scores), 0, -1))
    ndcg = ndcg_score([relevance_scores], [predicted_scores])

    # Update metric
    relevance_gauge.set(ndcg)

    # Log to database (db is an application-level store)
    db.insert({
        "timestamp": time.time(),
        "query": query,
        "ndcg": ndcg,
        "retrieved_ids": [doc.id for doc in retrieved_docs]
    })
```
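
A hedged sketch of how this might be driven offline against a small labeled evaluation set; the queries, document IDs, and the `embed` / `vector_db` helpers reused from the instrumentation section are hypothetical:

```python
# Hypothetical offline evaluation loop over a small labeled set
eval_set = [
    {"query": "How do I reset my password?", "relevant_ids": {"doc_12", "doc_87"}},
    {"query": "What is the refund policy?", "relevant_ids": {"doc_33", "doc_51"}},
]

for example in eval_set:
    docs = vector_db.search(embed(example["query"]), limit=5)
    track_retrieval_quality(example["query"], docs, example["relevant_ids"])
```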

Collecting User Feedback

```python
from fastapi import FastAPI
from pydantic import BaseModel
from prometheus_client import Counter
import time

app = FastAPI()

class Feedback(BaseModel):
    query_id: str
    rating: int  # 1-5
    helpful: bool
    comment: str | None = None

feedback_counter = Counter(
    'rag_feedback_total', 'User feedback', ['rating']
)

@app.post("/feedback")
def collect_feedback(feedback: Feedback):
    # Track in Prometheus
    feedback_counter.labels(rating=str(feedback.rating)).inc()

    # Store in database (db is an application-level store)
    db.insert({
        "query_id": feedback.query_id,
        "rating": feedback.rating,
        "helpful": feedback.helpful,
        "comment": feedback.comment,
        "timestamp": time.time()
    })

    return {"status": "ok"}
```

Distributed Tracing with OpenTelemetry

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

tracer = trace.get_tracer(__name__)

def traced_rag(query):
    with tracer.start_as_current_span("rag_pipeline") as span:
        span.set_attribute("query", query)

        # Embedding
        with tracer.start_as_current_span("embedding"):
            embedding = embed(query)

        # Retrieval
        with tracer.start_as_current_span("retrieval"):
            docs = vector_db.search(embedding, limit=5)
            span.set_attribute("num_docs_retrieved", len(docs))

        # Reranking
        with tracer.start_as_current_span("reranking"):
            reranked = rerank(query, docs)

        # LLM
        with tracer.start_as_current_span("llm_generation"):
            response = llm_call(query, reranked)
            span.set_attribute("response_length", len(response))

        return response
```
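
If Jaeger runs locally (for instance via the `jaegertracing/all-in-one` image), the spans emitted here show up in the Jaeger UI, which listens on port 16686 by default. Note that recent OpenTelemetry Python releases have deprecated the Jaeger thrift exporter in favour of the OTLP exporter, which Jaeger also accepts natively.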

Logging Best Practices

```python
import structlog
import time

logger = structlog.get_logger()

def logged_rag(query, user_id):
    logger.info(
        "rag_query_started",
        query=query,
        user_id=user_id,
        timestamp=time.time()
    )

    try:
        # Retrieval
        docs = retrieve(query)
        logger.info(
            "documents_retrieved",
            num_docs=len(docs),
            doc_ids=[d.id for d in docs],
            scores=[d.score for d in docs]
        )

        # LLM
        response = llm_call(query, docs)
        logger.info(
            "rag_query_completed",
            response_length=len(response),
            tokens_used=estimate_tokens(query, docs, response)
        )

        return response

    except Exception as e:
        logger.error(
            "rag_query_failed",
            error=str(e),
            error_type=type(e).__name__
        )
        raise
```
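
For machine-readable logs that downstream tools can ingest, structlog can be configured to emit one JSON object per line; a minimal sketch (the processor choice is an assumption, not the article's setup):

```python
import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),  # ISO-8601 timestamps
        structlog.processors.add_log_level,           # include the log level
        structlog.processors.JSONRenderer(),          # render each event as JSON
    ]
)
```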

Alert Rules

```yaml
# prometheus_alerts.yml
groups:
  - name: rag_alerts
    interval: 30s
    rules:
      # High latency
      - alert: HighRAGLatency
        expr: histogram_quantile(0.95, rate(rag_latency_seconds_bucket[5m])) > 2
        for: 5m
        annotations:
          summary: "RAG p95 latency > 2s"

      # High error rate (as a fraction of all queries)
      - alert: HighErrorRate
        expr: rate(rag_errors_total[5m]) / rate(rag_queries_total[5m]) > 0.05
        for: 2m
        annotations:
          summary: "RAG error rate > 5%"

      # Low relevance
      - alert: LowRelevance
        expr: rag_relevance_score < 0.7
        for: 10m
        annotations:
          summary: "RAG relevance score < 0.7"

      # High costs (prometheus_client exposes the Counter as rag_cost_usd_total)
      - alert: HighDailyCost
        expr: increase(rag_cost_usd_total[24h]) > 100
        annotations:
          summary: "RAG costs > $100/day"
```
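
Note that Prometheus only evaluates these rules if `prometheus.yml` lists the file under `rule_files` (for example `rule_files: ["prometheus_alerts.yml"]`), and an Alertmanager instance is needed if the alerts should actually be routed to email, Slack, or a pager.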

Dashboard Queries (Grafana)

```promql
# Average latency over time
rate(rag_latency_seconds_sum[5m]) / rate(rag_latency_seconds_count[5m])

# Query throughput
rate(rag_queries_total[1m])

# Error rate
rate(rag_errors_total[5m]) / rate(rag_queries_total[5m])

# Cost per day (the Counter is exposed as rag_cost_usd_total)
increase(rag_cost_usd_total[24h])

# User satisfaction (share of 5-star feedback)
sum(rag_feedback_total{rating="5"}) / sum(rag_feedback_total)
```
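
Since the key-metrics list above calls out p95/p99 latency, a percentile panel is worth adding alongside the average; a sketch for p95 built from the histogram buckets:

```promql
# p95 latency over a 5-minute window
histogram_quantile(0.95, sum(rate(rag_latency_seconds_bucket[5m])) by (le))
```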

A/B Testing Framework

```python
import hashlib
import time

def ab_test_rag(query, user_id):
    # Assign user to variant with a stable hash
    # (Python's built-in hash() is salted per process, so assignments would not be sticky)
    variant = "A" if int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 2 == 0 else "B"

    if variant == "A":
        # Control: current RAG
        response = rag_pipeline_v1(query)
    else:
        # Treatment: new RAG
        response = rag_pipeline_v2(query)

    # Track variant (variant_counter: a Prometheus Counter with a 'variant' label)
    variant_counter.labels(variant=variant).inc()

    # Log for analysis
    db.insert({
        "query": query,
        "user_id": user_id,
        "variant": variant,
        "timestamp": time.time()
    })

    return response, variant
```
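
The logged variants only become useful once they are joined with an outcome metric such as the feedback collected earlier. A minimal analysis sketch, assuming the helpful/not-helpful counts per variant have already been aggregated (the numbers below are placeholders), using a chi-square test:

```python
from scipy.stats import chi2_contingency

# Hypothetical aggregated outcomes per variant: [helpful, not helpful]
contingency = [
    [412, 188],  # variant A (control)
    [455, 145],  # variant B (treatment)
]

chi2, p_value, dof, expected = chi2_contingency(contingency)
print(f"p-value = {p_value:.4f}")  # a small p-value suggests the helpful rates differ
```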

RAG in production requires monitoring. Track everything, alert on anomalies, and iterate based on the data.

Tags

  • optimization
  • monitoring
  • observability
  • production