# Ensemble Retrieval: Combining Multiple Retrievers
Implement ensemble retrieval to combine the strengths of multiple retrievers: voting, stacking, and advanced fusion strategies.
Ensemble retrieval applies the machine learning ensemble principle to search systems: combining predictions from multiple models to achieve better results than any single model. This guide explores how to orchestrate multiple retrievers to maximize retrieval quality.
## Why Ensemble Retrieval?
Each retriever has its blind spots:
| Retriever | Strengths | Weaknesses |
|---|---|---|
| Dense (BGE) | General semantics | Rare terms |
| Dense (E5) | Multilingual | Short queries |
| Sparse (BM25) | Exact matching | Synonyms |
| Sparse (TF-IDF) | Fast | Less precise |
| Knowledge Graph | Relationships | Limited coverage |
An ensemble compensates for each retriever's weaknesses by exploiting their complementary strengths.
### Benchmark: Ensemble vs Single Retriever
| Configuration | NDCG@10 | Recall@10 | Latency |
|---|---|---|---|
| BGE alone | 0.68 | 0.72 | 45ms |
| BM25 alone | 0.61 | 0.68 | 12ms |
| BGE + BM25 | 0.74 | 0.81 | 52ms |
| BGE + E5 + BM25 | 0.77 | 0.84 | 85ms |
The three-retriever ensemble improves NDCG@10 by about 13% over the best single retriever, at roughly twice its latency.
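As a quick sanity check on the table above, the relative figures can be computed directly:

```python
# Relative NDCG@10 gain of the 3-retriever ensemble over BGE alone
ndcg_bge, ndcg_ensemble = 0.68, 0.77
gain = (ndcg_ensemble - ndcg_bge) / ndcg_bge
print(f"NDCG gain: {gain:.1%}")  # ~13% relative improvement

# Latency multiple relative to BGE alone
latency_bge, latency_ensemble = 45, 85
print(f"Latency: {latency_ensemble / latency_bge:.1f}x")  # just under 2x
```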
## Ensemble Strategies
### 1. Voting (Hard Ensemble)

Each retriever "votes" for the documents it returns; the documents with the most votes are kept:
```python
class VotingEnsemble:
    def __init__(self, retrievers: list):
        self.retrievers = retrievers

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Collect votes from each retriever
        all_results = {}
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=top_k * 2)
            for result in results:
                doc_id = result["id"]
                if doc_id not in all_results:
                    all_results[doc_id] = {
                        "content": result["content"],
                        "votes": 0,
                        "retrievers": []
                    }
                all_results[doc_id]["votes"] += 1
                all_results[doc_id]["retrievers"].append(retriever.name)

        # Sort by vote count
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x["votes"],
            reverse=True
        )
        return sorted_results[:top_k]

# Example
ensemble = VotingEnsemble([
    DenseRetriever("bge"),
    DenseRetriever("e5"),
    SparseRetriever("bm25")
])
results = ensemble.search("How to configure OAuth?")
for r in results:
    print(f"Votes: {r['votes']}, Retrievers: {r['retrievers']}")
```
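The voting idea can also be seen in isolation with `collections.Counter` over three hypothetical top-3 lists (document ids are made up):

```python
from collections import Counter

# Hypothetical top-3 lists from three retrievers for one query
rankings = {
    "bge":  ["doc1", "doc2", "doc3"],
    "e5":   ["doc2", "doc1", "doc4"],
    "bm25": ["doc2", "doc5", "doc1"],
}

# Each appearance in a top list counts as one vote
votes = Counter(doc for ranking in rankings.values() for doc in ranking)
top = votes.most_common(2)
print(top)  # doc1 and doc2 each appear in all three lists
```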
### 2. Score Fusion (Soft Ensemble)
Combines normalized scores from each retriever:
```python
import numpy as np

class ScoreFusionEnsemble:
    def __init__(
        self,
        retrievers: list,
        weights: list[float] = None,
        normalization: str = "min_max"  # "min_max", "z_score", or "rank"
    ):
        self.retrievers = retrievers
        self.weights = weights or [1.0] * len(retrievers)
        self.normalization = normalization

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        all_results = {}
        for retriever, weight in zip(self.retrievers, self.weights):
            results = retriever.search(query, top_k=top_k * 2)

            # Normalize scores so they are comparable across retrievers
            scores = [r["score"] for r in results]
            normalized = self._normalize(scores)

            for result, norm_score in zip(results, normalized):
                doc_id = result["id"]
                if doc_id not in all_results:
                    all_results[doc_id] = {
                        "content": result["content"],
                        "scores": {},
                        "weighted_sum": 0
                    }
                all_results[doc_id]["scores"][retriever.name] = norm_score
                all_results[doc_id]["weighted_sum"] += weight * norm_score

        # Sort by weighted score
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x["weighted_sum"],
            reverse=True
        )
        return sorted_results[:top_k]

    def _normalize(self, scores: list[float]) -> list[float]:
        if not scores:
            return []
        if self.normalization == "min_max":
            min_s, max_s = min(scores), max(scores)
            range_s = max_s - min_s if max_s != min_s else 1
            return [(s - min_s) / range_s for s in scores]
        elif self.normalization == "z_score":
            mean = np.mean(scores)
            std = np.std(scores) or 1
            return [(s - mean) / std for s in scores]
        elif self.normalization == "rank":
            # Rank-based score (1 down to 1/n)
            n = len(scores)
            return [(n - i) / n for i in range(n)]
        return scores  # unknown mode: leave scores unchanged

# Example with custom weights
ensemble = ScoreFusionEnsemble(
    retrievers=[dense_bge, dense_e5, sparse_bm25],
    weights=[0.4, 0.3, 0.3],
    normalization="min_max"
)
```
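To see why normalization is needed before summing, here is a quick min-max sketch on illustrative scores: raw BM25 scores are unbounded while cosine similarities live in a narrow band, so adding them directly would let BM25 dominate.

```python
bm25_scores = [12.4, 9.1, 3.2]     # unbounded BM25 scores (illustrative)
dense_scores = [0.83, 0.81, 0.55]  # cosine similarities (illustrative)

def min_max(scores: list[float]) -> list[float]:
    lo, hi = min(scores), max(scores)
    span = (hi - lo) or 1  # avoid division by zero on constant scores
    return [(s - lo) / span for s in scores]

bm25_norm = min_max(bm25_scores)
dense_norm = min_max(dense_scores)
print(bm25_norm)   # both lists now rescaled to [0, 1]
print(dense_norm)
```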
### 3. Reciprocal Rank Fusion (RRF)
Combines rankings without requiring score normalization:
```python
class RRFEnsemble:
    def __init__(
        self,
        retrievers: list,
        k: int = 60,
        weights: list[float] = None
    ):
        self.retrievers = retrievers
        self.k = k
        self.weights = weights or [1.0] * len(retrievers)

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        rrf_scores = {}
        doc_contents = {}

        for retriever, weight in zip(self.retrievers, self.weights):
            results = retriever.search(query, top_k=top_k * 3)
            for rank, result in enumerate(results, start=1):
                doc_id = result["id"]
                doc_contents[doc_id] = result["content"]
                if doc_id not in rrf_scores:
                    rrf_scores[doc_id] = 0
                # Weighted RRF formula
                rrf_scores[doc_id] += weight / (self.k + rank)

        # Build results
        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        return [
            {
                "id": doc_id,
                "content": doc_contents[doc_id],
                "rrf_score": rrf_scores[doc_id]
            }
            for doc_id in sorted_ids[:top_k]
        ]

# Example
rrf_ensemble = RRFEnsemble(
    retrievers=[dense_bge, sparse_bm25],
    k=60,
    weights=[0.6, 0.4]
)
```
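A worked example of the unweighted formula with k = 60 (document ids are made up): a document ranked 1st by both retrievers scores 1/61 + 1/61, beating one ranked 2nd and 3rd.

```python
k = 60
rankings = [
    ["docA", "docB", "docC"],  # e.g. a dense ranking
    ["docA", "docD", "docB"],  # e.g. a BM25 ranking
]

rrf = {}
for ranking in rankings:
    for rank, doc in enumerate(ranking, start=1):
        rrf[doc] = rrf.get(doc, 0) + 1 / (k + rank)

fused = sorted(rrf.items(), key=lambda kv: kv[1], reverse=True)
print(fused)  # docA, ranked first by both, comes out on top
```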
### 4. Stacking with Reranker
Uses a reranking model to combine results:
```python
from sentence_transformers import CrossEncoder

class StackedEnsemble:
    def __init__(
        self,
        retrievers: list,
        reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    ):
        self.retrievers = retrievers
        self.reranker = CrossEncoder(reranker_model)

    def search(self, query: str, top_k: int = 5, rerank_k: int = 20) -> list[dict]:
        # 1. Collect candidates from all retrievers
        candidates = {}
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=rerank_k)
            for result in results:
                doc_id = result["id"]
                if doc_id not in candidates:
                    candidates[doc_id] = result["content"]

        # 2. Rerank all candidates with the cross-encoder
        candidate_list = list(candidates.items())
        pairs = [[query, content] for _, content in candidate_list]
        rerank_scores = self.reranker.predict(pairs)

        # 3. Build final results
        results = [
            {
                "id": doc_id,
                "content": content,
                "rerank_score": float(score)
            }
            for (doc_id, content), score in zip(candidate_list, rerank_scores)
        ]
        return sorted(results, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
```
### 5. Cascade Ensemble

In a cascade, a fast first-stage retriever filters candidates, and a more precise second stage refines them only when needed:
```python
class CascadeEnsemble:
    def __init__(
        self,
        fast_retriever,
        precise_retriever,
        cascade_threshold: float = 0.7
    ):
        self.fast = fast_retriever
        self.precise = precise_retriever
        self.threshold = cascade_threshold

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Stage 1: fast retrieval with a wide net (high recall)
        fast_results = self.fast.search(query, top_k=top_k * 4)

        # Check whether results are confident enough
        max_score = max(r["score"] for r in fast_results) if fast_results else 0
        if max_score >= self.threshold:
            # High confidence: return fast results
            return fast_results[:top_k]

        # Stage 2: precise retrieval restricted to the candidates
        candidate_ids = [r["id"] for r in fast_results]
        precise_results = self.precise.search(
            query,
            top_k=top_k,
            filter_ids=candidate_ids  # search only among candidates
        )
        return precise_results
```
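The stage-1 decision can be isolated into a tiny function and exercised with made-up scores:

```python
def cascade_stage(fast_results: list[dict], threshold: float = 0.7) -> str:
    # Fall through to the precise retriever only when the fast stage
    # is not confident enough
    max_score = max((r["score"] for r in fast_results), default=0)
    return "fast" if max_score >= threshold else "precise"

print(cascade_stage([{"score": 0.91}, {"score": 0.42}]))  # fast path
print(cascade_stage([{"score": 0.55}]))                   # precise path
print(cascade_stage([]))                                  # precise (no results)
```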
## Specialized Ensembles
### Multi-Domain Ensemble
Use different retrievers based on domain:
```python
class MultiDomainEnsemble:
    def __init__(self):
        self.domain_retrievers = {
            "technical": [
                DenseRetriever("codesearch"),
                SparseRetriever("bm25")
            ],
            "general": [
                DenseRetriever("bge"),
                DenseRetriever("e5")
            ],
            "multilingual": [
                DenseRetriever("multilingual-e5"),
                DenseRetriever("mbert")
            ]
        }
        self.domain_classifier = DomainClassifier()

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Detect the query's domain
        domain = self.domain_classifier.predict(query)

        # Select the appropriate retrievers, falling back to "general"
        retrievers = self.domain_retrievers.get(domain, self.domain_retrievers["general"])

        # Run the ensemble over the selected retrievers
        ensemble = RRFEnsemble(retrievers)
        return ensemble.search(query, top_k=top_k)
```
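The `DomainClassifier` above is assumed to exist elsewhere; a keyword-based stand-in is enough to illustrate the routing decision:

```python
# Hypothetical keyword-based domain detector, standing in for a trained
# DomainClassifier. The keyword list is illustrative only.
def detect_domain(query: str) -> str:
    technical = {"api", "oauth", "config", "webhook", "traceback"}
    if any(token in technical for token in query.lower().split()):
        return "technical"
    return "general"

print(detect_domain("oauth token refresh"))  # routed to technical retrievers
print(detect_domain("best hiking trails"))   # routed to general retrievers
```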
### Adaptive Ensemble
Dynamically adjust weights based on query characteristics:
```python
class AdaptiveEnsemble:
    def __init__(self, retrievers: list):
        self.retrievers = retrievers
        self.query_analyzer = QueryAnalyzer()

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Analyze the query
        query_features = self.query_analyzer.analyze(query)

        # Compute adaptive weights
        weights = self._compute_adaptive_weights(query_features)

        # Run a score-fusion ensemble with the adaptive weights
        ensemble = ScoreFusionEnsemble(
            self.retrievers,
            weights=weights
        )
        return ensemble.search(query, top_k=top_k)

    def _compute_adaptive_weights(self, features: dict) -> list[float]:
        weights = []
        for retriever in self.retrievers:
            weight = 1.0

            # Dense retrievers perform well on long queries
            if retriever.type == "dense" and features["length"] > 10:
                weight *= 1.3

            # Sparse retrievers perform well on technical terms
            if retriever.type == "sparse" and features["has_technical_terms"]:
                weight *= 1.4

            # Boost retrievers whose language matches the query's
            if hasattr(retriever, "language") and retriever.language == features["language"]:
                weight *= 1.2

            weights.append(weight)

        # Normalize so the weights sum to 1
        total = sum(weights)
        return [w / total for w in weights]

class QueryAnalyzer:
    def analyze(self, query: str) -> dict:
        return {
            "length": len(query.split()),
            "has_technical_terms": self._detect_technical(query),
            "language": self._detect_language(query),
            "is_question": query.strip().endswith("?")
        }

    def _detect_technical(self, query: str) -> bool:
        technical_patterns = ["api", "config", "error", "oauth", "webhook"]
        return any(p in query.lower() for p in technical_patterns)

    def _detect_language(self, query: str) -> str:
        # Simplified heuristic - use langdetect in production
        french_words = ["comment", "pourquoi", "quel", "est-ce"]
        return "fr" if any(w in query.lower() for w in french_words) else "en"
```
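On a concrete feature set (a long query containing a technical term, routed to two hypothetical dense retrievers and one sparse), the weight computation plays out as follows:

```python
# Features for a 12-word query with a technical term (illustrative values)
features = {"length": 12, "has_technical_terms": True}

raw = []
for rtype in ["dense", "dense", "sparse"]:
    w = 1.0
    if rtype == "dense" and features["length"] > 10:
        w *= 1.3   # long query: boost dense
    if rtype == "sparse" and features["has_technical_terms"]:
        w *= 1.4   # technical term: boost sparse
    raw.append(w)

# Normalize so the weights sum to 1
total = sum(raw)
weights = [w / total for w in raw]
print(weights)  # sparse gets the largest share here
```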
## Performance Optimization
### Parallel Search
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelEnsemble:
    def __init__(self, retrievers: list, max_workers: int = 4):
        self.retrievers = retrievers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def search(self, query: str, top_k: int = 5) -> list[dict]:
        loop = asyncio.get_event_loop()

        # Launch all searches in parallel
        tasks = [
            loop.run_in_executor(
                self.executor,
                retriever.search,
                query,
                top_k * 2
            )
            for retriever in self.retrievers
        ]

        # Wait for all results
        all_results = await asyncio.gather(*tasks)

        # Merge with RRF
        return self._rrf_fusion(all_results, top_k)

    def _rrf_fusion(self, all_results: list, top_k: int, k: int = 60) -> list[dict]:
        rrf_scores = {}
        contents = {}
        for results in all_results:
            for rank, result in enumerate(results, start=1):
                doc_id = result["id"]
                contents[doc_id] = result["content"]
                rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (k + rank)

        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        return [
            {"id": doc_id, "content": contents[doc_id], "score": rrf_scores[doc_id]}
            for doc_id in sorted_ids[:top_k]
        ]
```
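The fan-out pattern can be verified with stub blocking retrievers (the `slow_search` helper below is illustrative, not a real backend): three 0.1s calls should complete in about 0.1s total, not 0.3s.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def slow_search(name: str, delay: float) -> str:
    time.sleep(delay)  # stands in for a blocking retriever call
    return f"{name}: done"

async def fan_out() -> list[str]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as pool:
        tasks = [
            loop.run_in_executor(pool, slow_search, name, 0.1)
            for name in ("bge", "e5", "bm25")
        ]
        # gather preserves task order
        return await asyncio.gather(*tasks)

start = time.perf_counter()
results = asyncio.run(fan_out())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")  # wall time close to a single call
```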
### Smart Caching
```python
import time

class CachedEnsemble:
    def __init__(self, ensemble, cache_ttl: int = 3600):
        self.ensemble = ensemble
        self.cache = {}
        self.cache_ttl = cache_ttl

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        cache_key = f"{query}:{top_k}"

        # Check the cache first
        if cache_key in self.cache:
            cached, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return cached

        # Cache miss or expired entry: execute the search
        results = self.ensemble.search(query, top_k)

        # Cache the results with a timestamp
        self.cache[cache_key] = (results, time.time())
        return results
```
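To make cache hits observable, here is a stripped-down version of the same idea, with a call-counting stub standing in for a real ensemble:

```python
import time

# Call-counting stub: not part of any real API, just makes hits visible
class CountingEnsemble:
    def __init__(self):
        self.calls = 0

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        self.calls += 1
        return [{"id": "doc1", "content": "..."}]

inner = CountingEnsemble()
cache = {}
TTL = 3600

def cached_search(query: str, top_k: int = 5) -> list[dict]:
    key = (query, top_k)
    hit = cache.get(key)
    if hit and time.time() - hit[1] < TTL:
        return hit[0]                      # fresh entry: skip the ensemble
    results = inner.search(query, top_k)
    cache[key] = (results, time.time())    # store with timestamp
    return results

cached_search("oauth setup")
cached_search("oauth setup")  # served from cache
print(inner.calls)            # the ensemble ran only once
```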
## Evaluation and Tuning
```python
import time

import numpy as np
import pandas as pd

class EnsembleEvaluator:
    def evaluate_configurations(
        self,
        queries: list[dict],
        retrievers: list,
        configurations: list[dict]
    ) -> pd.DataFrame:
        """
        Test different ensemble configurations, e.g.:

        configurations = [
            {"type": "rrf", "k": 60},
            {"type": "score_fusion", "weights": [0.5, 0.3, 0.2]},
            {"type": "stacking"},
        ]
        """
        results = []

        for config in configurations:
            # _create_ensemble builds the ensemble described by config
            ensemble = self._create_ensemble(retrievers, config)

            metrics = {
                "ndcg@5": [],
                "recall@5": [],
                "latency_ms": []
            }

            for query_data in queries:
                query = query_data["query"]
                relevant = query_data["relevant_docs"]

                start = time.time()
                search_results = ensemble.search(query, top_k=5)
                latency = (time.time() - start) * 1000

                retrieved_ids = [r["id"] for r in search_results]

                # Per-query metrics (_ndcg and _recall defined elsewhere)
                metrics["ndcg@5"].append(self._ndcg(retrieved_ids, relevant, k=5))
                metrics["recall@5"].append(self._recall(retrieved_ids, relevant, k=5))
                metrics["latency_ms"].append(latency)

            # Average over all queries
            results.append({
                "config": config,
                "ndcg@5": np.mean(metrics["ndcg@5"]),
                "recall@5": np.mean(metrics["recall@5"]),
                "latency_ms": np.mean(metrics["latency_ms"])
            })

        return pd.DataFrame(results).sort_values("ndcg@5", ascending=False)
```
## Next Steps
Ensemble retrieval maximizes quality by combining multiple approaches. To go further:
- Hybrid Retrieval Fusion - Combining dense and sparse
- Query Routing - Route to the right sources
- Retrieval Fundamentals - Overview
## Ensemble Retrieval with Ailog
Ailog automatically orchestrates multiple retrievers:
- Adaptive ensemble based on query type
- Optimized RRF fusion with learned weights
- Parallel search to minimize latency
- Integrated monitoring to optimize configurations
Try for free and get turnkey ensemble retrieval.