Evaluating a RAG System: Metrics and Methodologies
Complete guide to measuring your RAG performance: faithfulness, relevancy, recall, and automated evaluation frameworks.
A RAG system may appear to work correctly on the surface, but how do you know if it truly meets expectations? Rigorous evaluation is the key to moving from a prototype to a production-quality product. This guide presents the metrics, frameworks, and methodologies to measure and improve your RAG.
Why Evaluation is Critical
The Problem with Subjective Evaluations
Without objective metrics, RAG evaluation often comes down to:
- "It looks correct" (confirmation bias)
- A few manual tests on favorable cases
- Late user feedback in production
This approach masks serious problems:
- Subtle but frequent hallucinations
- Off-topic responses for certain question categories
- Progressive quality degradation after updates
Evaluation Dimensions
A RAG system must be evaluated on multiple axes:
| Dimension | Key Question | Example Problem |
|---|---|---|
| Retrieval | Are the right documents found? | Relevant documents not retrieved |
| Generation | Is the answer faithful to sources? | Hallucinations, contradictions |
| End-to-end | Does the answer address the question? | Correct but off-topic answer |
| Latency | Is response time acceptable? | Timeout, user frustration |
| Robustness | Does the system handle edge cases? | Crash on malformed queries |
Retrieval Metrics
Recall@k
Measures the proportion of relevant documents found among the top k results.
```python
def recall_at_k(retrieved_ids: list[str], relevant_ids: list[str], k: int) -> float:
    """
    Recall@k: Proportion of relevant documents retrieved

    Args:
        retrieved_ids: IDs of retrieved documents (ordered by score)
        relevant_ids: IDs of actually relevant documents
        k: Number of results to consider

    Returns:
        Score between 0 and 1
    """
    if not relevant_ids:
        return 0.0
    retrieved_k = set(retrieved_ids[:k])
    relevant_set = set(relevant_ids)
    return len(retrieved_k & relevant_set) / len(relevant_set)

# Example
retrieved = ["doc1", "doc3", "doc5", "doc2", "doc7"]
relevant = ["doc1", "doc2", "doc4"]
print(f"Recall@3: {recall_at_k(retrieved, relevant, 3)}")  # 0.33 (1/3)
print(f"Recall@5: {recall_at_k(retrieved, relevant, 5)}")  # 0.67 (2/3)
```
Interpretation:
- Recall@5 of 0.8+: Excellent, most relevant documents are retrieved
- Recall@5 of 0.5-0.8: Acceptable, but improvement possible
- Recall@5 < 0.5: Problematic, many relevant documents missed
Precision@k
Measures the proportion of relevant documents among the k retrieved.
```python
def precision_at_k(retrieved_ids: list[str], relevant_ids: list[str], k: int) -> float:
    """
    Precision@k: Proportion of retrieved documents that are relevant
    """
    if k == 0:
        return 0.0
    retrieved_k = set(retrieved_ids[:k])
    relevant_set = set(relevant_ids)
    return len(retrieved_k & relevant_set) / k

# Example (same lists as in the Recall@k example)
retrieved = ["doc1", "doc3", "doc5", "doc2", "doc7"]
relevant = ["doc1", "doc2", "doc4"]
print(f"Precision@3: {precision_at_k(retrieved, relevant, 3)}")  # 0.33 (1/3)
print(f"Precision@5: {precision_at_k(retrieved, relevant, 5)}")  # 0.40 (2/5)
```
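Precision and recall pull in opposite directions as k grows: a larger k retrieves more relevant documents (higher recall) but dilutes the result list (lower precision). When tuning k, it can help to also track their harmonic mean, F1@k, a standard combination of the two metrics just defined (not part of this guide's core metric set):

```python
def f1_at_k(retrieved_ids: list[str], relevant_ids: list[str], k: int) -> float:
    """Harmonic mean of Precision@k and Recall@k."""
    if not relevant_ids or k == 0:
        return 0.0
    retrieved_k = set(retrieved_ids[:k])
    hits = len(retrieved_k & set(relevant_ids))
    precision = hits / k
    recall = hits / len(relevant_ids)
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

# Same lists as the examples above: precision@5 = 0.40, recall@5 = 0.67
retrieved = ["doc1", "doc3", "doc5", "doc2", "doc7"]
relevant = ["doc1", "doc2", "doc4"]
print(f"F1@5: {f1_at_k(retrieved, relevant, 5):.3f}")  # F1@5: 0.500
```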
MRR (Mean Reciprocal Rank)
Measures the average reciprocal rank of the first relevant document across queries.
```python
def mrr(queries_results: list[tuple[list[str], list[str]]]) -> float:
    """
    Mean Reciprocal Rank

    Args:
        queries_results: List of (retrieved_documents, relevant_documents)

    Returns:
        MRR score between 0 and 1
    """
    reciprocal_ranks = []
    for retrieved, relevant in queries_results:
        relevant_set = set(relevant)
        for i, doc_id in enumerate(retrieved):
            if doc_id in relevant_set:
                reciprocal_ranks.append(1 / (i + 1))
                break
        else:
            reciprocal_ranks.append(0)
    return sum(reciprocal_ranks) / len(reciprocal_ranks) if reciprocal_ranks else 0

# Example: 3 queries
results = [
    (["doc1", "doc2", "doc3"], ["doc1"]),  # First = relevant -> 1/1
    (["doc4", "doc1", "doc2"], ["doc1"]),  # Second = relevant -> 1/2
    (["doc5", "doc6", "doc7"], ["doc8"]),  # None relevant -> 0
]
print(f"MRR: {mrr(results)}")  # (1 + 0.5 + 0) / 3 = 0.5
```
NDCG (Normalized Discounted Cumulative Gain)
Takes into account order AND graded relevance scores.
```python
import numpy as np

def dcg_at_k(relevances: list[float], k: int) -> float:
    """
    Discounted Cumulative Gain
    """
    relevances = np.array(relevances[:k])
    if len(relevances) == 0:
        return 0.0
    # Log base 2, 1-indexed position
    discounts = np.log2(np.arange(2, len(relevances) + 2))
    return np.sum(relevances / discounts)

def ndcg_at_k(relevances: list[float], k: int) -> float:
    """
    Normalized DCG: Compare to ideal DCG
    """
    dcg = dcg_at_k(relevances, k)
    # Ideal DCG: relevances sorted descending
    ideal_relevances = sorted(relevances, reverse=True)
    idcg = dcg_at_k(ideal_relevances, k)
    return dcg / idcg if idcg > 0 else 0.0

# Example with graded relevance scores (0=not relevant, 1=somewhat, 2=very)
relevances = [2, 0, 1, 1, 0]  # Doc 1 very relevant, doc 2 not, etc.
print(f"NDCG@5: {ndcg_at_k(relevances, 5):.3f}")
```
Generation Metrics
Faithfulness
Measures whether the answer is faithful to the provided context, i.e. free of hallucinations.
```python
class FaithfulnessEvaluator:
    def __init__(self, llm):
        self.llm = llm

    async def evaluate(
        self,
        question: str,
        context: str,
        answer: str
    ) -> dict:
        """
        Evaluate answer faithfulness to context
        """
        # Step 1: Extract claims from the answer
        claims = await self._extract_claims(answer)

        # Step 2: Verify each claim against the context
        verification_results = []
        for claim in claims:
            is_supported = await self._verify_claim(claim, context)
            verification_results.append({
                "claim": claim,
                "supported": is_supported
            })

        # Calculate score
        supported_count = sum(1 for r in verification_results if r["supported"])
        score = supported_count / len(claims) if claims else 1.0

        return {
            "score": score,
            "claims": verification_results,
            "total_claims": len(claims),
            "supported_claims": supported_count
        }

    async def _extract_claims(self, answer: str) -> list[str]:
        """
        Extract factual claims from the answer
        """
        prompt = f"""
Extract all factual claims from this answer.
Each claim should be a simple, verifiable sentence.

Answer: {answer}

Claims (one per line):
"""
        response = await self.llm.generate(prompt, temperature=0)
        return [line.strip() for line in response.split("\n") if line.strip()]

    async def _verify_claim(self, claim: str, context: str) -> bool:
        """
        Verify if a claim is supported by the context
        """
        prompt = f"""
Is the following claim explicitly supported by the context?

Context: {context}

Claim: {claim}

Answer only "YES" or "NO".
"""
        response = await self.llm.generate(prompt, temperature=0)
        return response.strip().upper() == "YES"
```
Answer Relevancy
Measures whether the answer actually addresses the question asked.
```python
import numpy as np

class RelevancyEvaluator:
    def __init__(self, llm, embedding_model):
        self.llm = llm
        self.embedder = embedding_model

    async def evaluate(
        self,
        question: str,
        answer: str
    ) -> dict:
        """
        Evaluate answer relevancy to the question
        """
        # Method 1: Generate questions from the answer
        generated_questions = await self._generate_questions(answer)

        # Method 2: Calculate similarity with original question
        similarities = []
        question_embedding = self.embedder.encode(question)

        for gen_q in generated_questions:
            gen_embedding = self.embedder.encode(gen_q)
            sim = self._cosine_similarity(question_embedding, gen_embedding)
            similarities.append(sim)

        score = sum(similarities) / len(similarities) if similarities else 0

        return {
            "score": score,
            "generated_questions": generated_questions,
            "similarities": similarities
        }

    async def _generate_questions(self, answer: str, n: int = 3) -> list[str]:
        """
        Generate questions that the answer could respond to
        """
        prompt = f"""
Generate {n} different questions that this answer could respond to.

Answer: {answer}

Questions (one per line):
"""
        response = await self.llm.generate(prompt, temperature=0.5)
        return [line.strip() for line in response.split("\n") if line.strip()][:n]

    def _cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
```
Context Recall
Measures whether the retrieved context contains the information needed to answer.
```python
class ContextRecallEvaluator:
    def __init__(self, llm):
        self.llm = llm

    async def evaluate(
        self,
        question: str,
        context: str,
        ground_truth: str
    ) -> dict:
        """
        Evaluate if context contains information from expected answer
        """
        # Extract facts from expected answer
        gt_facts = await self._extract_facts(ground_truth)

        # Check presence of each fact in context
        attributions = []
        for fact in gt_facts:
            present = await self._check_presence(fact, context)
            attributions.append({
                "fact": fact,
                "present_in_context": present
            })

        # Score = proportion of facts present
        present_count = sum(1 for a in attributions if a["present_in_context"])
        score = present_count / len(gt_facts) if gt_facts else 1.0

        return {
            "score": score,
            "attributions": attributions,
            "total_facts": len(gt_facts),
            "facts_in_context": present_count
        }

    async def _extract_facts(self, ground_truth: str) -> list[str]:
        """
        Extract atomic facts from the expected answer
        (same pattern as FaithfulnessEvaluator._extract_claims)
        """
        prompt = f"""
Extract all factual statements from this answer.
Each fact should be a simple, verifiable sentence.

Answer: {ground_truth}

Facts (one per line):
"""
        response = await self.llm.generate(prompt, temperature=0)
        return [line.strip() for line in response.split("\n") if line.strip()]

    async def _check_presence(self, fact: str, context: str) -> bool:
        """
        Check if a fact can be attributed to the context
        """
        prompt = f"""
Can the following fact be attributed to the context?

Context: {context}

Fact: {fact}

Answer only "YES" or "NO".
"""
        response = await self.llm.generate(prompt, temperature=0)
        return response.strip().upper() == "YES"
```
RAGAS Framework
RAGAS (Retrieval Augmented Generation Assessment) is the reference framework for RAG evaluation.
Installation and Configuration
```python
# pip install ragas
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_recall,
    context_precision,
    answer_correctness
)
from datasets import Dataset

# Prepare evaluation data
eval_data = {
    "question": [
        "What is the return policy?",
        "How do I contact support?",
    ],
    "answer": [
        "You have 30 days to return an unused product.",
        "You can contact support by email at [email protected].",
    ],
    "contexts": [
        ["The return policy allows customers to return any unused product within 30 days."],
        ["Support is reachable by email at [email protected] or by phone at 555-123-4567."],
    ],
    "ground_truth": [
        "Customers can return unused products within 30 days.",
        "Support is available via email ([email protected]) and phone.",
    ]
}

dataset = Dataset.from_dict(eval_data)

# Evaluate
results = evaluate(
    dataset,
    metrics=[
        faithfulness,
        answer_relevancy,
        context_recall,
        context_precision,
    ]
)

print(results)
```
Interpreting RAGAS Results
| Metric | Ideal Score | Acceptable Threshold | Action if Low |
|---|---|---|---|
| Faithfulness | > 0.9 | > 0.7 | Improve anti-hallucination prompt |
| Answer Relevancy | > 0.85 | > 0.7 | Refine generation prompt |
| Context Recall | > 0.8 | > 0.6 | Improve retrieval, expand sources |
| Context Precision | > 0.8 | > 0.6 | Improve ranking, filter noise |
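The table above can be codified directly into a gate that reports which remediation to attempt. A small sketch: the threshold values and actions are taken from the table, and the `results` argument is assumed to be a plain mapping of metric name to score, as produced by converting RAGAS output to a dict:

```python
THRESHOLDS = {
    # metric: (acceptable threshold, action if low)
    "faithfulness":      (0.7, "Improve anti-hallucination prompt"),
    "answer_relevancy":  (0.7, "Refine generation prompt"),
    "context_recall":    (0.6, "Improve retrieval, expand sources"),
    "context_precision": (0.6, "Improve ranking, filter noise"),
}

def gate(results: dict[str, float]) -> list[str]:
    """Return the list of suggested actions for metrics below threshold."""
    actions = []
    for metric, (threshold, action) in THRESHOLDS.items():
        score = results.get(metric, 0.0)
        if score < threshold:
            actions.append(f"{metric}={score:.2f} < {threshold}: {action}")
    return actions

issues = gate({
    "faithfulness": 0.92,
    "answer_relevancy": 0.65,
    "context_recall": 0.81,
    "context_precision": 0.74,
})
print(issues)  # one issue: answer_relevancy below 0.7
```

Wiring this into CI (as in the pipeline later in this guide) turns the table from advice into an enforced quality bar.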
Creating an Evaluation Dataset
Automatic Question Generation
```python
class EvalDatasetGenerator:
    def __init__(self, llm):
        self.llm = llm

    async def generate_from_documents(
        self,
        documents: list[dict],
        questions_per_doc: int = 3
    ) -> list[dict]:
        """
        Generate an evaluation dataset from documents
        """
        eval_data = []

        for doc in documents:
            # Generate questions
            questions = await self._generate_questions(doc["content"], questions_per_doc)

            for q in questions:
                # Generate expected answer
                ground_truth = await self._generate_answer(q, doc["content"])

                eval_data.append({
                    "question": q,
                    "ground_truth": ground_truth,
                    "source_doc_id": doc["id"],
                    "source_content": doc["content"]
                })

        return eval_data

    async def _generate_questions(self, content: str, n: int) -> list[str]:
        """
        Generate questions from content
        """
        prompt = f"""
Generate {n} diverse and realistic questions that a user might ask
and that this document can answer.

Document: {content[:2000]}

Questions (one per line, varied in complexity):
"""
        response = await self.llm.generate(prompt, temperature=0.7)
        return [line.strip().lstrip("0123456789.- ")
                for line in response.split("\n") if line.strip()][:n]

    async def _generate_answer(self, question: str, content: str) -> str:
        """
        Generate expected answer based on document
        """
        prompt = f"""
Answer this question using only information from the document.

Document: {content[:2000]}

Question: {question}

Concise and factual answer:
"""
        return await self.llm.generate(prompt, temperature=0)
```
Human Validation
```python
import uuid

class HumanValidation:
    def __init__(self, db):
        self.db = db

    async def create_validation_batch(
        self,
        eval_samples: list[dict],
        annotators: list[str]
    ) -> str:
        """
        Create a validation batch for annotators
        """
        batch_id = str(uuid.uuid4())

        for sample in eval_samples:
            await self.db.insert("validation_tasks", {
                "batch_id": batch_id,
                "sample_id": sample["id"],
                "question": sample["question"],
                "rag_answer": sample["answer"],
                "context": sample["context"],
                "status": "pending",
                "assigned_to": None
            })

        # Assign to annotators
        await self._assign_tasks(batch_id, annotators)

        return batch_id

    async def collect_annotations(self, batch_id: str) -> dict:
        """
        Collect annotations and calculate inter-annotator agreement
        """
        tasks = await self.db.find("validation_tasks", {"batch_id": batch_id})

        # Group annotations by sample
        annotations = {}
        for task in tasks:
            sample_id = task["sample_id"]
            if sample_id not in annotations:
                annotations[sample_id] = []
            if task.get("annotation"):
                annotations[sample_id].append(task["annotation"])

        # Check agreement (pairwise exact match here; a full Cohen's Kappa
        # would also correct for chance agreement)
        agreement_scores = []
        for sample_id, annots in annotations.items():
            if len(annots) >= 2:
                agreement = self._calculate_agreement(annots)
                agreement_scores.append(agreement)

        return {
            "batch_id": batch_id,
            "total_samples": len(annotations),
            "completed": len([a for a in annotations.values() if len(a) >= 2]),
            "average_agreement": (sum(agreement_scores) / len(agreement_scores)
                                  if agreement_scores else 0)
        }

    async def _assign_tasks(self, batch_id: str, annotators: list[str]):
        """
        Round-robin assignment sketch (the db.update signature is illustrative)
        """
        tasks = await self.db.find("validation_tasks", {"batch_id": batch_id})
        for i, task in enumerate(tasks):
            task["assigned_to"] = annotators[i % len(annotators)]
            await self.db.update("validation_tasks", task)

    def _calculate_agreement(self, annots: list) -> float:
        """
        Simplified agreement: fraction of annotation pairs that match exactly
        """
        pairs = matches = 0
        for i in range(len(annots)):
            for j in range(i + 1, len(annots)):
                pairs += 1
                matches += annots[i] == annots[j]
        return matches / pairs if pairs else 0.0
```
Automated Evaluation Pipeline
CI/CD Integration
```python
import json
import numpy as np
from datetime import datetime

class RAGEvaluationPipeline:
    def __init__(self, rag_system, evaluator, dataset_path: str):
        self.rag = rag_system
        self.evaluator = evaluator
        self.dataset = self._load_dataset(dataset_path)

    def _load_dataset(self, path: str) -> list[dict]:
        with open(path) as f:
            return json.load(f)

    async def run_evaluation(self, version: str = None) -> dict:
        """
        Run a complete evaluation
        """
        results = {
            "version": version or datetime.now().isoformat(),
            "timestamp": datetime.now().isoformat(),
            "samples": [],
            "metrics": {}
        }

        # Evaluate each sample
        for sample in self.dataset:
            # Execute RAG
            rag_result = await self.rag.query(sample["question"])

            # Evaluate
            sample_result = {
                "question": sample["question"],
                "ground_truth": sample["ground_truth"],
                "rag_answer": rag_result["answer"],
                "retrieved_docs": rag_result["sources"],
                "scores": {}
            }

            # Faithfulness
            faithfulness = await self.evaluator.faithfulness(
                sample["question"],
                rag_result["context"],
                rag_result["answer"]
            )
            sample_result["scores"]["faithfulness"] = faithfulness["score"]

            # Relevancy
            relevancy = await self.evaluator.relevancy(
                sample["question"],
                rag_result["answer"]
            )
            sample_result["scores"]["relevancy"] = relevancy["score"]

            results["samples"].append(sample_result)

        # Aggregate metrics
        results["metrics"] = self._aggregate_metrics(results["samples"])

        return results

    def _aggregate_metrics(self, samples: list[dict]) -> dict:
        """
        Aggregate metrics across all samples
        """
        metrics = {}
        for metric_name in ["faithfulness", "relevancy"]:
            scores = [s["scores"].get(metric_name, 0) for s in samples]
            metrics[metric_name] = {
                "mean": sum(scores) / len(scores),
                "min": min(scores),
                "max": max(scores),
                "std": np.std(scores)
            }
        return metrics

    async def check_thresholds(self, results: dict, thresholds: dict) -> bool:
        """
        Check if results pass defined thresholds
        """
        passed = True
        for metric, threshold in thresholds.items():
            actual = results["metrics"].get(metric, {}).get("mean", 0)
            if actual < threshold:
                print(f"FAIL: {metric} = {actual:.3f} < {threshold}")
                passed = False
            else:
                print(f"PASS: {metric} = {actual:.3f} >= {threshold}")
        return passed
```
GitHub Actions Configuration
```yaml
# .github/workflows/rag-evaluation.yml
name: RAG Evaluation

on:
  pull_request:
    paths:
      - 'rag/**'
      - 'prompts/**'
  schedule:
    - cron: '0 6 * * *'  # Daily at 6am

jobs:
  evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements-eval.txt

      - name: Run evaluation
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: python scripts/run_evaluation.py

      - name: Check thresholds
        run: python scripts/check_thresholds.py --results eval_results.json

      - name: Upload results
        uses: actions/upload-artifact@v3
        with:
          name: eval-results
          path: eval_results.json

      - name: Comment PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const results = require('./eval_results.json');
            const body = `## RAG Evaluation Results

            | Metric | Score | Threshold | Status |
            |--------|-------|-----------|--------|
            | Faithfulness | ${results.metrics.faithfulness.mean.toFixed(3)} | 0.80 | ${results.metrics.faithfulness.mean >= 0.8 ? 'Pass' : 'Fail'} |
            | Relevancy | ${results.metrics.relevancy.mean.toFixed(3)} | 0.75 | ${results.metrics.relevancy.mean >= 0.75 ? 'Pass' : 'Fail'} |
            `;
            github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
              body: body
            });
```
Production Monitoring
```python
from datetime import datetime

class ProductionMonitor:
    def __init__(self, analytics_db, alert_service):
        self.db = analytics_db
        self.alerts = alert_service

    async def log_interaction(self, interaction: dict):
        """
        Log a RAG interaction for monitoring
        """
        await self.db.insert("rag_interactions", {
            "timestamp": datetime.now(),
            "query": interaction["query"],
            "answer": interaction["answer"],
            "sources": interaction["sources"],
            "latency_ms": interaction["latency_ms"],
            "user_id": interaction.get("user_id"),
            "session_id": interaction.get("session_id")
        })

        # Check alerts
        await self._check_alerts(interaction)

    async def _check_alerts(self, interaction: dict):
        """
        Check alert conditions
        """
        # High latency
        if interaction["latency_ms"] > 5000:
            await self.alerts.send("HIGH_LATENCY", {
                "latency_ms": interaction["latency_ms"],
                "query": interaction["query"][:100]
            })

        # No sources found
        if not interaction["sources"]:
            await self.alerts.send("NO_SOURCES", {
                "query": interaction["query"]
            })

    async def get_daily_metrics(self) -> dict:
        """
        Daily metrics for dashboard
        """
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

        return {
            "total_queries": await self.db.count(
                "rag_interactions", {"timestamp": {"$gte": today}}
            ),
            "avg_latency_ms": await self.db.avg(
                "rag_interactions", "latency_ms", {"timestamp": {"$gte": today}}
            ),
            "zero_result_rate": await self._zero_result_rate(today),
            "unique_users": await self.db.distinct_count(
                "rag_interactions", "user_id", {"timestamp": {"$gte": today}}
            )
        }
```
Evaluation Checklist
Before Deployment
- Evaluation dataset of 100+ representative questions
- Ground truth validated by domain experts
- Thresholds defined for each metric
- CI/CD pipeline configured
Ongoing
- Latency metrics monitoring
- Degradation alerts
- User feedback collected and analyzed
- Weekly evaluations on new cases
Learn More
- Introduction to RAG - Understand the fundamentals
- Retrieval Fundamentals - Improve search
- RAG Generation - Optimize responses
Simplified Evaluation with Ailog
Setting up a robust RAG evaluation pipeline takes time and expertise. With Ailog, benefit from built-in evaluation tools:
- Quality dashboard with real-time metrics
- Automatic evaluation on each deployment
- Integrated feedback loop for continuous improvement
- Alerts on performance degradation
- History of evaluations for long-term tracking
Discover Ailog and measure your RAG quality easily.
Related Posts
Reduce RAG Latency: From 2000ms to 200ms
10x faster RAG: parallel retrieval, streaming responses, and architectural optimizations for sub-200ms latency.
RAG Monitoring and Observability
Monitor RAG systems in production: track latency, costs, accuracy, and user satisfaction with metrics and dashboards.
Caching Strategies to Reduce RAG Latency and Cost
Cut costs by 80%: implement semantic caching, embedding caching, and response caching for production RAG.