5. Retrieval

Ensemble Retrieval: Mehrere retrievers kombinieren

10. März 2026
Équipe Ailog

Implementieren Sie Ensemble Retrieval, um die Stärken mehrerer retrievers zu kombinieren. Voting, stacking und fortgeschrittene Fusionsstrategien.

Ensemble Retrieval : Combiner plusieurs retrievers

L'ensemble retrieval applique le principe du machine learning ensemble aux systèmes de recherche : combiner les prédictions de plusieurs modèles pour obtenir de meilleurs résultats que chaque modèle individuellement. Ce guide explore comment orchestrer plusieurs retrievers pour maximiser la qualité du retrieval.

Pourquoi l'ensemble retrieval ?

Chaque retriever a ses angles morts :

RetrieverForcesFaiblesses
Dense (BGE)Sémantique généraleTermes rares
Dense (E5)MultilingueRequêtes courtes
Sparse (BM25)Correspondance exacteSynonymes
Sparse (TF-IDF)RapideMoins précis
Knowledge GraphRelationsCouverture limitée

Un ensemble compense les faiblesses de chaque retriever en exploitant leurs forces complémentaires.

Benchmark : Ensemble vs Single Retriever

ConfigurationNDCG@10Recall@10Latence
BGE seul0.680.7245ms
BM25 seul0.610.6812ms
BGE + BM250.740.8152ms
BGE + E5 + BM250.770.8485ms

L'ensemble à 3 retrievers améliore le NDCG de 13% pour seulement 2x la latence.

Stratégies d'ensemble

1. Voting (Hard Ensemble)

Chaque retriever "vote" pour les documents, on garde ceux avec le plus de votes :

DEVELOPERpython
from collections import Counter class VotingEnsemble: def __init__(self, retrievers: list): self.retrievers = retrievers def search(self, query: str, top_k: int = 5) -> list[dict]: # Stimmen jedes retrievers sammeln all_results = {} for retriever in self.retrievers: results = retriever.search(query, top_k=top_k * 2) for rank, result in enumerate(results): doc_id = result["id"] if doc_id not in all_results: all_results[doc_id] = { "content": result["content"], "votes": 0, "retrievers": [] } all_results[doc_id]["votes"] += 1 all_results[doc_id]["retrievers"].append(retriever.name) # Nach Anzahl der Stimmen sortieren sorted_results = sorted( all_results.values(), key=lambda x: x["votes"], reverse=True ) return sorted_results[:top_k] # Beispiel ensemble = VotingEnsemble([ DenseRetriever("bge"), DenseRetriever("e5"), SparseRetriever("bm25") ]) results = ensemble.search("Comment configurer OAuth ?") for r in results: print(f"Votes: {r['votes']}, Retrievers: {r['retrievers']}")

2. Score Fusion (Soft Ensemble)

Combine les scores normalisés de chaque retriever :

DEVELOPERpython
import numpy as np class ScoreFusionEnsemble: def __init__( self, retrievers: list, weights: list[float] = None, normalization: str = "min_max" # "min_max", "z_score", "rank" ): self.retrievers = retrievers self.weights = weights or [1.0] * len(retrievers) self.normalization = normalization def search(self, query: str, top_k: int = 5) -> list[dict]: all_results = {} for retriever, weight in zip(self.retrievers, self.weights): results = retriever.search(query, top_k=top_k * 2) # Scores normalisieren scores = [r["score"] for r in results] normalized = self._normalize(scores) for result, norm_score in zip(results, normalized): doc_id = result["id"] if doc_id not in all_results: all_results[doc_id] = { "content": result["content"], "scores": {}, "weighted_sum": 0 } all_results[doc_id]["scores"][retriever.name] = norm_score all_results[doc_id]["weighted_sum"] += weight * norm_score # Trier par score pondéré sorted_results = sorted( all_results.values(), key=lambda x: x["weighted_sum"], reverse=True ) return sorted_results[:top_k] def _normalize(self, scores: list[float]) -> list[float]: if not scores: return [] if self.normalization == "min_max": min_s, max_s = min(scores), max(scores) range_s = max_s - min_s if max_s != min_s else 1 return [(s - min_s) / range_s for s in scores] elif self.normalization == "z_score": mean = np.mean(scores) std = np.std(scores) or 1 return [(s - mean) / std for s in scores] elif self.normalization == "rank": # Rangbasierter Score (0 bis 1) n = len(scores) return [(n - i) / n for i in range(n)] # Beispiel mit benutzerdefinierten Gewichten ensemble = ScoreFusionEnsemble( retrievers=[dense_bge, dense_e5, sparse_bm25], weights=[0.4, 0.3, 0.3], normalization="min_max" )

3. Reciprocal Rank Fusion (RRF)

Combine les rankings sans nécessiter de normalisation des scores :

DEVELOPERpython
class RRFEnsemble: def __init__( self, retrievers: list, k: int = 60, weights: list[float] = None ): self.retrievers = retrievers self.k = k self.weights = weights or [1.0] * len(retrievers) def search(self, query: str, top_k: int = 5) -> list[dict]: rrf_scores = {} doc_contents = {} for retriever, weight in zip(self.retrievers, self.weights): results = retriever.search(query, top_k=top_k * 3) for rank, result in enumerate(results, start=1): doc_id = result["id"] doc_contents[doc_id] = result["content"] if doc_id not in rrf_scores: rrf_scores[doc_id] = 0 # Gewichtete RRF-Formel rrf_scores[doc_id] += weight / (self.k + rank) # Construire les résultats sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True) return [ { "id": doc_id, "content": doc_contents[doc_id], "rrf_score": rrf_scores[doc_id] } for doc_id in sorted_ids[:top_k] ] # Beispiel rrf_ensemble = RRFEnsemble( retrievers=[dense_bge, sparse_bm25], k=60, weights=[0.6, 0.4] )

4. Stacking avec Reranker

Utilise un modèle de reranking pour combiner les résultats :

DEVELOPERpython
from sentence_transformers import CrossEncoder class StackedEnsemble: def __init__( self, retrievers: list, reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2" ): self.retrievers = retrievers self.reranker = CrossEncoder(reranker_model) def search(self, query: str, top_k: int = 5, rerank_k: int = 20) -> list[dict]: # 1. Kandidaten von allen retrievern sammeln candidates = {} for retriever in self.retrievers: results = retriever.search(query, top_k=rerank_k) for result in results: doc_id = result["id"] if doc_id not in candidates: candidates[doc_id] = result["content"] # 2. Alle Kandidaten neu bewerten candidate_list = list(candidates.items()) pairs = [[query, content] for _, content in candidate_list] rerank_scores = self.reranker.predict(pairs) # 3. Endgültige Ergebnisse erstellen results = [ { "id": doc_id, "content": content, "rerank_score": float(score) } for (doc_id, content), score in zip(candidate_list, rerank_scores) ] return sorted(results, key=lambda x: x["rerank_score"], reverse=True)[:top_k]

5. Cascade Ensemble

Approche en cascade : le premier retriever filtre, les suivants affinent :

DEVELOPERpython
class CascadeEnsemble: def __init__( self, fast_retriever, precise_retriever, cascade_threshold: float = 0.7 ): self.fast = fast_retriever self.precise = precise_retriever self.threshold = cascade_threshold def search(self, query: str, top_k: int = 5) -> list[dict]: # Schritt 1: Schnelle retrieval (großer Recall) fast_results = self.fast.search(query, top_k=top_k * 4) # Prüfen, ob die Ergebnisse ausreichend vertrauenswürdig sind max_score = max(r["score"] for r in fast_results) if fast_results else 0 if max_score >= self.threshold: # Hohe Zuversicht: Schnelle Ergebnisse zurückgeben return fast_results[:top_k] # Schritt 2: Präzises retrieval auf den Kandidaten candidate_ids = [r["id"] for r in fast_results] precise_results = self.precise.search( query, top_k=top_k, filter_ids=candidate_ids # Nur unter den Kandidaten suchen ) return precise_results

Ensembles spécialisés

Ensemble multi-domaine

Utiliser différents retrievers selon le domaine :

DEVELOPERpython
class MultiDomainEnsemble: def __init__(self): self.domain_retrievers = { "technical": [ DenseRetriever("codesearch"), SparseRetriever("bm25") ], "general": [ DenseRetriever("bge"), DenseRetriever("e5") ], "multilingual": [ DenseRetriever("multilingual-e5"), DenseRetriever("mbert") ] } self.domain_classifier = DomainClassifier() def search(self, query: str, top_k: int = 5) -> list[dict]: # Domäne erkennen domain = self.domain_classifier.predict(query) # Geeignete retriever auswählen retrievers = self.domain_retrievers.get(domain, self.domain_retrievers["general"]) # Ensemble für die ausgewählten retrievers ensemble = RRFEnsemble(retrievers) return ensemble.search(query, top_k=top_k)

Ensemble adaptatif

Ajuster les poids dynamiquement selon les caractéristiques de la requête :

DEVELOPERpython
class AdaptiveEnsemble: def __init__(self, retrievers: list): self.retrievers = retrievers self.query_analyzer = QueryAnalyzer() def search(self, query: str, top_k: int = 5) -> list[dict]: # Anfrage analysieren query_features = self.query_analyzer.analyze(query) # Adaptive Gewichte berechnen weights = self._compute_adaptive_weights(query_features) # Ensemble mit adaptiven Gewichten ensemble = ScoreFusionEnsemble( self.retrievers, weights=weights ) return ensemble.search(query, top_k=top_k) def _compute_adaptive_weights(self, features: dict) -> list[float]: weights = [] for retriever in self.retrievers: weight = 1.0 # Dense gut bei langen Anfragen if retriever.type == "dense" and features["length"] > 10: weight *= 1.3 # Sparse gut bei technischen Begriffen if retriever.type == "sparse" and features["has_technical_terms"]: weight *= 1.4 # Boost, wenn die Sprache übereinstimmt if hasattr(retriever, "language") and retriever.language == features["language"]: weight *= 1.2 weights.append(weight) # Normalisieren total = sum(weights) return [w / total for w in weights] class QueryAnalyzer: def analyze(self, query: str) -> dict: return { "length": len(query.split()), "has_technical_terms": self._detect_technical(query), "language": self._detect_language(query), "is_question": query.strip().endswith("?") } def _detect_technical(self, query: str) -> bool: technical_patterns = ["api", "config", "error", "oauth", "webhook"] return any(p in query.lower() for p in technical_patterns) def _detect_language(self, query: str) -> str: # Vereinfacht - in Produktion langdetect verwenden french_words = ["comment", "pourquoi", "quel", "est-ce"] return "fr" if any(w in query.lower() for w in french_words) else "en"

Optimisation des performances

Recherche parallèle

DEVELOPERpython
import asyncio from concurrent.futures import ThreadPoolExecutor class ParallelEnsemble: def __init__(self, retrievers: list, max_workers: int = 4): self.retrievers = retrievers self.executor = ThreadPoolExecutor(max_workers=max_workers) async def search(self, query: str, top_k: int = 5) -> list[dict]: loop = asyncio.get_event_loop() # Alle Suchen parallel starten tasks = [ loop.run_in_executor( self.executor, retriever.search, query, top_k * 2 ) for retriever in self.retrievers ] # Auf alle Ergebnisse warten all_results = await asyncio.gather(*tasks) # Mit RRF zusammenführen return self._rrf_fusion(all_results, top_k) def _rrf_fusion(self, all_results: list, top_k: int, k: int = 60) -> list[dict]: rrf_scores = {} contents = {} for results in all_results: for rank, result in enumerate(results, start=1): doc_id = result["id"] contents[doc_id] = result["content"] rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (k + rank) sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True) return [ {"id": doc_id, "content": contents[doc_id], "score": rrf_scores[doc_id]} for doc_id in sorted_ids[:top_k] ]

Cache intelligent

DEVELOPERpython
class CachedEnsemble: def __init__(self, ensemble, cache_ttl: int = 3600): self.ensemble = ensemble self.cache = {} self.cache_ttl = cache_ttl def search(self, query: str, top_k: int = 5) -> list[dict]: cache_key = f"{query}:{top_k}" # Cache überprüfen if cache_key in self.cache: cached, timestamp = self.cache[cache_key] if time.time() - timestamp < self.cache_ttl: return cached # Suche ausführen results = self.ensemble.search(query, top_k) # Cachen self.cache[cache_key] = (results, time.time()) return results

Évaluation et tuning

DEVELOPERpython
class EnsembleEvaluator: def evaluate_configurations( self, queries: list[dict], retrievers: list, configurations: list[dict] ) -> pd.DataFrame: """ Tester différentes configurations d'ensemble configurations = [ {"type": "rrf", "k": 60}, {"type": "score_fusion", "weights": [0.5, 0.3, 0.2]}, {"type": "stacking"}, ] """ results = [] for config in configurations: ensemble = self._create_ensemble(retrievers, config) metrics = { "config": str(config), "ndcg@5": [], "recall@5": [], "latency_ms": [] } for query_data in queries: query = query_data["query"] relevant = query_data["relevant_docs"] start = time.time() results_search = ensemble.search(query, top_k=5) latency = (time.time() - start) * 1000 retrieved_ids = [r["id"] for r in results_search] # Calculer les métriques metrics["ndcg@5"].append(self._ndcg(retrieved_ids, relevant, k=5)) metrics["recall@5"].append(self._recall(retrieved_ids, relevant, k=5)) metrics["latency_ms"].append(latency) # Durchschnitte results.append({ "config": config, "ndcg@5": np.mean(metrics["ndcg@5"]), "recall@5": np.mean(metrics["recall@5"]), "latency_ms": np.mean(metrics["latency_ms"]) }) return pd.DataFrame(results).sort_values("ndcg@5", ascending=False)

Prochaines étapes

L'ensemble retrieval maximise la qualité en combinant plusieurs approches. Pour aller plus loin :


Ensemble retrieval avec Ailog

Ailog orchestre automatiquement plusieurs retrievers :

  • Ensemble adaptatif selon le type de requête
  • Fusion RRF optimisée avec poids appris
  • Recherche parallèle pour minimiser la latence
  • Monitoring intégré pour optimiser les configurations

Testez gratuitement et bénéficiez d'un retrieval ensemble clé en main.

FAQ

L'ensemble retrieval est recommandé quand votre corpus contient des types de contenus variés (technique, conversationnel, structuré) ou quand les requêtes utilisateurs sont hétérogènes. Si vos documents et requêtes sont homogènes, un retriever dense bien calibré peut suffire avec une latence réduite.
RRF (Reciprocal Rank Fusion) est plus robuste car il ne nécessite pas de normalisation des scores entre retrievers. Privilégiez-le quand les retrievers ont des échelles de scores très différentes. La fusion par scores normalisés fonctionne mieux quand vous voulez pondérer finement la contribution de chaque retriever.
Avec une exécution parallèle, l'overhead est minimal (10-20ms). La latence totale est dominée par le retriever le plus lent. Utilisez un cache pour les requêtes fréquentes et envisagez une approche cascade pour les cas où la latence est critique.
Commencez avec des poids égaux puis ajustez empiriquement sur un ensemble de test représentatif. Mesurez le NDCG@10 et le Recall@10 pour chaque configuration. Les poids peuvent aussi être appris automatiquement via une optimisation sur données de feedback utilisateur.
Oui, mais au-delà de 3-4 retrievers, les gains marginaux diminuent fortement tandis que la complexité augmente. Chaque retriever ajouté doit apporter une complémentarité réelle. Mesurez toujours l'amélioration effective avant d'ajouter un nouveau retriever à l'ensemble.

Tags

ragretrievalensemblemulti-retrieverfusion

Verwandte Artikel

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !