Ensemble Retrieval: Combining Multiple Retrievers
Implement ensemble retrieval to combine the strengths of multiple retrievers: voting, stacking, and advanced fusion strategies.
Ensemble retrieval applies the ensemble principle from machine learning to search systems: combine the predictions of several models to obtain better results than any single model on its own. This guide explores how to orchestrate multiple retrievers to maximize retrieval quality.
Why ensemble retrieval?
Every retriever has blind spots:
| Retriever | Strengths | Weaknesses |
|---|---|---|
| Dense (BGE) | General semantics | Rare terms |
| Dense (E5) | Multilingual | Short queries |
| Sparse (BM25) | Exact matching | Synonyms |
| Sparse (TF-IDF) | Fast | Less precise |
| Knowledge Graph | Relations | Limited coverage |
An ensemble compensates for each retriever's weaknesses by exploiting their complementary strengths.
Benchmark: Ensemble vs. Single Retriever
| Configuration | NDCG@10 | Recall@10 | Latency |
|---|---|---|---|
| BGE alone | 0.68 | 0.72 | 45 ms |
| BM25 alone | 0.61 | 0.68 | 12 ms |
| BGE + BM25 | 0.74 | 0.81 | 52 ms |
| BGE + E5 + BM25 | 0.77 | 0.84 | 85 ms |
The three-retriever ensemble improves NDCG by roughly 13% at only about twice the latency of the best single retriever.
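All the snippets in this guide assume retriever objects that share a small common interface. The sketch below spells out that assumed contract; the attribute and method names are conventions of these examples, not a specific library's API:

```python
from typing import Protocol

class Retriever(Protocol):
    """Assumed interface for the examples in this guide (illustrative)."""
    name: str   # e.g. "bge", "e5", "bm25"
    type: str   # "dense" or "sparse", used by the adaptive ensemble below

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Return result dicts containing at least 'id', 'content' and 'score'."""
        ...
```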
Ensemble strategies
1. Voting (Hard Ensemble)
Each retriever "votes" for documents, and we keep the ones with the most votes:
```python
class VotingEnsemble:
    def __init__(self, retrievers: list):
        self.retrievers = retrievers

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Collect the votes from each retriever
        all_results = {}
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=top_k * 2)
            for result in results:
                doc_id = result["id"]
                if doc_id not in all_results:
                    all_results[doc_id] = {
                        "content": result["content"],
                        "votes": 0,
                        "retrievers": []
                    }
                all_results[doc_id]["votes"] += 1
                all_results[doc_id]["retrievers"].append(retriever.name)

        # Sort by number of votes
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x["votes"],
            reverse=True
        )
        return sorted_results[:top_k]

# Example
ensemble = VotingEnsemble([
    DenseRetriever("bge"),
    DenseRetriever("e5"),
    SparseRetriever("bm25")
])

results = ensemble.search("How do I configure OAuth?")
for r in results:
    print(f"Votes: {r['votes']}, Retrievers: {r['retrievers']}")
```
2. Score Fusion (Soft Ensemble)
Combines the normalized scores of each retriever:
```python
import numpy as np

class ScoreFusionEnsemble:
    def __init__(
        self,
        retrievers: list,
        weights: list[float] = None,
        normalization: str = "min_max"  # "min_max", "z_score", "rank"
    ):
        self.retrievers = retrievers
        self.weights = weights or [1.0] * len(retrievers)
        self.normalization = normalization

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        all_results = {}

        for retriever, weight in zip(self.retrievers, self.weights):
            results = retriever.search(query, top_k=top_k * 2)

            # Normalize the scores
            scores = [r["score"] for r in results]
            normalized = self._normalize(scores)

            for result, norm_score in zip(results, normalized):
                doc_id = result["id"]
                if doc_id not in all_results:
                    all_results[doc_id] = {
                        "content": result["content"],
                        "scores": {},
                        "weighted_sum": 0
                    }
                all_results[doc_id]["scores"][retriever.name] = norm_score
                all_results[doc_id]["weighted_sum"] += weight * norm_score

        # Sort by weighted score
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x["weighted_sum"],
            reverse=True
        )
        return sorted_results[:top_k]

    def _normalize(self, scores: list[float]) -> list[float]:
        if not scores:
            return []
        if self.normalization == "min_max":
            min_s, max_s = min(scores), max(scores)
            range_s = max_s - min_s if max_s != min_s else 1
            return [(s - min_s) / range_s for s in scores]
        elif self.normalization == "z_score":
            mean = np.mean(scores)
            std = np.std(scores) or 1
            return [(s - mean) / std for s in scores]
        elif self.normalization == "rank":
            # Rank-based score (0 to 1)
            n = len(scores)
            return [(n - i) / n for i in range(n)]

# Example with custom weights
ensemble = ScoreFusionEnsemble(
    retrievers=[dense_bge, dense_e5, sparse_bm25],
    weights=[0.4, 0.3, 0.3],
    normalization="min_max"
)
```
3. Reciprocal Rank Fusion (RRF)
Combines the rankings without requiring any score normalization.
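Each document's fused score depends only on its rank in each result list. With a weight w_i per retriever (as in the implementation below) and the usual constant k = 60, the standard RRF formula is:

```latex
\mathrm{RRF}(d) = \sum_{i=1}^{n} \frac{w_i}{k + \operatorname{rank}_i(d)}
```

Here rank_i(d) is the 1-based position of document d in retriever i's list; documents missing from a list simply contribute nothing.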
```python
class RRFEnsemble:
    def __init__(
        self,
        retrievers: list,
        k: int = 60,
        weights: list[float] = None
    ):
        self.retrievers = retrievers
        self.k = k
        self.weights = weights or [1.0] * len(retrievers)

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        rrf_scores = {}
        doc_contents = {}

        for retriever, weight in zip(self.retrievers, self.weights):
            results = retriever.search(query, top_k=top_k * 3)
            for rank, result in enumerate(results, start=1):
                doc_id = result["id"]
                doc_contents[doc_id] = result["content"]
                if doc_id not in rrf_scores:
                    rrf_scores[doc_id] = 0
                # Weighted RRF formula
                rrf_scores[doc_id] += weight / (self.k + rank)

        # Build the final results
        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        return [
            {
                "id": doc_id,
                "content": doc_contents[doc_id],
                "rrf_score": rrf_scores[doc_id]
            }
            for doc_id in sorted_ids[:top_k]
        ]

# Example
rrf_ensemble = RRFEnsemble(
    retrievers=[dense_bge, sparse_bm25],
    k=60,
    weights=[0.6, 0.4]
)
```
4. Stacking with a Reranker
Use a reranking model to combine the results:
```python
from sentence_transformers import CrossEncoder

class StackedEnsemble:
    def __init__(
        self,
        retrievers: list,
        reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    ):
        self.retrievers = retrievers
        self.reranker = CrossEncoder(reranker_model)

    def search(self, query: str, top_k: int = 5, rerank_k: int = 20) -> list[dict]:
        # 1. Gather candidates from all retrievers
        candidates = {}
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=rerank_k)
            for result in results:
                doc_id = result["id"]
                if doc_id not in candidates:
                    candidates[doc_id] = result["content"]

        # 2. Rerank all candidates
        candidate_list = list(candidates.items())
        pairs = [[query, content] for _, content in candidate_list]
        rerank_scores = self.reranker.predict(pairs)

        # 3. Build the final results
        results = [
            {
                "id": doc_id,
                "content": content,
                "rerank_score": float(score)
            }
            for (doc_id, content), score in zip(candidate_list, rerank_scores)
        ]
        return sorted(results, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
```
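A possible usage sketch, reusing the dense_bge and sparse_bm25 retrievers from the earlier examples (the cross-encoder checkpoint is downloaded from the Hugging Face Hub on first use):

```python
# Usage sketch: candidates from both retrievers, reranked by the cross-encoder.
stacked = StackedEnsemble(
    retrievers=[dense_bge, sparse_bm25],
    reranker_model="cross-encoder/ms-marco-MiniLM-L-6-v2",
)
results = stacked.search("How do I configure OAuth?", top_k=5, rerank_k=20)
for r in results:
    print(f"{r['rerank_score']:.3f}  {r['content'][:80]}")
```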
5. Cascade Ensemble
A cascade approach: the first retriever filters, the following ones refine:
```python
class CascadeEnsemble:
    def __init__(
        self,
        fast_retriever,
        precise_retriever,
        cascade_threshold: float = 0.7
    ):
        self.fast = fast_retriever
        self.precise = precise_retriever
        self.threshold = cascade_threshold

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Step 1: fast retrieval (broad recall)
        fast_results = self.fast.search(query, top_k=top_k * 4)

        # Check whether the results are confident enough
        max_score = max(r["score"] for r in fast_results) if fast_results else 0
        if max_score >= self.threshold:
            # High confidence: return the fast results
            return fast_results[:top_k]

        # Step 2: precise retrieval over the candidates
        candidate_ids = [r["id"] for r in fast_results]
        precise_results = self.precise.search(
            query,
            top_k=top_k,
            filter_ids=candidate_ids  # only search among the candidates
        )
        return precise_results
```
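For instance, BM25 can act as the fast stage and a dense retriever as the precise stage. Note that filter_ids is an assumed capability of the precise retriever's search(), not a standard API:

```python
# Sketch: cheap lexical pass first, dense pass only when confidence is low.
# filter_ids is an assumed parameter of the dense retriever (illustrative).
cascade = CascadeEnsemble(
    fast_retriever=SparseRetriever("bm25"),
    precise_retriever=DenseRetriever("bge"),
    cascade_threshold=0.7,
)
results = cascade.search("webhook signature validation", top_k=5)
```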
Specialized ensembles
Multi-domain ensemble
Use different retrievers depending on the domain:
```python
class MultiDomainEnsemble:
    def __init__(self):
        self.domain_retrievers = {
            "technical": [
                DenseRetriever("codesearch"),
                SparseRetriever("bm25")
            ],
            "general": [
                DenseRetriever("bge"),
                DenseRetriever("e5")
            ],
            "multilingual": [
                DenseRetriever("multilingual-e5"),
                DenseRetriever("mbert")
            ]
        }
        self.domain_classifier = DomainClassifier()

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Detect the domain
        domain = self.domain_classifier.predict(query)

        # Pick the appropriate retrievers
        retrievers = self.domain_retrievers.get(domain, self.domain_retrievers["general"])

        # Build an ensemble over the selected retrievers
        ensemble = RRFEnsemble(retrievers)
        return ensemble.search(query, top_k=top_k)
```
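DomainClassifier is not defined above; a minimal keyword-based sketch is shown below (the keyword lists are illustrative, and a trained classifier would replace this in production):

```python
class DomainClassifier:
    """Illustrative keyword-based domain detection (not a production model)."""

    DOMAIN_KEYWORDS = {
        "technical": ["api", "sdk", "stack trace", "compile", "endpoint"],
        "multilingual": ["translate", "traduire", "übersetzen"],
    }

    def predict(self, query: str) -> str:
        q = query.lower()
        for domain, keywords in self.DOMAIN_KEYWORDS.items():
            if any(kw in q for kw in keywords):
                return domain
        return "general"  # fallback domain used by MultiDomainEnsemble
```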
Adaptive ensemble
Adjust the weights dynamically based on the characteristics of the query:
```python
class AdaptiveEnsemble:
    def __init__(self, retrievers: list):
        self.retrievers = retrievers
        self.query_analyzer = QueryAnalyzer()

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        # Analyze the query
        query_features = self.query_analyzer.analyze(query)

        # Compute adaptive weights
        weights = self._compute_adaptive_weights(query_features)

        # Ensemble with adaptive weights
        ensemble = ScoreFusionEnsemble(
            self.retrievers,
            weights=weights
        )
        return ensemble.search(query, top_k=top_k)

    def _compute_adaptive_weights(self, features: dict) -> list[float]:
        weights = []
        for retriever in self.retrievers:
            weight = 1.0

            # Dense retrievers do well on long queries
            if retriever.type == "dense" and features["length"] > 10:
                weight *= 1.3

            # Sparse retrievers do well on technical terms
            if retriever.type == "sparse" and features["has_technical_terms"]:
                weight *= 1.4

            # Boost when the language matches
            if hasattr(retriever, "language") and retriever.language == features["language"]:
                weight *= 1.2

            weights.append(weight)

        # Normalize
        total = sum(weights)
        return [w / total for w in weights]

class QueryAnalyzer:
    def analyze(self, query: str) -> dict:
        return {
            "length": len(query.split()),
            "has_technical_terms": self._detect_technical(query),
            "language": self._detect_language(query),
            "is_question": query.strip().endswith("?")
        }

    def _detect_technical(self, query: str) -> bool:
        technical_patterns = ["api", "config", "error", "oauth", "webhook"]
        return any(p in query.lower() for p in technical_patterns)

    def _detect_language(self, query: str) -> str:
        # Simplified - use langdetect in production
        french_words = ["comment", "pourquoi", "quel", "est-ce"]
        return "fr" if any(w in query.lower() for w in french_words) else "en"
```
Performance optimization
Parallel search
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelEnsemble:
    def __init__(self, retrievers: list, max_workers: int = 4):
        self.retrievers = retrievers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def search(self, query: str, top_k: int = 5) -> list[dict]:
        loop = asyncio.get_running_loop()

        # Launch all searches in parallel
        tasks = [
            loop.run_in_executor(
                self.executor,
                retriever.search,
                query,
                top_k * 2
            )
            for retriever in self.retrievers
        ]

        # Wait for all results
        all_results = await asyncio.gather(*tasks)

        # Merge with RRF
        return self._rrf_fusion(all_results, top_k)

    def _rrf_fusion(self, all_results: list, top_k: int, k: int = 60) -> list[dict]:
        rrf_scores = {}
        contents = {}
        for results in all_results:
            for rank, result in enumerate(results, start=1):
                doc_id = result["id"]
                contents[doc_id] = result["content"]
                rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (k + rank)

        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        return [
            {"id": doc_id, "content": contents[doc_id], "score": rrf_scores[doc_id]}
            for doc_id in sorted_ids[:top_k]
        ]
```
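Because search is a coroutine here, it must be awaited. A usage sketch from synchronous code, assuming the same retriever objects as before:

```python
import asyncio

# Run the three searches concurrently and fuse them with RRF.
parallel = ParallelEnsemble([dense_bge, dense_e5, sparse_bm25], max_workers=4)
results = asyncio.run(parallel.search("How do I configure OAuth?", top_k=5))
```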
Smart caching
```python
import time

class CachedEnsemble:
    def __init__(self, ensemble, cache_ttl: int = 3600):
        self.ensemble = ensemble
        self.cache = {}
        self.cache_ttl = cache_ttl

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        cache_key = f"{query}:{top_k}"

        # Check the cache
        if cache_key in self.cache:
            cached, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return cached

        # Run the search
        results = self.ensemble.search(query, top_k)

        # Cache the results
        self.cache[cache_key] = (results, time.time())
        return results
```
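The cache is an unbounded in-process dict; a usage sketch wrapping the RRF ensemble from earlier (in production you would add eviction or back it with an external store such as Redis):

```python
# Identical queries within the TTL are served from memory.
cached = CachedEnsemble(rrf_ensemble, cache_ttl=3600)
first = cached.search("How do I configure OAuth?", top_k=5)   # runs the ensemble
second = cached.search("How do I configure OAuth?", top_k=5)  # cache hit
```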
Evaluation and tuning
```python
import time

import numpy as np
import pandas as pd

class EnsembleEvaluator:
    def evaluate_configurations(
        self,
        queries: list[dict],
        retrievers: list,
        configurations: list[dict]
    ) -> pd.DataFrame:
        """
        Test different ensemble configurations, e.g.:

        configurations = [
            {"type": "rrf", "k": 60},
            {"type": "score_fusion", "weights": [0.5, 0.3, 0.2]},
            {"type": "stacking"},
        ]
        """
        results = []

        for config in configurations:
            # _create_ensemble, _ndcg and _recall are helper methods assumed to exist
            ensemble = self._create_ensemble(retrievers, config)
            metrics = {
                "config": str(config),
                "ndcg@5": [],
                "recall@5": [],
                "latency_ms": []
            }

            for query_data in queries:
                query = query_data["query"]
                relevant = query_data["relevant_docs"]

                start = time.time()
                search_results = ensemble.search(query, top_k=5)
                latency = (time.time() - start) * 1000

                retrieved_ids = [r["id"] for r in search_results]

                # Compute the metrics
                metrics["ndcg@5"].append(self._ndcg(retrieved_ids, relevant, k=5))
                metrics["recall@5"].append(self._recall(retrieved_ids, relevant, k=5))
                metrics["latency_ms"].append(latency)

            # Averages
            results.append({
                "config": config,
                "ndcg@5": np.mean(metrics["ndcg@5"]),
                "recall@5": np.mean(metrics["recall@5"]),
                "latency_ms": np.mean(metrics["latency_ms"])
            })

        return pd.DataFrame(results).sort_values("ndcg@5", ascending=False)
```
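A usage sketch showing the expected shape of the labeled queries (the document IDs are made up for illustration, and the evaluator's helper methods _create_ensemble, _ndcg and _recall are assumed to be implemented elsewhere):

```python
# Labeled queries: each entry pairs a query with the IDs of its relevant documents.
eval_queries = [
    {"query": "How do I configure OAuth?", "relevant_docs": ["doc_12", "doc_48"]},
    {"query": "webhook signature validation", "relevant_docs": ["doc_7"]},
]

evaluator = EnsembleEvaluator()
report = evaluator.evaluate_configurations(
    queries=eval_queries,
    retrievers=[dense_bge, dense_e5, sparse_bm25],
    configurations=[
        {"type": "rrf", "k": 60},
        {"type": "score_fusion", "weights": [0.5, 0.3, 0.2]},
        {"type": "stacking"},
    ],
)
print(report)
```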
Next steps
Ensemble retrieval maximizes quality by combining several approaches. To go further:
- Hybrid Retrieval Fusion - combining dense and sparse
- Query Routing - routing queries to the right sources
- Retrieval Fundamentals - an overview
Ensemble retrieval with Ailog
Ailog automatically orchestrates multiple retrievers:
- Adaptive ensembles based on the query type
- Optimized RRF fusion with learned weights
- Parallel search to minimize latency
- Built-in monitoring to optimize configurations
Try it for free and get turnkey ensemble retrieval.