
Evaluating a RAG System: Metrics and Methodologies

January 27, 2026
23 min read
Ailog Team

Complete guide to measuring your RAG performance: faithfulness, relevancy, recall, and automated evaluation frameworks.


A RAG system may appear to work correctly on the surface, but how do you know if it truly meets expectations? Rigorous evaluation is the key to moving from a prototype to a production-quality product. This guide presents the metrics, frameworks, and methodologies to measure and improve your RAG.

Why Evaluation is Critical

The Problem with Subjective Evaluations

Without objective metrics, RAG evaluation often comes down to:

  • "It looks correct" (confirmation bias)
  • A few manual tests on favorable cases
  • Late user feedback in production

This approach masks serious problems:

  • Subtle but frequent hallucinations
  • Off-topic responses for certain question categories
  • Progressive quality degradation after updates

Evaluation Dimensions

A RAG system must be evaluated on multiple axes:

| Dimension | Key Question | Example Problem |
|-----------|--------------|-----------------|
| Retrieval | Are the right documents found? | Relevant documents not retrieved |
| Generation | Is the answer faithful to sources? | Hallucinations, contradictions |
| End-to-end | Does the answer address the question? | Correct but off-topic answer |
| Latency | Is response time acceptable? | Timeout, user frustration |
| Robustness | Does the system handle edge cases? | Crash on malformed queries |

Retrieval Metrics

Recall@k

Measures the proportion of relevant documents found among the top k results.

```python
def recall_at_k(retrieved_ids: list[str], relevant_ids: list[str], k: int) -> float:
    """
    Recall@k: Proportion of relevant documents retrieved

    Args:
        retrieved_ids: IDs of retrieved documents (ordered by score)
        relevant_ids: IDs of actually relevant documents
        k: Number of results to consider

    Returns:
        Score between 0 and 1
    """
    if not relevant_ids:
        return 0.0
    retrieved_k = set(retrieved_ids[:k])
    relevant_set = set(relevant_ids)
    return len(retrieved_k & relevant_set) / len(relevant_set)

# Example
retrieved = ["doc1", "doc3", "doc5", "doc2", "doc7"]
relevant = ["doc1", "doc2", "doc4"]
print(f"Recall@3: {recall_at_k(retrieved, relevant, 3)}")  # 0.33 (1/3)
print(f"Recall@5: {recall_at_k(retrieved, relevant, 5)}")  # 0.67 (2/3)
```

Interpretation:

  • Recall@5 of 0.8+: Excellent, most relevant documents are retrieved
  • Recall@5 of 0.5-0.8: Acceptable, but improvement possible
  • Recall@5 < 0.5: Problematic, many relevant documents missed
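These bands can be applied programmatically when averaging Recall@5 over a query set; a minimal sketch (the band labels simply mirror the list above, they are not a standard):

```python
def interpret_recall(score: float) -> str:
    """Map an average Recall@5 score to the interpretation bands above."""
    if score >= 0.8:
        return "excellent"
    if score >= 0.5:
        return "acceptable"
    return "problematic"

# Average Recall@5 over several evaluation queries, then interpret
scores = [0.9, 0.6, 0.4]
avg = sum(scores) / len(scores)
print(f"Avg Recall@5: {avg:.2f} -> {interpret_recall(avg)}")  # ~0.63 -> acceptable
```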

Precision@k

Measures the proportion of relevant documents among the k retrieved.

```python
def precision_at_k(retrieved_ids: list[str], relevant_ids: list[str], k: int) -> float:
    """
    Precision@k: Proportion of retrieved documents that are relevant
    """
    if k == 0:
        return 0.0
    retrieved_k = set(retrieved_ids[:k])
    relevant_set = set(relevant_ids)
    return len(retrieved_k & relevant_set) / k

# Example (same retrieved/relevant lists as above)
print(f"Precision@3: {precision_at_k(retrieved, relevant, 3)}")  # 0.33 (1/3)
print(f"Precision@5: {precision_at_k(retrieved, relevant, 5)}")  # 0.40 (2/5)
```

MRR (Mean Reciprocal Rank)

Measures the average position of the first relevant document.

```python
def mrr(queries_results: list[tuple[list[str], list[str]]]) -> float:
    """
    Mean Reciprocal Rank

    Args:
        queries_results: List of (retrieved_documents, relevant_documents)

    Returns:
        MRR score between 0 and 1
    """
    reciprocal_ranks = []
    for retrieved, relevant in queries_results:
        relevant_set = set(relevant)
        for i, doc_id in enumerate(retrieved):
            if doc_id in relevant_set:
                reciprocal_ranks.append(1 / (i + 1))
                break
        else:
            reciprocal_ranks.append(0)
    return sum(reciprocal_ranks) / len(reciprocal_ranks) if reciprocal_ranks else 0

# Example: 3 queries
results = [
    (["doc1", "doc2", "doc3"], ["doc1"]),  # First = relevant -> 1/1
    (["doc4", "doc1", "doc2"], ["doc1"]),  # Second = relevant -> 1/2
    (["doc5", "doc6", "doc7"], ["doc8"]),  # None relevant -> 0
]
print(f"MRR: {mrr(results)}")  # (1 + 0.5 + 0) / 3 = 0.5
```

NDCG (Normalized Discounted Cumulative Gain)

Takes into account order AND graded relevance scores.

```python
import numpy as np

def dcg_at_k(relevances: list[float], k: int) -> float:
    """
    Discounted Cumulative Gain
    """
    relevances = np.array(relevances[:k])
    if len(relevances) == 0:
        return 0.0
    # Log base 2, 1-indexed position
    discounts = np.log2(np.arange(2, len(relevances) + 2))
    return np.sum(relevances / discounts)

def ndcg_at_k(relevances: list[float], k: int) -> float:
    """
    Normalized DCG: Compare to ideal DCG
    """
    dcg = dcg_at_k(relevances, k)
    # Ideal DCG: relevances sorted descending
    ideal_relevances = sorted(relevances, reverse=True)
    idcg = dcg_at_k(ideal_relevances, k)
    return dcg / idcg if idcg > 0 else 0.0

# Example with graded relevance scores (0=not relevant, 1=somewhat, 2=very)
relevances = [2, 0, 1, 1, 0]  # Doc 1 very relevant, doc 2 not, etc.
print(f"NDCG@5: {ndcg_at_k(relevances, 5):.3f}")
```

Generation Metrics

Faithfulness

Measures if the answer is faithful to the provided context, without hallucination.

```python
class FaithfulnessEvaluator:
    def __init__(self, llm):
        self.llm = llm

    async def evaluate(self, question: str, context: str, answer: str) -> dict:
        """
        Evaluate answer faithfulness to context
        """
        # Step 1: Extract claims from the answer
        claims = await self._extract_claims(answer)

        # Step 2: Verify each claim against the context
        verification_results = []
        for claim in claims:
            is_supported = await self._verify_claim(claim, context)
            verification_results.append({
                "claim": claim,
                "supported": is_supported
            })

        # Calculate score
        supported_count = sum(1 for r in verification_results if r["supported"])
        score = supported_count / len(claims) if claims else 1.0

        return {
            "score": score,
            "claims": verification_results,
            "total_claims": len(claims),
            "supported_claims": supported_count
        }

    async def _extract_claims(self, answer: str) -> list[str]:
        """
        Extract factual claims from the answer
        """
        prompt = f"""
        Extract all factual claims from this answer.
        Each claim should be a simple, verifiable sentence.

        Answer: {answer}

        Claims (one per line):
        """
        response = await self.llm.generate(prompt, temperature=0)
        return [line.strip() for line in response.split("\n") if line.strip()]

    async def _verify_claim(self, claim: str, context: str) -> bool:
        """
        Verify if a claim is supported by the context
        """
        prompt = f"""
        Is the following claim explicitly supported by the context?

        Context: {context}

        Claim: {claim}

        Answer only "YES" or "NO".
        """
        response = await self.llm.generate(prompt, temperature=0)
        return response.strip().upper() == "YES"
```

Answer Relevancy

Measures if the answer actually addresses the question asked.

```python
import numpy as np

class RelevancyEvaluator:
    def __init__(self, llm, embedding_model):
        self.llm = llm
        self.embedder = embedding_model

    async def evaluate(self, question: str, answer: str) -> dict:
        """
        Evaluate answer relevancy to the question
        """
        # Method 1: Generate questions from the answer
        generated_questions = await self._generate_questions(answer)

        # Method 2: Calculate similarity with original question
        similarities = []
        question_embedding = self.embedder.encode(question)
        for gen_q in generated_questions:
            gen_embedding = self.embedder.encode(gen_q)
            sim = self._cosine_similarity(question_embedding, gen_embedding)
            similarities.append(sim)

        score = sum(similarities) / len(similarities) if similarities else 0

        return {
            "score": score,
            "generated_questions": generated_questions,
            "similarities": similarities
        }

    async def _generate_questions(self, answer: str, n: int = 3) -> list[str]:
        """
        Generate questions that the answer could respond to
        """
        prompt = f"""
        Generate {n} different questions that this answer could respond to.

        Answer: {answer}

        Questions (one per line):
        """
        response = await self.llm.generate(prompt, temperature=0.5)
        return [line.strip() for line in response.split("\n") if line.strip()][:n]

    def _cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
```

Context Recall

Measures if the retrieved context contains the information needed to answer.

```python
class ContextRecallEvaluator:
    def __init__(self, llm):
        self.llm = llm

    async def evaluate(self, question: str, context: str, ground_truth: str) -> dict:
        """
        Evaluate if context contains information from expected answer
        """
        # Extract facts from expected answer
        gt_facts = await self._extract_facts(ground_truth)

        # Check presence of each fact in context
        attributions = []
        for fact in gt_facts:
            present = await self._check_presence(fact, context)
            attributions.append({
                "fact": fact,
                "present_in_context": present
            })

        # Score = proportion of facts present
        present_count = sum(1 for a in attributions if a["present_in_context"])
        score = present_count / len(gt_facts) if gt_facts else 1.0

        return {
            "score": score,
            "attributions": attributions,
            "total_facts": len(gt_facts),
            "facts_in_context": present_count
        }

    async def _extract_facts(self, ground_truth: str) -> list[str]:
        """
        Extract atomic facts from the expected answer
        (same LLM-prompting pattern as claim extraction above)
        """
        prompt = f"""
        Extract all factual statements from this answer.
        Each statement should be a simple, verifiable sentence.

        Answer: {ground_truth}

        Statements (one per line):
        """
        response = await self.llm.generate(prompt, temperature=0)
        return [line.strip() for line in response.split("\n") if line.strip()]

    async def _check_presence(self, fact: str, context: str) -> bool:
        """
        Check if a fact is stated in the context
        """
        prompt = f"""
        Is the following fact stated in the context?

        Context: {context}

        Fact: {fact}

        Answer only "YES" or "NO".
        """
        response = await self.llm.generate(prompt, temperature=0)
        return response.strip().upper() == "YES"
```

RAGAS Framework

RAGAS (Retrieval Augmented Generation Assessment) is one of the most widely used open-source frameworks for RAG evaluation.

Installation and Configuration

```python
# pip install ragas
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_recall,
    context_precision,
    answer_correctness
)
from datasets import Dataset

# Prepare evaluation data
eval_data = {
    "question": [
        "What is the return policy?",
        "How do I contact support?",
    ],
    "answer": [
        "You have 30 days to return an unused product.",
        "You can contact support by email at [email protected].",
    ],
    "contexts": [
        ["The return policy allows customers to return any unused product within 30 days."],
        ["Support is reachable by email at [email protected] or by phone at 555-123-4567."],
    ],
    "ground_truth": [
        "Customers can return unused products within 30 days.",
        "Support is available via email ([email protected]) and phone.",
    ]
}

dataset = Dataset.from_dict(eval_data)

# Evaluate
results = evaluate(
    dataset,
    metrics=[
        faithfulness,
        answer_relevancy,
        context_recall,
        context_precision,
    ]
)
print(results)
```

Interpreting RAGAS Results

| Metric | Ideal Score | Acceptable Threshold | Action if Low |
|--------|-------------|----------------------|---------------|
| Faithfulness | > 0.9 | > 0.7 | Improve anti-hallucination prompt |
| Answer Relevancy | > 0.85 | > 0.7 | Refine generation prompt |
| Context Recall | > 0.8 | > 0.6 | Improve retrieval, expand sources |
| Context Precision | > 0.8 | > 0.6 | Improve ranking, filter noise |
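These acceptable thresholds can be enforced directly on a set of scores; a minimal sketch, assuming a plain metric-name-to-score mapping (the metric keys follow RAGAS naming, the helper name is ours):

```python
# Acceptable thresholds from the table above
THRESHOLDS = {
    "faithfulness": 0.7,
    "answer_relevancy": 0.7,
    "context_recall": 0.6,
    "context_precision": 0.6,
}

def failing_metrics(scores: dict[str, float]) -> list[str]:
    """Return the metric names whose score falls below the acceptable threshold."""
    return [name for name, threshold in THRESHOLDS.items()
            if scores.get(name, 0.0) < threshold]

scores = {"faithfulness": 0.92, "answer_relevancy": 0.65,
          "context_recall": 0.81, "context_precision": 0.55}
print(failing_metrics(scores))  # ['answer_relevancy', 'context_precision']
```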

Creating an Evaluation Dataset

Automatic Question Generation

```python
class EvalDatasetGenerator:
    def __init__(self, llm):
        self.llm = llm

    async def generate_from_documents(
        self,
        documents: list[dict],
        questions_per_doc: int = 3
    ) -> list[dict]:
        """
        Generate an evaluation dataset from documents
        """
        eval_data = []
        for doc in documents:
            # Generate questions
            questions = await self._generate_questions(doc["content"], questions_per_doc)
            for q in questions:
                # Generate expected answer
                ground_truth = await self._generate_answer(q, doc["content"])
                eval_data.append({
                    "question": q,
                    "ground_truth": ground_truth,
                    "source_doc_id": doc["id"],
                    "source_content": doc["content"]
                })
        return eval_data

    async def _generate_questions(self, content: str, n: int) -> list[str]:
        """
        Generate questions from content
        """
        prompt = f"""
        Generate {n} diverse and realistic questions that a user might ask
        and that this document can answer.

        Document: {content[:2000]}

        Questions (one per line, varied in complexity):
        """
        response = await self.llm.generate(prompt, temperature=0.7)
        return [line.strip().lstrip("0123456789.- ")
                for line in response.split("\n") if line.strip()][:n]

    async def _generate_answer(self, question: str, content: str) -> str:
        """
        Generate expected answer based on document
        """
        prompt = f"""
        Answer this question using only information from the document.

        Document: {content[:2000]}

        Question: {question}

        Concise and factual answer:
        """
        return await self.llm.generate(prompt, temperature=0)
```

Human Validation

```python
import uuid

class HumanValidation:
    def __init__(self, db):
        self.db = db

    async def create_validation_batch(
        self,
        eval_samples: list[dict],
        annotators: list[str]
    ) -> str:
        """
        Create a validation batch for annotators
        """
        batch_id = str(uuid.uuid4())
        for sample in eval_samples:
            await self.db.insert("validation_tasks", {
                "batch_id": batch_id,
                "sample_id": sample["id"],
                "question": sample["question"],
                "rag_answer": sample["answer"],
                "context": sample["context"],
                "status": "pending",
                "assigned_to": None
            })
        # Assign to annotators
        await self._assign_tasks(batch_id, annotators)
        return batch_id

    async def collect_annotations(self, batch_id: str) -> dict:
        """
        Collect annotations and calculate inter-annotator agreement
        """
        tasks = await self.db.find("validation_tasks", {"batch_id": batch_id})

        # Group annotations by sample
        annotations = {}
        for task in tasks:
            sample_id = task["sample_id"]
            if sample_id not in annotations:
                annotations[sample_id] = []
            if task.get("annotation"):
                annotations[sample_id].append(task["annotation"])

        # Check agreement (e.g. Cohen's Kappa) on doubly-annotated samples
        agreement_scores = []
        for sample_id, annots in annotations.items():
            if len(annots) >= 2:
                agreement = self._calculate_agreement(annots)
                agreement_scores.append(agreement)

        return {
            "batch_id": batch_id,
            "total_samples": len(annotations),
            "completed": len([a for a in annotations.values() if len(a) >= 2]),
            "average_agreement": (sum(agreement_scores) / len(agreement_scores)
                                  if agreement_scores else 0)
        }
```
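The `_calculate_agreement` helper above is left undefined; for two annotators with categorical labels, Cohen's Kappa (mentioned in the code) is the standard choice. A self-contained sketch, assuming the annotations are parallel label lists (the function name is ours):

```python
from collections import Counter

def cohens_kappa(labels_a: list[str], labels_b: list[str]) -> float:
    """Cohen's Kappa: agreement between two annotators, corrected for chance."""
    assert labels_a and len(labels_a) == len(labels_b)
    n = len(labels_a)
    # Observed agreement: fraction of items both annotators labeled identically
    po = sum(a == b for a, b in zip(labels_a, labels_b)) / n
    # Expected agreement if both annotators labeled independently at random
    counts_a, counts_b = Counter(labels_a), Counter(labels_b)
    pe = sum(counts_a[label] * counts_b[label] for label in counts_a) / (n * n)
    if pe == 1.0:
        return 1.0  # Both annotators constant and identical
    return (po - pe) / (1 - pe)

a = ["good", "good", "bad", "good"]
b = ["good", "bad", "bad", "good"]
print(cohens_kappa(a, b))  # 0.5
```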

Automated Evaluation Pipeline

CI/CD Integration

```python
import json
from datetime import datetime

import numpy as np

class RAGEvaluationPipeline:
    def __init__(self, rag_system, evaluator, dataset_path: str):
        self.rag = rag_system
        self.evaluator = evaluator
        self.dataset = self._load_dataset(dataset_path)

    def _load_dataset(self, path: str) -> list[dict]:
        with open(path) as f:
            return json.load(f)

    async def run_evaluation(self, version: str = None) -> dict:
        """
        Run a complete evaluation
        """
        results = {
            "version": version or datetime.now().isoformat(),
            "timestamp": datetime.now().isoformat(),
            "samples": [],
            "metrics": {}
        }

        # Evaluate each sample
        for sample in self.dataset:
            # Execute RAG
            rag_result = await self.rag.query(sample["question"])

            # Evaluate
            sample_result = {
                "question": sample["question"],
                "ground_truth": sample["ground_truth"],
                "rag_answer": rag_result["answer"],
                "retrieved_docs": rag_result["sources"],
                "scores": {}
            }

            # Faithfulness
            faithfulness = await self.evaluator.faithfulness(
                sample["question"],
                rag_result["context"],
                rag_result["answer"]
            )
            sample_result["scores"]["faithfulness"] = faithfulness["score"]

            # Relevancy
            relevancy = await self.evaluator.relevancy(
                sample["question"],
                rag_result["answer"]
            )
            sample_result["scores"]["relevancy"] = relevancy["score"]

            results["samples"].append(sample_result)

        # Aggregate metrics
        results["metrics"] = self._aggregate_metrics(results["samples"])
        return results

    def _aggregate_metrics(self, samples: list[dict]) -> dict:
        """
        Aggregate metrics across all samples
        """
        metrics = {}
        for metric_name in ["faithfulness", "relevancy"]:
            scores = [s["scores"].get(metric_name, 0) for s in samples]
            metrics[metric_name] = {
                "mean": sum(scores) / len(scores),
                "min": min(scores),
                "max": max(scores),
                "std": np.std(scores)
            }
        return metrics

    async def check_thresholds(self, results: dict, thresholds: dict) -> bool:
        """
        Check if results pass defined thresholds
        """
        passed = True
        for metric, threshold in thresholds.items():
            actual = results["metrics"].get(metric, {}).get("mean", 0)
            if actual < threshold:
                print(f"FAIL: {metric} = {actual:.3f} < {threshold}")
                passed = False
            else:
                print(f"PASS: {metric} = {actual:.3f} >= {threshold}")
        return passed
```
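The pipeline's `_load_dataset` expects a JSON file; a minimal example of the assumed shape (field names are taken from the pipeline code, the file path is illustrative):

```python
import json
import os
import tempfile

# Minimal evaluation dataset in the shape _load_dataset expects:
# a JSON list of objects with "question" and "ground_truth" keys
dataset = [
    {"question": "What is the return policy?",
     "ground_truth": "Customers can return unused products within 30 days."},
    {"question": "How do I contact support?",
     "ground_truth": "Support is available via email and phone."},
]

path = os.path.join(tempfile.mkdtemp(), "eval_dataset.json")
with open(path, "w") as f:
    json.dump(dataset, f, indent=2)

# Round-trip the same way the pipeline loads it
with open(path) as f:
    loaded = json.load(f)
print(len(loaded))  # 2
```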

GitHub Actions Configuration

```yaml
# .github/workflows/rag-evaluation.yml
name: RAG Evaluation

on:
  pull_request:
    paths:
      - 'rag/**'
      - 'prompts/**'
  schedule:
    - cron: '0 6 * * *'  # Daily at 6am

jobs:
  evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements-eval.txt

      - name: Run evaluation
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: python scripts/run_evaluation.py

      - name: Check thresholds
        run: python scripts/check_thresholds.py --results eval_results.json

      - name: Upload results
        uses: actions/upload-artifact@v3
        with:
          name: eval-results
          path: eval_results.json

      - name: Comment PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const results = require('./eval_results.json');
            const body = `## RAG Evaluation Results

            | Metric | Score | Threshold | Status |
            |--------|-------|-----------|--------|
            | Faithfulness | ${results.metrics.faithfulness.mean.toFixed(3)} | 0.80 | ${results.metrics.faithfulness.mean >= 0.8 ? 'Pass' : 'Fail'} |
            | Relevancy | ${results.metrics.relevancy.mean.toFixed(3)} | 0.75 | ${results.metrics.relevancy.mean >= 0.75 ? 'Pass' : 'Fail'} |
            `;
            github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
              body: body
            });
```
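The workflow invokes `scripts/check_thresholds.py`; a minimal sketch of what that script's core might look like (the file layout is an assumption, and the threshold values are taken from the workflow's PR comment):

```python
import json
import tempfile

# Thresholds enforced in CI (same values as the workflow's PR comment table)
THRESHOLDS = {"faithfulness": 0.80, "relevancy": 0.75}

def check_thresholds(results_path: str) -> int:
    """Return 0 if every mean score meets its threshold, 1 otherwise (CI exit code)."""
    with open(results_path) as f:
        results = json.load(f)
    exit_code = 0
    for metric, threshold in THRESHOLDS.items():
        mean = results["metrics"][metric]["mean"]
        if mean < threshold:
            print(f"FAIL: {metric} = {mean:.3f} < {threshold}")
            exit_code = 1
        else:
            print(f"PASS: {metric} = {mean:.3f} >= {threshold}")
    return exit_code

# Demo with a fake results file
demo = {"metrics": {"faithfulness": {"mean": 0.91}, "relevancy": {"mean": 0.70}}}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(demo, f)
    path = f.name
print(check_thresholds(path))  # relevancy 0.70 < 0.75 -> 1
```

In the real script, the returned code would be passed to `sys.exit()` so a failing metric fails the CI job.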

Production Monitoring

```python
from datetime import datetime

class ProductionMonitor:
    def __init__(self, analytics_db, alert_service):
        self.db = analytics_db
        self.alerts = alert_service

    async def log_interaction(self, interaction: dict):
        """
        Log a RAG interaction for monitoring
        """
        await self.db.insert("rag_interactions", {
            "timestamp": datetime.now(),
            "query": interaction["query"],
            "answer": interaction["answer"],
            "sources": interaction["sources"],
            "latency_ms": interaction["latency_ms"],
            "user_id": interaction.get("user_id"),
            "session_id": interaction.get("session_id")
        })
        # Check alerts
        await self._check_alerts(interaction)

    async def _check_alerts(self, interaction: dict):
        """
        Check alert conditions
        """
        # High latency
        if interaction["latency_ms"] > 5000:
            await self.alerts.send("HIGH_LATENCY", {
                "latency_ms": interaction["latency_ms"],
                "query": interaction["query"][:100]
            })
        # No sources found
        if not interaction["sources"]:
            await self.alerts.send("NO_SOURCES", {
                "query": interaction["query"]
            })

    async def get_daily_metrics(self) -> dict:
        """
        Daily metrics for dashboard
        """
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        return {
            "total_queries": await self.db.count(
                "rag_interactions", {"timestamp": {"$gte": today}}
            ),
            "avg_latency_ms": await self.db.avg(
                "rag_interactions", "latency_ms", {"timestamp": {"$gte": today}}
            ),
            "zero_result_rate": await self._zero_result_rate(today),
            "unique_users": await self.db.distinct_count(
                "rag_interactions", "user_id", {"timestamp": {"$gte": today}}
            )
        }
```

Evaluation Checklist

Before Deployment

  • Evaluation dataset of 100+ representative questions
  • Ground truth validated by domain experts
  • Thresholds defined for each metric
  • CI/CD pipeline configured

Ongoing

  • Latency metrics monitoring
  • Degradation alerts
  • User feedback collected and analyzed
  • Weekly evaluations on new cases


Simplified Evaluation with Ailog

Setting up a robust RAG evaluation pipeline takes time and expertise. With Ailog, benefit from built-in evaluation tools:

  • Quality dashboard with real-time metrics
  • Automatic evaluation on each deployment
  • Integrated feedback loop for continuous improvement
  • Alerts on performance degradation
  • History of evaluations for long-term tracking

Discover Ailog and measure your RAG quality easily.

Tags

RAG · evaluation · metrics · RAGAS · quality · benchmarking
