4. StorageExperte

Milvus: Vektorielle Suche im Milliardenmaßstab

15. November 2025
13 Minuten Lesezeit
Équipe de Recherche Ailog

Setzen Sie Milvus für ein produktionsfähiges RAG ein, das Milliarden von Vektoren verwaltet, mit horizontaler Skalierung und GPU-Beschleunigung.

Warum Milvus ?

Für Skalierung konzipiert :

  • Milliarden von vectors
  • GPU-Beschleunigung
  • Horizontale Skalierung
  • S3/MinIO-Speicher
  • Kubernetes-nativ

Genutzt von : Shopify, NVIDIA, Salesforce

Docker-Konfiguration

DEVELOPERbash
# Standalone (Entwicklung) docker run -d --name milvus -p 19530:19530 -p 9091:9091 \ milvusdb/milvus:v2.3.4 milvus run standalone

Verteilte Bereitstellung

DEVELOPERyaml
# docker-compose.yml (Produktion) version: '3.8' services: etcd: image: quay.io/coreos/etcd:v3.5.5 minio: image: minio/minio:RELEASE.2023-03-20T20-16-18Z milvus-proxy: image: milvusdb/milvus:v2.3.4 command: ["milvus", "run", "proxy"] depends_on: - etcd - minio milvus-querynode: image: milvusdb/milvus:v2.3.4 command: ["milvus", "run", "querynode"] deploy: replicas: 3 # Horizontale Skalierung

Python-Client

DEVELOPERpython
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType # Verbindung connections.connect("default", host="localhost", port="19530") # Schema definieren fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536), FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="metadata", dtype=DataType.JSON) ] schema = CollectionSchema(fields=fields, description="RAG documents") # Collection erstellen collection = Collection(name="documents", schema=schema)

Indexierungsstrategien

DEVELOPERpython
# IVF_FLAT (ausgewogen) index_params = { "index_type": "IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 1024} } # HNSW (schnellere Suche, mehr Speicher) index_params = { "index_type": "HNSW", "metric_type": "COSINE", "params": { "M": 16, "efConstruction": 256 } } # GPU-Index (10x schneller) index_params = { "index_type": "GPU_IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 2048} } collection.create_index(field_name="embedding", index_params=index_params)

Einfügen von Daten

DEVELOPERpython
# Batch-Einfügen data = [ [embedding1, embedding2, ...], # embeddings ["text1", "text2", ...], # text [{"category": "A"}, {"category": "B"}, ...] # metadata ] collection.insert(data) collection.flush() # Auf Festplatte persistieren

Suche

DEVELOPERpython
# Collection in den Speicher laden collection.load() # Suche search_params = { "metric_type": "COSINE", "params": {"nprobe": 16} # Höher = genauer, aber langsamer } results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, output_fields=["text", "metadata"] ) for hits in results: for hit in hits: print(f"Score: {hit.score}, Text: {hit.entity.get('text')}")

Filterung

DEVELOPERpython
# Filterung der Metadaten mit booleschen Ausdrücken results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, expr='metadata["category"] == "tech"', output_fields=["text", "metadata"] )

Partitionierung

Die Collection für schnellere Abfragen aufteilen :

DEVELOPERpython
# Partitionen erstellen collection.create_partition("partition_2024") collection.create_partition("partition_2025") # In eine spezifische Partition einfügen collection.insert(data, partition_name="partition_2025") # Nur in einer spezifischen Partition suchen results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, partition_names=["partition_2025"], limit=10 )

Time Travel

Historische Daten abfragen :

DEVELOPERpython
import time # Timestamp vor dem Löschen erhalten ts_before = int(time.time() * 1000) # Daten löschen collection.delete(expr="id in [1, 2, 3]") # Daten abfragen, wie sie vor dem Löschen waren results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, travel_timestamp=ts_before )

Ressourcengruppen

Lasten auf verschiedenen Knoten isolieren :

DEVELOPERpython
from pymilvus import utility # Ressourcengruppen erstellen utility.create_resource_group("rg1", config={"node_num": 2}) utility.create_resource_group("rg2", config={"node_num": 1}) # Einer Collection eine Ressourcengruppe zuweisen collection.set_properties({"resource_groups": ["rg1"]})

Überwachung

DEVELOPERpython
# Statistiken der Collection stats = collection.get_stats() print(f"Row count: {stats['row_count']}") # Fortschritt des Index index = collection.index() print(f"Index state: {index.state}") # Abfragemetriken (Prometheus-Endpunkt) # http://localhost:9091/metrics

Produktions-RAG-Pipeline

DEVELOPERpython
from pymilvus import Collection, connections import openai connections.connect("default", host="milvus-proxy", port="19530") collection = Collection("documents") collection.load() def milvus_rag(query): # Embedding für die Anfrage erstellen query_emb = openai.Embedding.create( input=query, model="text-embedding-3-small" )['data'][0]['embedding'] # In Milvus suchen results = collection.search( data=[query_emb], anns_field="embedding", param={"metric_type": "COSINE", "params": {"nprobe": 32}}, limit=5, output_fields=["text"] ) # Kontext aufbauen context = "\n\n".join([hit.entity.get('text') for hit in results[0]]) # Antwort generieren response = openai.ChatCompletion.create( model="gpt-4-turbo", messages=[{ "role": "user", "content": f"Context: {context}\n\nQuestion: {query}" }] ) return response.choices[0].message.content # Verwendung answer = milvus_rag("What is Milvus?")

Milvus bewältigt die Skalierung auf Milliarden von vectors mit Leichtigkeit. Perfekt für RAG-Deployments im Unternehmensumfeld.

Tags

milvusbase-de-données-vectorielleéchellestorageperformance

Verwandte Artikel

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !