5. Retrieval

Ensemble Retrieval : Combiner plusieurs retrievers

10 mars 2026
Équipe Ailog

Implémentez l'ensemble retrieval pour combiner les forces de plusieurs retrievers. Voting, stacking et stratégies de fusion avancées.

Ensemble Retrieval : Combiner plusieurs retrievers

L'ensemble retrieval applique le principe du machine learning ensemble aux systèmes de recherche : combiner les prédictions de plusieurs modèles pour obtenir de meilleurs résultats que chaque modèle individuellement. Ce guide explore comment orchestrer plusieurs retrievers pour maximiser la qualité du retrieval.

Pourquoi l'ensemble retrieval ?

Chaque retriever a ses angles morts :

RetrieverForcesFaiblesses
Dense (BGE)Sémantique généraleTermes rares
Dense (E5)MultilingueRequêtes courtes
Sparse (BM25)Correspondance exacteSynonymes
Sparse (TF-IDF)RapideMoins précis
Knowledge GraphRelationsCouverture limitée

Un ensemble compense les faiblesses de chaque retriever en exploitant leurs forces complémentaires.

Benchmark : Ensemble vs Single Retriever

ConfigurationNDCG@10Recall@10Latence
BGE seul0.680.7245ms
BM25 seul0.610.6812ms
BGE + BM250.740.8152ms
BGE + E5 + BM250.770.8485ms

L'ensemble à 3 retrievers améliore le NDCG de 13% pour seulement 2x la latence.

Stratégies d'ensemble

1. Voting (Hard Ensemble)

Chaque retriever "vote" pour les documents, on garde ceux avec le plus de votes :

DEVELOPERpython
from collections import Counter class VotingEnsemble: def __init__(self, retrievers: list): self.retrievers = retrievers def search(self, query: str, top_k: int = 5) -> list[dict]: # Collecter les votes de chaque retriever all_results = {} for retriever in self.retrievers: results = retriever.search(query, top_k=top_k * 2) for rank, result in enumerate(results): doc_id = result["id"] if doc_id not in all_results: all_results[doc_id] = { "content": result["content"], "votes": 0, "retrievers": [] } all_results[doc_id]["votes"] += 1 all_results[doc_id]["retrievers"].append(retriever.name) # Trier par nombre de votes sorted_results = sorted( all_results.values(), key=lambda x: x["votes"], reverse=True ) return sorted_results[:top_k] # Exemple ensemble = VotingEnsemble([ DenseRetriever("bge"), DenseRetriever("e5"), SparseRetriever("bm25") ]) results = ensemble.search("Comment configurer OAuth ?") for r in results: print(f"Votes: {r['votes']}, Retrievers: {r['retrievers']}")

2. Score Fusion (Soft Ensemble)

Combine les scores normalisés de chaque retriever :

