Milvus : Recherche Vectorielle à l'Échelle Milliards
Déployez Milvus pour un RAG à l'Échelle Production Gérant des Milliards de Vecteurs avec Mise à l'Échelle Horizontale et Accélération GPU.
- Auteur
- Équipe de Recherche Ailog
- Date de publication
- Temps de lecture
- 13 min de lecture
- Niveau
- advanced
- Étape du pipeline RAG
- Storage
Pourquoi Milvus ?
Conçu pour l'échelle : • Des milliards de vecteurs • Accélération GPU • Mise à l'échelle horizontale • Stockage S3/MinIO • Natif Kubernetes
Utilisé par : Shopify, NVIDIA, Salesforce
Configuration Docker
``bash Standalone (développement) docker run -d --name milvus -p 19530:19530 -p 9091:9091 \ milvusdb/milvus:v2.3.4 milvus run standalone `
Déploiement Distribué
`yaml docker-compose.yml (production) version: '3.8'
services: etcd: image: quay.io/coreos/etcd:v3.5.5
minio: image: minio/minio:RELEASE.2023-03-20T20-16-18Z
milvus-proxy: image: milvusdb/milvus:v2.3.4 command: ["milvus", "run", "proxy"] depends_on: • etcd • minio
milvus-querynode: image: milvusdb/milvus:v2.3.4 command: ["milvus", "run", "querynode"] deploy: replicas: 3 Mise à l'échelle horizontale `
Client Python
`python from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
Connexion connections.connect("default", host="localhost", port="19530")
Définir le schéma fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536), FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="metadata", dtype=DataType.JSON) ]
schema = CollectionSchema(fields=fields, description="RAG documents")
Créer la collection collection = Collection(name="documents", schema=schema) `
Stratégies d'Indexation
`python IVF_FLAT (équilibré) index_params = { "index_type": "IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 1024} }
HNSW (recherche plus rapide, plus de mémoire) index_params = { "index_type": "HNSW", "metric_type": "COSINE", "params": { "M": 16, "efConstruction": 256 } }
Index GPU (10x plus rapide) index_params = { "index_type": "GPU_IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 2048} }
collection.create_index(field_name="embedding", index_params=index_params) `
Insertion de Données
`python Insertion par lots data = [ [embedding1, embedding2, ...], embeddings ["text1", "text2", ...], text [{"category": "A"}, {"category": "B"}, ...] metadata ]
collection.insert(data) collection.flush() Persister sur disque `
Recherche
`python Charger la collection en mémoire collection.load()
Recherche search_params = { "metric_type": "COSINE", "params": {"nprobe": 16} Plus élevé = plus précis mais plus lent }
results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, output_fields=["text", "metadata"] )
for hits in results: for hit in hits: print(f"Score: {hit.score}, Text: {hit.entity.get('text')}") `
Filtrage
`python Filtrage des métadonnées avec expressions booléennes results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, expr='metadata["category"] == "tech"', output_fields=["text", "metadata"] ) `
Partitionnement
Diviser la collection pour des requêtes plus rapides :
`python Créer des partitions collection.create_partition("partition_2024") collection.create_partition("partition_2025")
Insérer dans une partition spécifique collection.insert(data, partition_name="partition_2025")
Rechercher uniquement dans une partition spécifique results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, partition_names=["partition_2025"], limit=10 ) `
Time Travel
Interroger les données historiques :
`python import time
Obtenir le timestamp avant la suppression ts_before = int(time.time() * 1000)
Supprimer des données collection.delete(expr="id in [1, 2, 3]")
Interroger les données telles qu'elles étaient avant la suppression results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, travel_timestamp=ts_before ) `
Groupes de Ressources
Isoler les charges de travail sur différents nœuds :
`python from pymilvus import utility
Créer des groupes de ressources utility.create_resource_group("rg1", config={"node_num": 2}) utility.create_resource_group("rg2", config={"node_num": 1})
Assigner une collection à un groupe de ressources collection.set_properties({"resource_groups": ["rg1"]}) `
Surveillance
`python Statistiques de la collection stats = collection.get_stats() print(f"Row count: {stats['row_count']}")
Progression de l'index index = collection.index() print(f"Index state: {index.state}")
Métriques de requête (endpoint Prometheus) http://localhost:9091/metrics `
Pipeline RAG de Production
`python from pymilvus import Collection, connections import openai
connections.connect("default", host="milvus-proxy", port="19530") collection = Collection("documents") collection.load()
def milvus_rag(query): Créer l'embedding de la requête query_emb = openai.Embedding.create( input=query, model="text-embedding-3-small" )['data'][0]['embedding']
Rechercher dans Milvus results = collection.search( data=[query_emb], anns_field="embedding", param={"metric_type": "COSINE", "params": {"nprobe": 32}}, limit=5, output_fields=["text"] )
Construire le contexte context = "\n\n".join([hit.entity.get('text') for hit in results[0]])
Générer la réponse response = openai.ChatCompletion.create( model="gpt-4-turbo", messages=[{ "role": "user", "content": f"Context: {context}\n\nQuestion: {query}" }] )
return response.choices[0].message.content
Utilisation answer = milvus_rag("What is Milvus?") ``
Milvus gère l'échelle de milliards de vecteurs avec facilité. Parfait pour les déploiements RAG en entreprise.