
Ensemble Retrieval: Combining Multiple Retrievers

March 10, 2026
Ailog Team

Implement ensemble retrieval to combine the strengths of multiple retrievers. Voting, stacking, and advanced fusion strategies.

Ensemble retrieval applies the machine learning ensemble principle to search systems: combining predictions from multiple models to achieve better results than any single model. This guide explores how to orchestrate multiple retrievers to maximize retrieval quality.

Why Ensemble Retrieval?

Each retriever has its blind spots:

| Retriever | Strengths | Weaknesses |
|---|---|---|
| Dense (BGE) | General semantics | Rare terms |
| Dense (E5) | Multilingual | Short queries |
| Sparse (BM25) | Exact matching | Synonyms |
| Sparse (TF-IDF) | Fast | Less precise |
| Knowledge Graph | Relationships | Limited coverage |

An ensemble compensates for each retriever's weaknesses by exploiting their complementary strengths.

Benchmark: Ensemble vs Single Retriever

| Configuration | NDCG@10 | Recall@10 | Latency |
|---|---|---|---|
| BGE alone | 0.68 | 0.72 | 45 ms |
| BM25 alone | 0.61 | 0.68 | 12 ms |
| BGE + BM25 | 0.74 | 0.81 | 52 ms |
| BGE + E5 + BM25 | 0.77 | 0.84 | 85 ms |

The three-retriever ensemble improves NDCG@10 by 13% over the best single retriever, for roughly twice the latency.

Ensemble Strategies

1. Voting (Hard Ensemble)

Each retriever "votes" for documents; the documents with the most votes are kept:

```python
class VotingEnsemble:
    def __init__(self, retrievers: list):
        self.retrievers = retrievers

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Collect votes from each retriever
        all_results = {}
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=top_k * 2)
            for result in results:
                doc_id = result["id"]
                if doc_id not in all_results:
                    all_results[doc_id] = {
                        "content": result["content"],
                        "votes": 0,
                        "retrievers": []
                    }
                all_results[doc_id]["votes"] += 1
                all_results[doc_id]["retrievers"].append(retriever.name)

        # Sort by vote count
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x["votes"],
            reverse=True
        )
        return sorted_results[:top_k]


# Example
ensemble = VotingEnsemble([
    DenseRetriever("bge"),
    DenseRetriever("e5"),
    SparseRetriever("bm25")
])
results = ensemble.search("How to configure OAuth?")
for r in results:
    print(f"Votes: {r['votes']}, Retrievers: {r['retrievers']}")
```

2. Score Fusion (Soft Ensemble)

Combines normalized scores from each retriever:

```python
import numpy as np

class ScoreFusionEnsemble:
    def __init__(
        self,
        retrievers: list,
        weights: list[float] = None,
        normalization: str = "min_max"  # "min_max", "z_score", "rank"
    ):
        self.retrievers = retrievers
        self.weights = weights or [1.0] * len(retrievers)
        self.normalization = normalization

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        all_results = {}
        for retriever, weight in zip(self.retrievers, self.weights):
            results = retriever.search(query, top_k=top_k * 2)

            # Normalize scores before combining
            scores = [r["score"] for r in results]
            normalized = self._normalize(scores)

            for result, norm_score in zip(results, normalized):
                doc_id = result["id"]
                if doc_id not in all_results:
                    all_results[doc_id] = {
                        "content": result["content"],
                        "scores": {},
                        "weighted_sum": 0
                    }
                all_results[doc_id]["scores"][retriever.name] = norm_score
                all_results[doc_id]["weighted_sum"] += weight * norm_score

        # Sort by weighted score
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x["weighted_sum"],
            reverse=True
        )
        return sorted_results[:top_k]

    def _normalize(self, scores: list[float]) -> list[float]:
        if not scores:
            return []
        if self.normalization == "min_max":
            min_s, max_s = min(scores), max(scores)
            range_s = max_s - min_s if max_s != min_s else 1
            return [(s - min_s) / range_s for s in scores]
        elif self.normalization == "z_score":
            mean = np.mean(scores)
            std = np.std(scores) or 1
            return [(s - mean) / std for s in scores]
        elif self.normalization == "rank":
            # Rank-based score (1.0 for the top result, down to 1/n)
            n = len(scores)
            return [(n - i) / n for i in range(n)]


# Example with custom weights
ensemble = ScoreFusionEnsemble(
    retrievers=[dense_bge, dense_e5, sparse_bm25],
    weights=[0.4, 0.3, 0.3],
    normalization="min_max"
)
```

3. Reciprocal Rank Fusion (RRF)

Combines rankings without requiring score normalization:

```python
class RRFEnsemble:
    def __init__(
        self,
        retrievers: list,
        k: int = 60,
        weights: list[float] = None
    ):
        self.retrievers = retrievers
        self.k = k
        self.weights = weights or [1.0] * len(retrievers)

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        rrf_scores = {}
        doc_contents = {}

        for retriever, weight in zip(self.retrievers, self.weights):
            results = retriever.search(query, top_k=top_k * 3)
            for rank, result in enumerate(results, start=1):
                doc_id = result["id"]
                doc_contents[doc_id] = result["content"]
                if doc_id not in rrf_scores:
                    rrf_scores[doc_id] = 0
                # Weighted RRF formula
                rrf_scores[doc_id] += weight / (self.k + rank)

        # Build results
        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        return [
            {
                "id": doc_id,
                "content": doc_contents[doc_id],
                "rrf_score": rrf_scores[doc_id]
            }
            for doc_id in sorted_ids[:top_k]
        ]


# Example
rrf_ensemble = RRFEnsemble(
    retrievers=[dense_bge, sparse_bm25],
    k=60,
    weights=[0.6, 0.4]
)
```

4. Stacking with Reranker

Uses a reranking model to combine results:

```python
from sentence_transformers import CrossEncoder

class StackedEnsemble:
    def __init__(
        self,
        retrievers: list,
        reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    ):
        self.retrievers = retrievers
        self.reranker = CrossEncoder(reranker_model)

    def search(self, query: str, top_k: int = 5, rerank_k: int = 20) -> list[dict]:
        # 1. Collect candidates from all retrievers
        candidates = {}
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=rerank_k)
            for result in results:
                doc_id = result["id"]
                if doc_id not in candidates:
                    candidates[doc_id] = result["content"]

        # 2. Rerank all candidates
        candidate_list = list(candidates.items())
        pairs = [[query, content] for _, content in candidate_list]
        rerank_scores = self.reranker.predict(pairs)

        # 3. Build final results
        results = [
            {
                "id": doc_id,
                "content": content,
                "rerank_score": float(score)
            }
            for (doc_id, content), score in zip(candidate_list, rerank_scores)
        ]
        return sorted(results, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
```

5. Cascade Ensemble

In the cascade approach, a fast first-stage retriever filters candidates, and a slower, more precise retriever refines them only when needed:

```python
class CascadeEnsemble:
    def __init__(
        self,
        fast_retriever,
        precise_retriever,
        cascade_threshold: float = 0.7
    ):
        self.fast = fast_retriever
        self.precise = precise_retriever
        self.threshold = cascade_threshold

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Stage 1: Fast retrieval (large recall)
        fast_results = self.fast.search(query, top_k=top_k * 4)

        # Check if results are confident enough
        max_score = max(r["score"] for r in fast_results) if fast_results else 0
        if max_score >= self.threshold:
            # High confidence: return fast results
            return fast_results[:top_k]

        # Stage 2: Precise retrieval on candidates
        candidate_ids = [r["id"] for r in fast_results]
        precise_results = self.precise.search(
            query,
            top_k=top_k,
            filter_ids=candidate_ids  # Search only among candidates
        )
        return precise_results
```

Specialized Ensembles

Multi-Domain Ensemble

Use different retrievers based on domain:

```python
class MultiDomainEnsemble:
    def __init__(self):
        self.domain_retrievers = {
            "technical": [
                DenseRetriever("codesearch"),
                SparseRetriever("bm25")
            ],
            "general": [
                DenseRetriever("bge"),
                DenseRetriever("e5")
            ],
            "multilingual": [
                DenseRetriever("multilingual-e5"),
                DenseRetriever("mbert")
            ]
        }
        self.domain_classifier = DomainClassifier()

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Detect domain
        domain = self.domain_classifier.predict(query)

        # Select appropriate retrievers, falling back to "general"
        retrievers = self.domain_retrievers.get(domain, self.domain_retrievers["general"])

        # Ensemble on selected retrievers
        ensemble = RRFEnsemble(retrievers)
        return ensemble.search(query, top_k=top_k)
```

Adaptive Ensemble

Dynamically adjust weights based on query characteristics:

```python
class AdaptiveEnsemble:
    def __init__(self, retrievers: list):
        self.retrievers = retrievers
        self.query_analyzer = QueryAnalyzer()

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Analyze query
        query_features = self.query_analyzer.analyze(query)

        # Compute adaptive weights
        weights = self._compute_adaptive_weights(query_features)

        # Ensemble with adaptive weights
        ensemble = ScoreFusionEnsemble(
            self.retrievers,
            weights=weights
        )
        return ensemble.search(query, top_k=top_k)

    def _compute_adaptive_weights(self, features: dict) -> list[float]:
        weights = []
        for retriever in self.retrievers:
            weight = 1.0

            # Dense performs well on long queries
            if retriever.type == "dense" and features["length"] > 10:
                weight *= 1.3

            # Sparse performs well on technical terms
            if retriever.type == "sparse" and features["has_technical_terms"]:
                weight *= 1.4

            # Boost if language matches
            if hasattr(retriever, "language") and retriever.language == features["language"]:
                weight *= 1.2

            weights.append(weight)

        # Normalize so weights sum to 1
        total = sum(weights)
        return [w / total for w in weights]


class QueryAnalyzer:
    def analyze(self, query: str) -> dict:
        return {
            "length": len(query.split()),
            "has_technical_terms": self._detect_technical(query),
            "language": self._detect_language(query),
            "is_question": query.strip().endswith("?")
        }

    def _detect_technical(self, query: str) -> bool:
        technical_patterns = ["api", "config", "error", "oauth", "webhook"]
        return any(p in query.lower() for p in technical_patterns)

    def _detect_language(self, query: str) -> str:
        # Simplified - use langdetect in production
        french_words = ["comment", "pourquoi", "quel", "est-ce"]
        return "fr" if any(w in query.lower() for w in french_words) else "en"
```

Performance Optimization

Parallel Search

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelEnsemble:
    def __init__(self, retrievers: list, max_workers: int = 4):
        self.retrievers = retrievers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def search(self, query: str, top_k: int = 5) -> list[dict]:
        loop = asyncio.get_running_loop()

        # Launch all searches in parallel
        tasks = [
            loop.run_in_executor(
                self.executor,
                retriever.search,
                query,
                top_k * 2
            )
            for retriever in self.retrievers
        ]

        # Wait for all results
        all_results = await asyncio.gather(*tasks)

        # Merge with RRF
        return self._rrf_fusion(all_results, top_k)

    def _rrf_fusion(self, all_results: list, top_k: int, k: int = 60) -> list[dict]:
        rrf_scores = {}
        contents = {}

        for results in all_results:
            for rank, result in enumerate(results, start=1):
                doc_id = result["id"]
                contents[doc_id] = result["content"]
                rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (k + rank)

        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        return [
            {"id": doc_id, "content": contents[doc_id], "score": rrf_scores[doc_id]}
            for doc_id in sorted_ids[:top_k]
        ]
```

Smart Caching

```python
import time

class CachedEnsemble:
    def __init__(self, ensemble, cache_ttl: int = 3600):
        self.ensemble = ensemble
        self.cache = {}
        self.cache_ttl = cache_ttl

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        cache_key = f"{query}:{top_k}"

        # Check cache
        if cache_key in self.cache:
            cached, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return cached

        # Execute search
        results = self.ensemble.search(query, top_k)

        # Cache results
        self.cache[cache_key] = (results, time.time())
        return results
```

Evaluation and Tuning

```python
import time

import numpy as np
import pandas as pd

class EnsembleEvaluator:
    def evaluate_configurations(
        self,
        queries: list[dict],
        retrievers: list,
        configurations: list[dict]
    ) -> pd.DataFrame:
        """
        Test different ensemble configurations, e.g.:

        configurations = [
            {"type": "rrf", "k": 60},
            {"type": "score_fusion", "weights": [0.5, 0.3, 0.2]},
            {"type": "stacking"},
        ]
        """
        results = []

        for config in configurations:
            ensemble = self._create_ensemble(retrievers, config)
            metrics = {
                "config": str(config),
                "ndcg@5": [],
                "recall@5": [],
                "latency_ms": []
            }

            for query_data in queries:
                query = query_data["query"]
                relevant = query_data["relevant_docs"]

                start = time.time()
                search_results = ensemble.search(query, top_k=5)
                latency = (time.time() - start) * 1000

                retrieved_ids = [r["id"] for r in search_results]

                # Calculate metrics
                metrics["ndcg@5"].append(self._ndcg(retrieved_ids, relevant, k=5))
                metrics["recall@5"].append(self._recall(retrieved_ids, relevant, k=5))
                metrics["latency_ms"].append(latency)

            # Averages
            results.append({
                "config": config,
                "ndcg@5": np.mean(metrics["ndcg@5"]),
                "recall@5": np.mean(metrics["recall@5"]),
                "latency_ms": np.mean(metrics["latency_ms"])
            })

        return pd.DataFrame(results).sort_values("ndcg@5", ascending=False)
```

Next Steps

Ensemble retrieval maximizes quality by combining multiple complementary approaches.


Ensemble Retrieval with Ailog

Ailog automatically orchestrates multiple retrievers:

  • Adaptive ensemble based on query type
  • Optimized RRF fusion with learned weights
  • Parallel search to minimize latency
  • Integrated monitoring to optimize configurations

Try for free and get turnkey ensemble retrieval.

Tags

rag, retrieval, ensemble, multi-retriever, fusion
