Ensemble Retrieval: Combining Multiple Retrievers
Implement ensemble retrieval to combine the strengths of multiple retrievers. Voting, stacking, and advanced fusion strategies.
Ensemble retrieval applies the ensemble principle from machine learning to search systems: combine the predictions of several models to get better results than any single model alone. This guide explores how to orchestrate multiple retrievers to maximize retrieval quality.
Why ensemble retrieval?
Every retriever has blind spots:
| Retriever | Strengths | Weaknesses |
|---|---|---|
| Dense (BGE) | General semantics | Rare terms |
| Dense (E5) | Multilingual | Short queries |
| Sparse (BM25) | Exact matching | Synonyms |
| Sparse (TF-IDF) | Fast | Less precise |
| Knowledge Graph | Relations | Limited coverage |
An ensemble compensates for each retriever's weaknesses by exploiting their complementary strengths.
Benchmark: Ensemble vs. Single Retriever
| Configuration | NDCG@10 | Recall@10 | Latency |
|---|---|---|---|
| BGE alone | 0.68 | 0.72 | 45ms |
| BM25 alone | 0.61 | 0.68 | 12ms |
| BGE + BM25 | 0.74 | 0.81 | 52ms |
| BGE + E5 + BM25 | 0.77 | 0.84 | 85ms |
The 3-retriever ensemble improves NDCG by 13% over the best single retriever (0.68 to 0.77) for roughly 2x the latency.
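NDCG@k, the metric in the table above, rewards relevant documents placed near the top of the ranking. Here is a minimal sketch of the binary-relevance variant (gain 1 if the document is relevant, 0 otherwise); the function name and inputs are illustrative, not from a specific library:

```python
import math

def ndcg_at_k(retrieved_ids: list, relevant_ids: set, k: int = 10) -> float:
    """Binary-relevance NDCG@k: each relevant hit gains 1, discounted by log2(rank + 1)."""
    dcg = sum(
        1.0 / math.log2(rank + 1)
        for rank, doc_id in enumerate(retrieved_ids[:k], start=1)
        if doc_id in relevant_ids
    )
    # Ideal DCG: all relevant documents ranked first
    ideal = sum(1.0 / math.log2(rank + 1) for rank in range(1, min(len(relevant_ids), k) + 1))
    return dcg / ideal if ideal > 0 else 0.0

print(ndcg_at_k(["a", "b", "c"], {"a", "b"}, k=3))  # 1.0 (perfect ranking)
print(ndcg_at_k(["c", "a", "b"], {"a", "b"}, k=3))  # lower: relevant docs pushed down
```

A perfect ranking scores 1.0; pushing a relevant document down the list lowers the score, which is why reordering gains from an ensemble show up in NDCG even when recall is unchanged.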
Ensemble strategies
1. Voting (Hard Ensemble)
Each retriever "votes" for documents; we keep those with the most votes:
```python
class VotingEnsemble:
    def __init__(self, retrievers: list):
        self.retrievers = retrievers

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Collect votes from each retriever
        all_results = {}
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=top_k * 2)
            for rank, result in enumerate(results):
                doc_id = result["id"]
                if doc_id not in all_results:
                    all_results[doc_id] = {
                        "content": result["content"],
                        "votes": 0,
                        "retrievers": []
                    }
                all_results[doc_id]["votes"] += 1
                all_results[doc_id]["retrievers"].append(retriever.name)

        # Sort by number of votes
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x["votes"],
            reverse=True
        )
        return sorted_results[:top_k]

# Example
ensemble = VotingEnsemble([
    DenseRetriever("bge"),
    DenseRetriever("e5"),
    SparseRetriever("bm25")
])
results = ensemble.search("How do I configure OAuth?")
for r in results:
    print(f"Votes: {r['votes']}, Retrievers: {r['retrievers']}")
```
2. Score Fusion (Soft Ensemble)
Combines the normalized scores from each retriever:
```python
import numpy as np

class ScoreFusionEnsemble:
    def __init__(
        self,
        retrievers: list,
        weights: list[float] = None,
        normalization: str = "min_max"  # "min_max", "z_score", "rank"
    ):
        self.retrievers = retrievers
        self.weights = weights or [1.0] * len(retrievers)
        self.normalization = normalization

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        all_results = {}
        for retriever, weight in zip(self.retrievers, self.weights):
            results = retriever.search(query, top_k=top_k * 2)

            # Normalize the scores
            scores = [r["score"] for r in results]
            normalized = self._normalize(scores)

            for result, norm_score in zip(results, normalized):
                doc_id = result["id"]
                if doc_id not in all_results:
                    all_results[doc_id] = {
                        "content": result["content"],
                        "scores": {},
                        "weighted_sum": 0
                    }
                all_results[doc_id]["scores"][retriever.name] = norm_score
                all_results[doc_id]["weighted_sum"] += weight * norm_score

        # Sort by weighted score
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x["weighted_sum"],
            reverse=True
        )
        return sorted_results[:top_k]

    def _normalize(self, scores: list[float]) -> list[float]:
        if not scores:
            return []
        if self.normalization == "min_max":
            min_s, max_s = min(scores), max(scores)
            range_s = max_s - min_s if max_s != min_s else 1
            return [(s - min_s) / range_s for s in scores]
        elif self.normalization == "z_score":
            mean = np.mean(scores)
            std = np.std(scores) or 1
            return [(s - mean) / std for s in scores]
        elif self.normalization == "rank":
            # Rank-based score (0 to 1)
            n = len(scores)
            return [(n - i) / n for i in range(n)]

# Example with custom weights
ensemble = ScoreFusionEnsemble(
    retrievers=[dense_bge, dense_e5, sparse_bm25],
    weights=[0.4, 0.3, 0.3],
    normalization="min_max"
)
```
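Normalization matters here because raw scores live on very different scales: cosine similarities are bounded, BM25 scores are not, so summing them directly lets one retriever dominate. A quick standalone illustration of the min-max variant, with made-up score lists:

```python
def min_max(scores: list[float]) -> list[float]:
    # Rescale any score list to [0, 1]; constant lists map to 0
    lo, hi = min(scores), max(scores)
    rng = hi - lo if hi != lo else 1
    return [(s - lo) / rng for s in scores]

bm25_scores = [12.4, 8.1, 3.0]      # unbounded lexical scores
cosine_scores = [0.82, 0.79, 0.41]  # bounded semantic scores

# After normalization both lists span [0, 1] and can be combined with weights
print(min_max(bm25_scores))   # [1.0, 0.54..., 0.0]
print(min_max(cosine_scores))
```

Once both lists are on [0, 1], a weighted sum like `0.4 * dense + 0.6 * sparse` actually reflects the intended weights rather than the retrievers' native score magnitudes.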
3. Reciprocal Rank Fusion (RRF)
Combines rankings without requiring score normalization:
```python
class RRFEnsemble:
    def __init__(
        self,
        retrievers: list,
        k: int = 60,
        weights: list[float] = None
    ):
        self.retrievers = retrievers
        self.k = k
        self.weights = weights or [1.0] * len(retrievers)

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        rrf_scores = {}
        doc_contents = {}

        for retriever, weight in zip(self.retrievers, self.weights):
            results = retriever.search(query, top_k=top_k * 3)
            for rank, result in enumerate(results, start=1):
                doc_id = result["id"]
                doc_contents[doc_id] = result["content"]
                if doc_id not in rrf_scores:
                    rrf_scores[doc_id] = 0
                # Weighted RRF formula
                rrf_scores[doc_id] += weight / (self.k + rank)

        # Build the results
        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        return [
            {
                "id": doc_id,
                "content": doc_contents[doc_id],
                "rrf_score": rrf_scores[doc_id]
            }
            for doc_id in sorted_ids[:top_k]
        ]

# Example
rrf_ensemble = RRFEnsemble(
    retrievers=[dense_bge, sparse_bm25],
    k=60,
    weights=[0.6, 0.4]
)
```
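To see what the `1 / (k + rank)` formula actually does, here is the RRF arithmetic by hand for two retrievers over three hypothetical document IDs. With k = 60, adjacent ranks contribute nearly equal scores, so consistent placement across retrievers matters more than winning any single ranking:

```python
k = 60
# Retriever A ranks: doc1, doc2, doc3 ; Retriever B ranks: doc2, doc3, doc1
rankings = [["doc1", "doc2", "doc3"], ["doc2", "doc3", "doc1"]]

scores = {}
for ranking in rankings:
    for rank, doc in enumerate(ranking, start=1):
        scores[doc] = scores.get(doc, 0) + 1 / (k + rank)

for doc, score in sorted(scores.items(), key=lambda x: x[1], reverse=True):
    print(doc, round(score, 5))
# doc2 wins: ranks 2 and 1 (1/62 + 1/61) beat doc1's ranks 1 and 3 (1/61 + 1/63)
```

A smaller k makes the top ranks dominate; a larger k flattens the contributions. The common default of 60 comes from the original RRF paper's experiments.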
4. Stacking with a Reranker
Uses a reranking model to combine the results:
```python
from sentence_transformers import CrossEncoder

class StackedEnsemble:
    def __init__(
        self,
        retrievers: list,
        reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    ):
        self.retrievers = retrievers
        self.reranker = CrossEncoder(reranker_model)

    def search(self, query: str, top_k: int = 5, rerank_k: int = 20) -> list[dict]:
        # 1. Collect candidates from all retrievers
        candidates = {}
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=rerank_k)
            for result in results:
                doc_id = result["id"]
                if doc_id not in candidates:
                    candidates[doc_id] = result["content"]

        # 2. Rerank all candidates
        candidate_list = list(candidates.items())
        pairs = [[query, content] for _, content in candidate_list]
        rerank_scores = self.reranker.predict(pairs)

        # 3. Build the final results
        results = [
            {
                "id": doc_id,
                "content": content,
                "rerank_score": float(score)
            }
            for (doc_id, content), score in zip(candidate_list, rerank_scores)
        ]
        return sorted(results, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
```
5. Cascade Ensemble
A cascade approach: the first retriever filters, the following ones refine:
```python
class CascadeEnsemble:
    def __init__(
        self,
        fast_retriever,
        precise_retriever,
        cascade_threshold: float = 0.7
    ):
        self.fast = fast_retriever
        self.precise = precise_retriever
        self.threshold = cascade_threshold

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Step 1: fast retrieval (broad recall)
        fast_results = self.fast.search(query, top_k=top_k * 4)

        # Check whether the results are confident enough
        max_score = max(r["score"] for r in fast_results) if fast_results else 0
        if max_score >= self.threshold:
            # High confidence: return the fast results
            return fast_results[:top_k]

        # Step 2: precise retrieval over the candidates
        candidate_ids = [r["id"] for r in fast_results]
        precise_results = self.precise.search(
            query,
            top_k=top_k,
            filter_ids=candidate_ids  # Search only among the candidates
        )
        return precise_results
```
Specialized ensembles
Multi-domain ensemble
Use different retrievers depending on the domain:
```python
class MultiDomainEnsemble:
    def __init__(self):
        self.domain_retrievers = {
            "technical": [
                DenseRetriever("codesearch"),
                SparseRetriever("bm25")
            ],
            "general": [
                DenseRetriever("bge"),
                DenseRetriever("e5")
            ],
            "multilingual": [
                DenseRetriever("multilingual-e5"),
                DenseRetriever("mbert")
            ]
        }
        self.domain_classifier = DomainClassifier()

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Detect the domain
        domain = self.domain_classifier.predict(query)

        # Select the appropriate retrievers
        retrievers = self.domain_retrievers.get(domain, self.domain_retrievers["general"])

        # Run an ensemble over the selected retrievers
        ensemble = RRFEnsemble(retrievers)
        return ensemble.search(query, top_k=top_k)
```
Adaptive ensemble
Adjust the weights dynamically based on query characteristics:
```python
class AdaptiveEnsemble:
    def __init__(self, retrievers: list):
        self.retrievers = retrievers
        self.query_analyzer = QueryAnalyzer()

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Analyze the query
        query_features = self.query_analyzer.analyze(query)

        # Compute adaptive weights
        weights = self._compute_adaptive_weights(query_features)

        # Run the ensemble with adaptive weights
        ensemble = ScoreFusionEnsemble(
            self.retrievers,
            weights=weights
        )
        return ensemble.search(query, top_k=top_k)

    def _compute_adaptive_weights(self, features: dict) -> list[float]:
        weights = []
        for retriever in self.retrievers:
            weight = 1.0

            # Dense retrievers do well on long queries
            if retriever.type == "dense" and features["length"] > 10:
                weight *= 1.3

            # Sparse retrievers do well on technical terms
            if retriever.type == "sparse" and features["has_technical_terms"]:
                weight *= 1.4

            # Boost when the language matches
            if hasattr(retriever, "language") and retriever.language == features["language"]:
                weight *= 1.2

            weights.append(weight)

        # Normalize
        total = sum(weights)
        return [w / total for w in weights]

class QueryAnalyzer:
    def analyze(self, query: str) -> dict:
        return {
            "length": len(query.split()),
            "has_technical_terms": self._detect_technical(query),
            "language": self._detect_language(query),
            "is_question": query.strip().endswith("?")
        }

    def _detect_technical(self, query: str) -> bool:
        technical_patterns = ["api", "config", "error", "oauth", "webhook"]
        return any(p in query.lower() for p in technical_patterns)

    def _detect_language(self, query: str) -> str:
        # Simplified: use langdetect in production
        french_words = ["comment", "pourquoi", "quel", "est-ce"]
        return "fr" if any(w in query.lower() for w in french_words) else "en"
```
Performance optimization
Parallel search
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelEnsemble:
    def __init__(self, retrievers: list, max_workers: int = 4):
        self.retrievers = retrievers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def search(self, query: str, top_k: int = 5) -> list[dict]:
        loop = asyncio.get_event_loop()

        # Launch all searches in parallel
        tasks = [
            loop.run_in_executor(
                self.executor,
                retriever.search,
                query,
                top_k * 2
            )
            for retriever in self.retrievers
        ]

        # Wait for all the results
        all_results = await asyncio.gather(*tasks)

        # Merge with RRF
        return self._rrf_fusion(all_results, top_k)

    def _rrf_fusion(self, all_results: list, top_k: int, k: int = 60) -> list[dict]:
        rrf_scores = {}
        contents = {}
        for results in all_results:
            for rank, result in enumerate(results, start=1):
                doc_id = result["id"]
                contents[doc_id] = result["content"]
                rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (k + rank)

        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        return [
            {"id": doc_id, "content": contents[doc_id], "score": rrf_scores[doc_id]}
            for doc_id in sorted_ids[:top_k]
        ]
```
Smart caching
```python
import time

class CachedEnsemble:
    def __init__(self, ensemble, cache_ttl: int = 3600):
        self.ensemble = ensemble
        self.cache = {}
        self.cache_ttl = cache_ttl

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        cache_key = f"{query}:{top_k}"

        # Check the cache
        if cache_key in self.cache:
            cached, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return cached

        # Run the search
        results = self.ensemble.search(query, top_k)

        # Store in the cache
        self.cache[cache_key] = (results, time.time())
        return results
```
Evaluation and tuning
```python
import time
import numpy as np
import pandas as pd

class EnsembleEvaluator:
    def evaluate_configurations(
        self,
        queries: list[dict],
        retrievers: list,
        configurations: list[dict]
    ) -> pd.DataFrame:
        """
        Test different ensemble configurations, e.g.:
        configurations = [
            {"type": "rrf", "k": 60},
            {"type": "score_fusion", "weights": [0.5, 0.3, 0.2]},
            {"type": "stacking"},
        ]
        """
        results = []
        for config in configurations:
            ensemble = self._create_ensemble(retrievers, config)
            metrics = {
                "config": str(config),
                "ndcg@5": [],
                "recall@5": [],
                "latency_ms": []
            }

            for query_data in queries:
                query = query_data["query"]
                relevant = query_data["relevant_docs"]

                start = time.time()
                search_results = ensemble.search(query, top_k=5)
                latency = (time.time() - start) * 1000

                retrieved_ids = [r["id"] for r in search_results]

                # Compute the metrics
                metrics["ndcg@5"].append(self._ndcg(retrieved_ids, relevant, k=5))
                metrics["recall@5"].append(self._recall(retrieved_ids, relevant, k=5))
                metrics["latency_ms"].append(latency)

            # Averages
            results.append({
                "config": config,
                "ndcg@5": np.mean(metrics["ndcg@5"]),
                "recall@5": np.mean(metrics["recall@5"]),
                "latency_ms": np.mean(metrics["latency_ms"])
            })

        return pd.DataFrame(results).sort_values("ndcg@5", ascending=False)
```
Next steps
Ensemble retrieval maximizes quality by combining several approaches. To go further:
- Hybrid Retrieval Fusion: combining dense and sparse
- Query Routing: routing queries to the right sources
- Retrieval Fundamentals: an overview
Ensemble retrieval with Ailog
Ailog automatically orchestrates multiple retrievers:
- Adaptive ensembling based on query type
- Optimized RRF fusion with learned weights
- Parallel search to minimize latency
- Built-in monitoring to optimize configurations
Try it for free and get turnkey ensemble retrieval.
Related articles
Hybrid Fusion: Combining Dense and Sparse Retrieval
Master hybrid fusion to combine semantic and lexical search. RRF, weighted fusion, and optimal combination strategies.
Query Routing: Directing Queries to the Right Source
Implement query routing to direct each query to the optimal data source. Classification, LLM-based routing, and advanced strategies.
Metadata Filtering: Refining RAG Search
Master metadata filtering for precise RAG search. Filter types, indexing, combined queries, and optimization.