Milvus : Recherche Vectorielle à l'Échelle Milliards
Déployez Milvus pour un RAG à l'Échelle Production Gérant des Milliards de Vecteurs avec Mise à l'Échelle Horizontale et Accélération GPU.
Pourquoi Milvus ?
Conçu pour l'échelle :
- Des milliards de vecteurs
- Accélération GPU
- Mise à l'échelle horizontale
- Stockage S3/MinIO
- Natif Kubernetes
Utilisé par : Shopify, NVIDIA, Salesforce
Configuration Docker
DEVELOPERbash# Standalone (développement) docker run -d --name milvus -p 19530:19530 -p 9091:9091 \ milvusdb/milvus:v2.3.4 milvus run standalone
Déploiement Distribué
DEVELOPERyaml# docker-compose.yml (production) version: '3.8' services: etcd: image: quay.io/coreos/etcd:v3.5.5 minio: image: minio/minio:RELEASE.2023-03-20T20-16-18Z milvus-proxy: image: milvusdb/milvus:v2.3.4 command: ["milvus", "run", "proxy"] depends_on: - etcd - minio milvus-querynode: image: milvusdb/milvus:v2.3.4 command: ["milvus", "run", "querynode"] deploy: replicas: 3 # Mise à l'échelle horizontale
Client Python
DEVELOPERpythonfrom pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType # Connexion connections.connect("default", host="localhost", port="19530") # Définir le schéma fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536), FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="metadata", dtype=DataType.JSON) ] schema = CollectionSchema(fields=fields, description="RAG documents") # Créer la collection collection = Collection(name="documents", schema=schema)
Stratégies d'Indexation
DEVELOPERpython# IVF_FLAT (équilibré) index_params = { "index_type": "IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 1024} } # HNSW (recherche plus rapide, plus de mémoire) index_params = { "index_type": "HNSW", "metric_type": "COSINE", "params": { "M": 16, "efConstruction": 256 } } # Index GPU (10x plus rapide) index_params = { "index_type": "GPU_IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 2048} } collection.create_index(field_name="embedding", index_params=index_params)
Insertion de Données
DEVELOPERpython# Insertion par lots data = [ [embedding1, embedding2, ...], # embeddings ["text1", "text2", ...], # text [{"category": "A"}, {"category": "B"}, ...] # metadata ] collection.insert(data) collection.flush() # Persister sur disque
Recherche
DEVELOPERpython# Charger la collection en mémoire collection.load() # Recherche search_params = { "metric_type": "COSINE", "params": {"nprobe": 16} # Plus élevé = plus précis mais plus lent } results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, output_fields=["text", "metadata"] ) for hits in results: for hit in hits: print(f"Score: {hit.score}, Text: {hit.entity.get('text')}")
Filtrage
DEVELOPERpython# Filtrage des métadonnées avec expressions booléennes results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, expr='metadata["category"] == "tech"', output_fields=["text", "metadata"] )
Partitionnement
Diviser la collection pour des requêtes plus rapides :
DEVELOPERpython# Créer des partitions collection.create_partition("partition_2024") collection.create_partition("partition_2025") # Insérer dans une partition spécifique collection.insert(data, partition_name="partition_2025") # Rechercher uniquement dans une partition spécifique results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, partition_names=["partition_2025"], limit=10 )
Time Travel
Interroger les données historiques :
DEVELOPERpythonimport time # Obtenir le timestamp avant la suppression ts_before = int(time.time() * 1000) # Supprimer des données collection.delete(expr="id in [1, 2, 3]") # Interroger les données telles qu'elles étaient avant la suppression results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, travel_timestamp=ts_before )
Groupes de Ressources
Isoler les charges de travail sur différents nœuds :
DEVELOPERpythonfrom pymilvus import utility # Créer des groupes de ressources utility.create_resource_group("rg1", config={"node_num": 2}) utility.create_resource_group("rg2", config={"node_num": 1}) # Assigner une collection à un groupe de ressources collection.set_properties({"resource_groups": ["rg1"]})
Surveillance
DEVELOPERpython# Statistiques de la collection stats = collection.get_stats() print(f"Row count: {stats['row_count']}") # Progression de l'index index = collection.index() print(f"Index state: {index.state}") # Métriques de requête (endpoint Prometheus) # http://localhost:9091/metrics
Pipeline RAG de Production
DEVELOPERpythonfrom pymilvus import Collection, connections import openai connections.connect("default", host="milvus-proxy", port="19530") collection = Collection("documents") collection.load() def milvus_rag(query): # Créer l'embedding de la requête query_emb = openai.Embedding.create( input=query, model="text-embedding-3-small" )['data'][0]['embedding'] # Rechercher dans Milvus results = collection.search( data=[query_emb], anns_field="embedding", param={"metric_type": "COSINE", "params": {"nprobe": 32}}, limit=5, output_fields=["text"] ) # Construire le contexte context = "\n\n".join([hit.entity.get('text') for hit in results[0]]) # Générer la réponse response = openai.ChatCompletion.create( model="gpt-4-turbo", messages=[{ "role": "user", "content": f"Context: {context}\n\nQuestion: {query}" }] ) return response.choices[0].message.content # Utilisation answer = milvus_rag("What is Milvus?")
Milvus gère l'échelle de milliards de vecteurs avec facilité. Parfait pour les déploiements RAG en entreprise.
Tags
Articles connexes
Bases de Données Vectorielles : Stocker et Rechercher des Embeddings
Guide complet sur les bases de données vectorielles pour le RAG : comparaison des options populaires, stratégies d'indexation et optimisation des performances.
Qdrant : Fonctionnalités Avancées de Recherche Vectorielle
Exploitez les fonctionnalités puissantes de Qdrant : indexation de payload, quantization, déploiement distribué pour des systèmes RAG haute performance.
Pinecone pour le RAG de Production à Grande Échelle
Déployez la recherche vectorielle prête pour la production : configuration de Pinecone, stratégies d'indexation et mise à l'échelle jusqu'à des milliards de vecteurs.