DEVELOPERpython
import numpy as np class ScoreFusionEnsemble: def __init__( self, retrievers: list, weights: list[float] = None, normalization: str = "min_max" # "min_max", "z_score", "rank" ): self.retrievers = retrievers self.weights = weights or [1.0] * len(retrievers) self.normalization = normalization def search(self, query: str, top_k: int = 5) -> list[dict]: all_results = {} for retriever, weight in zip(self.retrievers, self.weights): results = retriever.search(query, top_k=top_k * 2) # Normaliser les scores scores = [r["score"] for r in results] normalized = self._normalize(scores) for result, norm_score in zip(results, normalized): doc_id = result["id"] if doc_id not in all_results: all_results[doc_id] = { "content": result["content"], "scores": {}, "weighted_sum": 0 } all_results[doc_id]["scores"][retriever.name] = norm_score all_results[doc_id]["weighted_sum"] += weight * norm_score # Trier par score pondéré sorted_results = sorted( all_results.values(), key=lambda x: x["weighted_sum"], reverse=True ) return sorted_results[:top_k] def _normalize(self, scores: list[float]) -> list[float]: if not scores: return [] if self.normalization == "min_max": min_s, max_s = min(scores), max(scores) range_s = max_s - min_s if max_s != min_s else 1 return [(s - min_s) / range_s for s in scores] elif self.normalization == "z_score": mean = np.mean(scores) std = np.std(scores) or 1 return [(s - mean) / std for s in scores] elif self.normalization == "rank": # Score basé sur le rang (0 à 1) n = len(scores) return [(n - i) / n for i in range(n)] # Exemple avec poids personnalisés ensemble = ScoreFusionEnsemble( retrievers=[dense_bge, dense_e5, sparse_bm25], weights=[0.4, 0.3, 0.3], normalization="min_max" )

3. Reciprocal Rank Fusion (RRF)

Combine les rankings sans nécessiter de normalisation des scores :

DEVELOPERpython
class RRFEnsemble: def __init__( self, retrievers: list, k: int = 60, weights: list[float] = None ): self.retrievers = retrievers self.k = k self.weights = weights or [1.0] * len(retrievers) def search(self, query: str, top_k: int = 5) -> list[dict]: rrf_scores = {} doc_contents = {} for retriever, weight in zip(self.retrievers, self.weights): results = retriever.search(query, top_k=top_k * 3) for rank, result in enumerate(results, start=1): doc_id = result["id"] doc_contents[doc_id] = result["content"] if doc_id not in rrf_scores: rrf_scores[doc_id] = 0 # Formule RRF pondérée rrf_scores[doc_id] += weight / (self.k + rank) # Construire les résultats sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True) return [ { "id": doc_id, "content": doc_contents[doc_id], "rrf_score": rrf_scores[doc_id] } for doc_id in sorted_ids[:top_k] ] # Exemple rrf_ensemble = RRFEnsemble( retrievers=[dense_bge, sparse_bm25], k=60, weights=[0.6, 0.4] )

4. Stacking avec Reranker

Utilise un modèle de reranking pour combiner les résultats :

DEVELOPERpython
from sentence_transformers import CrossEncoder class StackedEnsemble: def __init__( self, retrievers: list, reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2" ): self.retrievers = retrievers self.reranker = CrossEncoder(reranker_model) def search(self, query: str, top_k: int = 5, rerank_k: int = 20) -> list[dict]: # 1. Collecter les candidats de tous les retrievers candidates = {} for retriever in self.retrievers: results = retriever.search(query, top_k=rerank_k) for result in results: doc_id = result["id"] if doc_id not in candidates: candidates[doc_id] = result["content"] # 2. Reranker tous les candidats candidate_list = list(candidates.items()) pairs = [[query, content] for _, content in candidate_list] rerank_scores = self.reranker.predict(pairs) # 3. Construire les résultats finaux results = [ { "id": doc_id, "content": content, "rerank_score": float(score) } for (doc_id, content), score in zip(candidate_list, rerank_scores) ] return sorted(results, key=lambda x: x["rerank_score"], reverse=True)[:top_k]

5. Cascade Ensemble

Approche en cascade : le premier retriever filtre, les suivants affinent :

DEVELOPERpython
class CascadeEnsemble: def __init__( self, fast_retriever, precise_retriever, cascade_threshold: float = 0.7 ): self.fast = fast_retriever self.precise = precise_retriever self.threshold = cascade_threshold def search(self, query: str, top_k: int = 5) -> list[dict]: # Étape 1 : Retrieval rapide (large recall) fast_results = self.fast.search(query, top_k=top_k * 4) # Vérifier si les résultats sont suffisamment confiants max_score = max(r["score"] for r in fast_results) if fast_results else 0 if max_score >= self.threshold: # Haute confiance : retourner les résultats rapides return fast_results[:top_k] # Étape 2 : Retrieval précis sur les candidats candidate_ids = [r["id"] for r in fast_results] precise_results = self.precise.search( query, top_k=top_k, filter_ids=candidate_ids # Rechercher uniquement parmi les candidats ) return precise_results

Ensembles spécialisés

Ensemble multi-domaine

Utiliser différents retrievers selon le domaine :

DEVELOPERpython
class MultiDomainEnsemble: def __init__(self): self.domain_retrievers = { "technical": [ DenseRetriever("codesearch"), SparseRetriever("bm25") ], "general": [ DenseRetriever("bge"), DenseRetriever("e5") ], "multilingual": [ DenseRetriever("multilingual-e5"), DenseRetriever("mbert") ] } self.domain_classifier = DomainClassifier() def search(self, query: str, top_k: int = 5) -> list[dict]: # Détecter le domaine domain = self.domain_classifier.predict(query) # Sélectionner les retrievers appropriés retrievers = self.domain_retrievers.get(domain, self.domain_retrievers["general"]) # Ensemble sur les retrievers sélectionnés ensemble = RRFEnsemble(retrievers) return ensemble.search(query, top_k=top_k)

Ensemble adaptatif

Ajuster les poids dynamiquement selon les caractéristiques de la requête :

DEVELOPERpython
class AdaptiveEnsemble: def __init__(self, retrievers: list): self.retrievers = retrievers self.query_analyzer = QueryAnalyzer() def search(self, query: str, top_k: int = 5) -> list[dict]: # Analyser la requête query_features = self.query_analyzer.analyze(query) # Calculer les poids adaptatifs weights = self._compute_adaptive_weights(query_features) # Ensemble avec poids adaptatifs ensemble = ScoreFusionEnsemble( self.retrievers, weights=weights ) return ensemble.search(query, top_k=top_k) def _compute_adaptive_weights(self, features: dict) -> list[float]: weights = [] for retriever in self.retrievers: weight = 1.0 # Dense performant sur requêtes longues if retriever.type == "dense" and features["length"] > 10: weight *= 1.3 # Sparse performant sur termes techniques if retriever.type == "sparse" and features["has_technical_terms"]: weight *= 1.4 # Boost si la langue correspond if hasattr(retriever, "language") and retriever.language == features["language"]: weight *= 1.2 weights.append(weight) # Normaliser total = sum(weights) return [w / total for w in weights] class QueryAnalyzer: def analyze(self, query: str) -> dict: return { "length": len(query.split()), "has_technical_terms": self._detect_technical(query), "language": self._detect_language(query), "is_question": query.strip().endswith("?") } def _detect_technical(self, query: str) -> bool: technical_patterns = ["api", "config", "error", "oauth", "webhook"] return any(p in query.lower() for p in technical_patterns) def _detect_language(self, query: str) -> str: # Simplifié - utilisez langdetect en production french_words = ["comment", "pourquoi", "quel", "est-ce"] return "fr" if any(w in query.lower() for w in french_words) else "en"

Optimisation des performances

Recherche parallèle

DEVELOPERpython
import asyncio from concurrent.futures import ThreadPoolExecutor class ParallelEnsemble: def __init__(self, retrievers: list, max_workers: int = 4): self.retrievers = retrievers self.executor = ThreadPoolExecutor(max_workers=max_workers) async def search(self, query: str, top_k: int = 5) -> list[dict]: loop = asyncio.get_event_loop() # Lancer toutes les recherches en parallèle tasks = [ loop.run_in_executor( self.executor, retriever.search, query, top_k * 2 ) for retriever in self.retrievers ] # Attendre tous les résultats all_results = await asyncio.gather(*tasks) # Fusionner avec RRF return self._rrf_fusion(all_results, top_k) def _rrf_fusion(self, all_results: list, top_k: int, k: int = 60) -> list[dict]: rrf_scores = {} contents = {} for results in all_results: for rank, result in enumerate(results, start=1): doc_id = result["id"] contents[doc_id] = result["content"] rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (k + rank) sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True) return [ {"id": doc_id, "content": contents[doc_id], "score": rrf_scores[doc_id]} for doc_id in sorted_ids[:top_k] ]

Cache intelligent

DEVELOPERpython
class CachedEnsemble: def __init__(self, ensemble, cache_ttl: int = 3600): self.ensemble = ensemble self.cache = {} self.cache_ttl = cache_ttl def search(self, query: str, top_k: int = 5) -> list[dict]: cache_key = f"{query}:{top_k}" # Vérifier le cache if cache_key in self.cache: cached, timestamp = self.cache[cache_key] if time.time() - timestamp < self.cache_ttl: return cached # Exécuter la recherche results = self.ensemble.search(query, top_k) # Mettre en cache self.cache[cache_key] = (results, time.time()) return results

Évaluation et tuning

DEVELOPERpython
class EnsembleEvaluator: def evaluate_configurations( self, queries: list[dict], retrievers: list, configurations: list[dict] ) -> pd.DataFrame: """ Tester différentes configurations d'ensemble configurations = [ {"type": "rrf", "k": 60}, {"type": "score_fusion", "weights": [0.5, 0.3, 0.2]}, {"type": "stacking"}, ] """ results = [] for config in configurations: ensemble = self._create_ensemble(retrievers, config) metrics = { "config": str(config), "ndcg@5": [], "recall@5": [], "latency_ms": [] } for query_data in queries: query = query_data["query"] relevant = query_data["relevant_docs"] start = time.time() results_search = ensemble.search(query, top_k=5) latency = (time.time() - start) * 1000 retrieved_ids = [r["id"] for r in results_search] # Calculer les métriques metrics["ndcg@5"].append(self._ndcg(retrieved_ids, relevant, k=5)) metrics["recall@5"].append(self._recall(retrieved_ids, relevant, k=5)) metrics["latency_ms"].append(latency) # Moyennes results.append({ "config": config, "ndcg@5": np.mean(metrics["ndcg@5"]), "recall@5": np.mean(metrics["recall@5"]), "latency_ms": np.mean(metrics["latency_ms"]) }) return pd.DataFrame(results).sort_values("ndcg@5", ascending=False)

Prochaines étapes

L'ensemble retrieval maximise la qualité en combinant plusieurs approches. Pour aller plus loin :


Ensemble retrieval avec Ailog

Ailog orchestre automatiquement plusieurs retrievers :

  • Ensemble adaptatif selon le type de requête
  • Fusion RRF optimisée avec poids appris
  • Recherche parallèle pour minimiser la latence
  • Monitoring intégré pour optimiser les configurations

Testez gratuitement et bénéficiez d'un retrieval ensemble clé en main.

FAQ

L'ensemble retrieval est recommandé quand votre corpus contient des types de contenus variés (technique, conversationnel, structuré) ou quand les requêtes utilisateurs sont hétérogènes. Si vos documents et requêtes sont homogènes, un retriever dense bien calibré peut suffire avec une latence réduite.
RRF (Reciprocal Rank Fusion) est plus robuste car il ne nécessite pas de normalisation des scores entre retrievers. Privilégiez-le quand les retrievers ont des échelles de scores très différentes. La fusion par scores normalisés fonctionne mieux quand vous voulez pondérer finement la contribution de chaque retriever.
Avec une exécution parallèle, l'overhead est minimal (10-20ms). La latence totale est dominée par le retriever le plus lent. Utilisez un cache pour les requêtes fréquentes et envisagez une approche cascade pour les cas où la latence est critique.
Commencez avec des poids égaux puis ajustez empiriquement sur un ensemble de test représentatif. Mesurez le NDCG@10 et le Recall@10 pour chaque configuration. Les poids peuvent aussi être appris automatiquement via une optimisation sur données de feedback utilisateur.
Oui, mais au-delà de 3-4 retrievers, les gains marginaux diminuent fortement tandis que la complexité augmente. Chaque retriever ajouté doit apporter une complémentarité réelle. Mesurez toujours l'amélioration effective avant d'ajouter un nouveau retriever à l'ensemble.

Tags

ragretrievalensemblemulti-retrieverfusion

Articles connexes

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !