
Hybrid Fusion: Combining Dense and Sparse Retrieval

March 10, 2026
Ailog Team

Master hybrid fusion to combine semantic and lexical search. RRF, weighted fusion, and optimal combination strategies explained.


Hybrid fusion represents the state of the art in modern retrieval. By combining the semantic understanding of dense retrieval with the lexical precision of sparse retrieval, you get the best of both worlds. This guide explores fusion techniques, their implementations, and how to optimize your hybrid system.

Why Hybrid Fusion?

Each retrieval method has its strengths and weaknesses:

| Scenario | Dense Only | Sparse Only | Hybrid |
|---|---|---|---|
| "How to cancel" → "Cancellation procedure" | Excellent | Fails | Excellent |
| "Error 503" | Medium | Excellent | Excellent |
| "Connection problem wifi router" | Good | Good | Excellent |
| Proper names + context | Medium | Good | Excellent |

Hybrid fusion captures cases where one or the other fails, improving recall without sacrificing precision.

BEIR Benchmark: Proof in Numbers

On the BEIR benchmark (diverse retrieval tasks), hybrid fusion consistently outperforms isolated approaches:

| Dataset | BM25 | Dense (BGE) | Hybrid | Gain vs. Dense |
|---|---|---|---|---|
| MS MARCO | 22.8 | 34.2 | 37.1 | +8.5% |
| Natural Questions | 32.9 | 49.4 | 52.8 | +6.9% |
| TREC-COVID | 65.6 | 71.2 | 78.4 | +10.1% |
| SciFact | 66.5 | 72.3 | 76.8 | +6.2% |
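The gain column is the relative improvement of hybrid over the dense baseline. A quick sketch to reproduce it from the table's numbers:

```python
def relative_gain(dense: float, hybrid: float) -> float:
    """Relative improvement of hybrid over the dense baseline, in percent."""
    return (hybrid - dense) / dense * 100

# MS MARCO row: dense 34.2 → hybrid 37.1
print(round(relative_gain(34.2, 37.1), 1))  # 8.5
```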

Fusion Techniques

1. Reciprocal Rank Fusion (RRF)

RRF is the most popular and robust fusion algorithm. It combines rankings without requiring normalized scores.

```python
def reciprocal_rank_fusion(
    rankings: list[list[str]], k: int = 60
) -> list[tuple[str, float]]:
    """
    Reciprocal Rank Fusion

    rankings: list of rankings (each ranking = ordered list of IDs)
    k: smoothing parameter (60 by default, the standard value)

    Formula: RRF_score(d) = Σ 1 / (k + rank(d))
    """
    fusion_scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            if doc_id not in fusion_scores:
                fusion_scores[doc_id] = 0
            fusion_scores[doc_id] += 1 / (k + rank)

    # Sort by descending score
    sorted_results = sorted(
        fusion_scores.items(),
        key=lambda x: x[1],
        reverse=True
    )
    return sorted_results

# Example usage
dense_ranking = ["doc_a", "doc_c", "doc_b", "doc_d"]
sparse_ranking = ["doc_b", "doc_a", "doc_e", "doc_c"]

fused = reciprocal_rank_fusion([dense_ranking, sparse_ranking])
# [('doc_a', 0.032), ('doc_b', 0.032), ('doc_c', 0.031), ...]
```

RRF Advantages:

  • No score normalization required
  • Robust to outliers
  • Easy to tune k parameter
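To see why k is forgiving to tune, compare how much more a rank-1 result contributes than a rank-10 result for a few values of k, using the formula above. Small k sharply favors top ranks; large k flattens the curve (a minimal sketch):

```python
def rrf_contribution(rank: int, k: int = 60) -> float:
    """Contribution of one ranking to a document's RRF score: 1 / (k + rank)."""
    return 1 / (k + rank)

for k in (10, 60, 100):
    ratio = rrf_contribution(1, k) / rrf_contribution(10, k)
    print(f"k={k}: rank-1 vs rank-10 weight ratio = {ratio:.2f}")
# k=10 strongly favors top ranks (ratio 1.82); k=100 treats them
# almost equally (ratio 1.09) — the standard k=60 sits in between
```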

2. Weighted Score Fusion

Combines normalized scores with configurable weights:

```python
def weighted_score_fusion(
    dense_results: list[dict],
    sparse_results: list[dict],
    alpha: float = 0.5
) -> list[dict]:
    """
    Weighted score fusion

    alpha: dense weight (0 = sparse only, 1 = dense only)

    Formula: final_score = alpha × dense_norm + (1 - alpha) × sparse_norm
    """
    # Normalize scores (min-max)
    def normalize(results):
        if not results:
            return {}
        scores = [r["score"] for r in results]
        min_s, max_s = min(scores), max(scores)
        range_s = max_s - min_s if max_s != min_s else 1
        return {
            r["id"]: (r["score"] - min_s) / range_s
            for r in results
        }

    dense_norm = normalize(dense_results)
    sparse_norm = normalize(sparse_results)

    # Fuse
    all_ids = set(dense_norm.keys()) | set(sparse_norm.keys())
    fused = []
    for doc_id in all_ids:
        d_score = dense_norm.get(doc_id, 0)
        s_score = sparse_norm.get(doc_id, 0)
        final_score = alpha * d_score + (1 - alpha) * s_score
        fused.append({
            "id": doc_id,
            "score": final_score,
            "dense_score": d_score,
            "sparse_score": s_score
        })

    return sorted(fused, key=lambda x: x["score"], reverse=True)
```

How to Choose Alpha?

| Query Type | Recommended Alpha | Reason |
|---|---|---|
| Natural questions | 0.6-0.7 | Dense excels |
| Technical search | 0.4-0.5 | Balance |
| Codes/References | 0.2-0.3 | Sparse excels |
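To make the table concrete, here is a toy sweep with hypothetical pre-normalized scores (the doc IDs and score values are invented for illustration): one document wins semantically, the other lexically, and the fused winner flips as alpha moves toward the dense side.

```python
def fuse(dense: dict, sparse: dict, alpha: float) -> dict:
    """Blend two normalized score maps: alpha * dense + (1 - alpha) * sparse."""
    ids = set(dense) | set(sparse)
    return {i: alpha * dense.get(i, 0) + (1 - alpha) * sparse.get(i, 0)
            for i in ids}

# Hypothetical normalized scores: doc_sem wins semantically, doc_kw lexically
dense_norm = {"doc_sem": 1.0, "doc_kw": 0.2}
sparse_norm = {"doc_sem": 0.1, "doc_kw": 1.0}

for alpha in (0.2, 0.5, 0.8):
    scores = fuse(dense_norm, sparse_norm, alpha)
    best = max(scores, key=scores.get)
    print(f"alpha={alpha}: top result = {best}")
# alpha=0.2 and 0.5 favor doc_kw here; alpha=0.8 flips to doc_sem
```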

3. Convex Combination with Reranking

Two-step approach: fusion then reranking for refinement:

```python
from sentence_transformers import CrossEncoder

class HybridRetrieverWithRerank:
    def __init__(self, dense_retriever, sparse_retriever):
        self.dense = dense_retriever
        self.sparse = sparse_retriever
        self.reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    def search(self, query: str, top_k: int = 5, rerank_k: int = 20):
        # Step 1: Get candidates from each retriever
        dense_results = self.dense.search(query, top_k=rerank_k)
        sparse_results = self.sparse.search(query, top_k=rerank_k)

        # Step 2: RRF fusion (reciprocal_rank_fusion as defined above)
        dense_ids = [r["id"] for r in dense_results]
        sparse_ids = [r["id"] for r in sparse_results]
        fused = reciprocal_rank_fusion([dense_ids, sparse_ids])

        # Step 3: Rerank top candidates
        candidates = fused[:rerank_k]
        # _get_documents: fetch full documents by ID from your store
        candidate_docs = self._get_documents([c[0] for c in candidates])

        pairs = [[query, doc["content"]] for doc in candidate_docs]
        rerank_scores = self.reranker.predict(pairs)

        # Combine RRF and rerank scores
        final_results = []
        for (doc_id, rrf_score), rerank_score, doc in zip(
            candidates, rerank_scores, candidate_docs
        ):
            final_results.append({
                "id": doc_id,
                "content": doc["content"],
                "score": 0.3 * rrf_score + 0.7 * rerank_score
            })

        return sorted(
            final_results, key=lambda x: x["score"], reverse=True
        )[:top_k]
```

Implementation with Vector Databases

Qdrant: Native Hybrid Search

```python
from qdrant_client import QdrantClient
from qdrant_client.models import (
    VectorParams, SparseVectorParams, PointStruct,
    SparseVector, Prefetch, FusionQuery, Fusion
)

client = QdrantClient("localhost", port=6333)

# Create hybrid collection
client.create_collection(
    collection_name="hybrid_docs",
    vectors_config={
        "dense": VectorParams(size=1024, distance="Cosine")
    },
    sparse_vectors_config={
        "sparse": SparseVectorParams()
    }
)

# Index with both vector types
def index_hybrid(doc_id: str, content: str, dense_emb, sparse_vec):
    client.upsert(
        collection_name="hybrid_docs",
        points=[PointStruct(
            id=doc_id,
            payload={"content": content},
            vector={
                "dense": dense_emb,
                "sparse": sparse_vec
            }
        )]
    )

# Native RRF hybrid search
def hybrid_search(query: str, top_k: int = 5):
    # encode_dense / encode_sparse: your own embedding functions
    query_dense = encode_dense(query)
    query_sparse = encode_sparse(query)

    results = client.query_points(
        collection_name="hybrid_docs",
        prefetch=[
            Prefetch(query=query_dense, using="dense", limit=20),
            Prefetch(query=query_sparse, using="sparse", limit=20)
        ],
        query=FusionQuery(fusion=Fusion.RRF),
        limit=top_k
    )
    return results
```

Elasticsearch: Combined Query

```python
from elasticsearch import Elasticsearch

es = Elasticsearch()

def hybrid_search_es(query: str, query_embedding: list, top_k: int = 5):
    """Elasticsearch hybrid search with kNN + BM25"""
    response = es.search(
        index="hybrid_index",
        body={
            "size": top_k,
            "query": {
                "bool": {
                    "should": [
                        # BM25 search
                        {
                            "match": {
                                "content": {
                                    "query": query,
                                    "boost": 0.5
                                }
                            }
                        },
                        # kNN search
                        {
                            "knn": {
                                "field": "embedding",
                                "query_vector": query_embedding,
                                "k": 20,
                                "num_candidates": 100,
                                "boost": 0.5
                            }
                        }
                    ]
                }
            }
        }
    )
    return response["hits"]["hits"]
```

Weaviate: Hybrid Alpha

```python
import weaviate

client = weaviate.Client("http://localhost:8080")

def hybrid_search_weaviate(query: str, alpha: float = 0.5):
    """
    Weaviate hybrid search

    alpha: 0 = BM25 only, 1 = vector only
    """
    result = (
        client.query
        .get("Document", ["content", "title"])
        .with_hybrid(
            query=query,
            alpha=alpha,
            fusion_type="relativeScoreFusion"  # or "rankedFusion"
        )
        .with_limit(5)
        .do()
    )
    return result["data"]["Get"]["Document"]
```

Advanced Strategies

Conditional Fusion

Dynamically adapt strategy based on query type:

```python
import re

class AdaptiveHybridRetriever:
    def __init__(self, dense, sparse, classifier=None):
        self.dense = dense
        self.sparse = sparse
        self.classifier = classifier  # Classifies query type (optional)

    def search(self, query: str, top_k: int = 5):
        # Classify the query (fall back to heuristics without a model)
        if self.classifier is not None:
            query_type = self.classifier.predict(query)
        else:
            query_type = self._classify_query(query)

        if query_type == "exact_match":
            # Codes, references → sparse dominant
            alpha = 0.2
        elif query_type == "semantic":
            # Natural questions → dense dominant
            alpha = 0.8
        else:
            # Balanced hybrid
            alpha = 0.5

        return self._hybrid_search(query, top_k, alpha)

    def _classify_query(self, query: str) -> str:
        """Simple heuristics to classify"""
        # Detect codes/references
        if re.search(r'[A-Z]{2,}\d+|#\d+|v\d+\.\d+', query):
            return "exact_match"
        # Very short queries → sparse
        if len(query.split()) <= 2:
            return "exact_match"
        # Questions → dense
        if query.lower().startswith(('how', 'why', 'what', 'which')):
            return "semantic"
        return "balanced"
```

Multi-Index Fusion

Combine multiple information sources:

```python
def multi_source_fusion(
    query: str,
    retrievers: dict,  # maps source name → retriever instance
    weights: dict,     # maps source name → fusion weight
    top_k: int = 5
):
    """
    Multi-source fusion

    retrievers = {
        "faq": faq_retriever,
        "docs": docs_retriever,
        "products": product_retriever
    }
    weights = {"faq": 1.5, "docs": 1.0, "products": 0.8}
    """
    all_rankings = []
    all_weights = []

    for source_name, retriever in retrievers.items():
        results = retriever.search(query, top_k=top_k * 2)
        ranking = [r["id"] for r in results]
        all_rankings.append(ranking)
        all_weights.append(weights.get(source_name, 1.0))

    # Weighted RRF
    fusion_scores = {}
    for ranking, weight in zip(all_rankings, all_weights):
        for rank, doc_id in enumerate(ranking, start=1):
            if doc_id not in fusion_scores:
                fusion_scores[doc_id] = 0
            fusion_scores[doc_id] += weight / (60 + rank)

    return sorted(
        fusion_scores.items(), key=lambda x: x[1], reverse=True
    )[:top_k]
```

Evaluation and Tuning

A/B Testing Parameters

```python
import numpy as np
import pandas as pd

def evaluate_fusion_params(
    test_queries: list[dict],
    dense_retriever,
    sparse_retriever,
    param_grid: dict
):
    """Grid search on fusion parameters"""
    results = []

    for alpha in param_grid.get("alpha", [0.3, 0.5, 0.7]):
        for k in param_grid.get("rrf_k", [20, 60, 100]):
            metrics = {
                "alpha": alpha,
                "rrf_k": k,
                "recall@5": [],
                "mrr": []
            }

            for test_case in test_queries:
                query = test_case["query"]
                relevant = test_case["relevant_docs"]

                # Execute search
                dense_results = dense_retriever.search(query, top_k=20)
                sparse_results = sparse_retriever.search(query, top_k=20)

                # Fusion with parameters
                fused = reciprocal_rank_fusion(
                    [[r["id"] for r in dense_results],
                     [r["id"] for r in sparse_results]],
                    k=k
                )

                # Calculate metrics
                retrieved_ids = [doc_id for doc_id, _ in fused[:5]]
                hits = len(set(retrieved_ids) & set(relevant))
                metrics["recall@5"].append(hits / len(relevant))

                # MRR
                for i, doc_id in enumerate(retrieved_ids):
                    if doc_id in relevant:
                        metrics["mrr"].append(1 / (i + 1))
                        break
                else:
                    metrics["mrr"].append(0)

            metrics["recall@5"] = np.mean(metrics["recall@5"])
            metrics["mrr"] = np.mean(metrics["mrr"])
            results.append(metrics)

    return pd.DataFrame(results).sort_values("recall@5", ascending=False)
```

Production Monitoring

```python
import time

class HybridRetrieverWithMetrics:
    def __init__(self, dense, sparse, metrics_client):
        self.dense = dense
        self.sparse = sparse
        self.metrics = metrics_client

    def search(self, query: str, top_k: int = 5):
        start = time.time()

        # Dense and sparse searches (could also run in parallel)
        dense_results = self.dense.search(query, top_k=20)
        sparse_results = self.sparse.search(query, top_k=20)

        # Fusion
        fused = self._fuse(dense_results, sparse_results)

        # Metrics
        duration = time.time() - start
        self.metrics.record("retrieval_latency_ms", duration * 1000)
        self.metrics.record(
            "dense_top1_in_final",
            dense_results[0]["id"] in [f["id"] for f in fused[:5]]
        )
        self.metrics.record(
            "sparse_top1_in_final",
            sparse_results[0]["id"] in [f["id"] for f in fused[:5]]
        )

        # Divergence analysis
        dense_set = set(r["id"] for r in dense_results[:5])
        sparse_set = set(r["id"] for r in sparse_results[:5])
        overlap = len(dense_set & sparse_set) / 5
        self.metrics.record("dense_sparse_overlap", overlap)

        return fused[:top_k]
```

Next Steps

Hybrid fusion is the foundation of robust retrieval.


Automatic Hybrid Fusion with Ailog

Ailog implements hybrid fusion transparently:

  • Native RRF optimized for your content
  • Adaptive alpha based on query analysis
  • Automatic reranking for maximum precision
  • Integrated monitoring for continuous optimization

Try for free and get hybrid retrieval with zero configuration.

Tags

rag, retrieval, hybrid search, fusion, rrf
