Milvus: Billion-Scale Vector Search
Deploy Milvus for production-scale RAG handling billions of vectors with horizontal scaling and GPU acceleration.
- Author: Ailog Research Team
- Reading time: 13 min read
- Level: advanced
- RAG Pipeline Step: Storage
Why Milvus?
Built for scale:
- Billions of vectors
- GPU acceleration
- Horizontal scaling
- S3/MinIO storage
- Kubernetes-native
Used by: Shopify, NVIDIA, Salesforce
Docker Setup
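```bash
# Standalone (development); embedded etcd and local storage avoid external dependencies
docker run -d --name milvus \
  -e ETCD_USE_EMBED=true -e ETCD_DATA_DIR=/var/lib/milvus/etcd -e COMMON_STORAGETYPE=local \
  -p 19530:19530 -p 9091:9091 \
  milvusdb/milvus:v2.3.4 milvus run standalone
```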
Distributed Deployment
```yaml
# docker-compose.yml (production)
version: '3.8'

services:
  etcd:
    image: quay.io/coreos/etcd:v3.5.5

  minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z

  milvus-proxy:
    image: milvusdb/milvus:v2.3.4
    command: ["milvus", "run", "proxy"]
    depends_on:
      - etcd
      - minio

  milvus-querynode:
    image: milvusdb/milvus:v2.3.4
    command: ["milvus", "run", "querynode"]
    deploy:
      replicas: 3  # Scale horizontally
```
Python Client
```python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# Connect
connections.connect("default", host="localhost", port="19530")

# Define schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="metadata", dtype=DataType.JSON),
]

schema = CollectionSchema(fields=fields, description="RAG documents")

# Create collection
collection = Collection(name="documents", schema=schema)
```
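The schema fixes the vector dimension and the maximum text length, so rows that violate either limit are rejected at insert time. The following is a minimal pre-insert check, not part of pymilvus: `validate_row` is a hypothetical helper, and it assumes the VARCHAR limit is counted in UTF-8 bytes.

```python
EMBEDDING_DIM = 1536     # mirrors dim= in the schema
TEXT_MAX_LENGTH = 65535  # mirrors max_length= in the schema


def validate_row(embedding, text):
    """Return a list of problems; an empty list means the row fits the schema."""
    problems = []
    if len(embedding) != EMBEDDING_DIM:
        problems.append(
            f"embedding has {len(embedding)} dims, expected {EMBEDDING_DIM}"
        )
    # Assumption: the limit applies to the encoded byte length, not characters.
    if len(text.encode("utf-8")) > TEXT_MAX_LENGTH:
        problems.append("text exceeds VARCHAR max_length")
    return problems
```

Running a check like this before `collection.insert` lets a pipeline log or truncate bad rows instead of failing a whole batch.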
Indexing Strategies
```python
# Choose one of the following index configurations.

# IVF_FLAT (balanced)
index_params = {
    "index_type": "IVF_FLAT",
    "metric_type": "COSINE",
    "params": {"nlist": 1024}
}

# HNSW (faster search, more memory)
index_params = {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {"M": 16, "efConstruction": 256}
}

# GPU index (10x faster); requires a GPU-enabled Milvus build
index_params = {
    "index_type": "GPU_IVF_FLAT",
    "metric_type": "COSINE",
    "params": {"nlist": 2048}
}

collection.create_index(field_name="embedding", index_params=index_params)
```
Inserting Data
```python
# Batch insert: one list per field, in schema order
# (the auto_id primary key is omitted)
data = [
    [embedding1, embedding2],                # embeddings
    ["text1", "text2"],                      # text
    [{"category": "A"}, {"category": "B"}],  # metadata
]

collection.insert(data)
collection.flush()  # Persist to disk
```
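For large corpora, a single `insert` call with every row is rarely practical, so the column lists are usually sliced into batches. The sketch below shows one way to do that in plain Python; `column_batches` and the default `batch_size` are illustrative assumptions, not pymilvus features.

```python
def column_batches(embeddings, texts, metadata, batch_size=1000):
    """Yield [embeddings, texts, metadata] slices of at most batch_size rows."""
    if not (len(embeddings) == len(texts) == len(metadata)):
        raise ValueError("all columns must have the same number of rows")
    for start in range(0, len(texts), batch_size):
        end = start + batch_size
        yield [embeddings[start:end], texts[start:end], metadata[start:end]]


# Intended usage against a live collection (not executed here):
# for batch in column_batches(all_embeddings, all_texts, all_metadata):
#     collection.insert(batch)
# collection.flush()
```

Each yielded batch keeps the same column-oriented layout as the `data` list above, so it can be passed to `collection.insert` unchanged.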
Searching
```python
# Load collection into memory
collection.load()

# Search
search_params = {
    "metric_type": "COSINE",
    # nprobe applies to IVF indexes (HNSW uses "ef" instead).
    # Higher = more accurate but slower
    "params": {"nprobe": 16}
}

results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=10,
    output_fields=["text", "metadata"]
)

for hits in results:
    for hit in hits:
        print(f"Score: {hit.score}, Text: {hit.entity.get('text')}")
```
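To see why a higher `nprobe` trades speed for accuracy, it helps to look at a toy inverted-file (IVF) search. The sketch below is a simplified pure-Python illustration, not Milvus's implementation: the centroids are hard-coded rather than learned, and the function names are hypothetical.

```python
import math


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def build_ivf(vectors, centroids):
    """Assign each vector id to the inverted list of its most similar centroid."""
    lists = [[] for _ in centroids]
    for vid, vec in enumerate(vectors):
        best = max(range(len(centroids)), key=lambda c: cosine(vec, centroids[c]))
        lists[best].append(vid)
    return lists


def ivf_search(query, vectors, centroids, lists, nprobe, limit):
    """Scan only the nprobe clusters closest to the query, then rank candidates."""
    ranked = sorted(
        range(len(centroids)),
        key=lambda c: cosine(query, centroids[c]),
        reverse=True,
    )
    candidates = [vid for c in ranked[:nprobe] for vid in lists[c]]
    candidates.sort(key=lambda vid: cosine(query, vectors[vid]), reverse=True)
    return candidates[:limit]


centroids = [[1.0, 0.0], [0.0, 1.0]]  # nlist = 2
vectors = [[1.0, 0.1], [0.8, 0.6], [0.6, 0.8], [0.1, 1.0]]
lists = build_ivf(vectors, centroids)
query = [0.7, 0.72]

# With nprobe=1 only one cluster is scanned, so vector 1 (a true neighbor
# sitting just across the cluster boundary) is missed.
fast = ivf_search(query, vectors, centroids, lists, nprobe=1, limit=2)
# With nprobe=2 every cluster is scanned, which matches brute-force search.
exact = ivf_search(query, vectors, centroids, lists, nprobe=2, limit=2)
```

The same effect appears at scale: queries near cluster boundaries lose neighbors when `nprobe` is small, and each extra probed cluster adds scan cost.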
Filtering
```python
# Metadata filtering with boolean expressions
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr='metadata["category"] == "tech"',
    output_fields=["text", "metadata"]
)
```
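Filter expressions are plain strings, so building them by hand from user input is easy to get wrong. A small sketch of a builder for equality filters on the JSON field follows; `build_filter` is a hypothetical helper, and it assumes only string and numeric values joined with `and`.

```python
import json


def build_filter(conditions, field="metadata"):
    """Build an equality filter such as metadata["category"] == "tech"."""
    clauses = []
    for key, value in conditions.items():
        # Booleans and nested values are rejected to keep the sketch simple.
        if isinstance(value, bool) or not isinstance(value, (str, int, float)):
            raise TypeError(f"unsupported value for {key!r}: {value!r}")
        # json.dumps adds double quotes and escapes embedded quotes.
        clauses.append(f"{field}[{json.dumps(key)}] == {json.dumps(value)}")
    return " and ".join(clauses)


expr = build_filter({"category": "tech", "year": 2025})
```

The resulting string can be passed as the `expr` argument of `collection.search`.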
Partitioning
Split a collection into partitions so that queries scan only the relevant subset:
```python
# Create partitions
collection.create_partition("partition_2024")
collection.create_partition("partition_2025")

# Insert into specific partition
collection.insert(data, partition_name="partition_2025")

# Search in specific partition only
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    partition_names=["partition_2025"],
    limit=10
)
```
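When documents arrive mixed across years, they need to be routed to the right partition before insertion. The sketch below groups rows into per-partition column lists; the `(year, embedding, text, metadata)` row shape and the `group_by_partition` helper are assumptions made for illustration.

```python
from collections import defaultdict


def group_by_partition(rows, prefix="partition_"):
    """Group (year, embedding, text, metadata) rows into column lists per partition."""
    groups = defaultdict(lambda: [[], [], []])
    for year, embedding, text, metadata in rows:
        columns = groups[f"{prefix}{year}"]
        columns[0].append(embedding)
        columns[1].append(text)
        columns[2].append(metadata)
    return dict(groups)


rows = [
    (2024, [0.1, 0.2], "older doc", {"category": "A"}),
    (2025, [0.3, 0.4], "newer doc", {"category": "B"}),
    (2025, [0.5, 0.6], "newest doc", {"category": "A"}),
]
grouped = group_by_partition(rows)

# Intended usage against a live collection (not executed here):
# for name, data in grouped.items():
#     collection.insert(data, partition_name=name)
```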
Time Travel
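Query data as it existed at an earlier point in time. Time Travel was deprecated in Milvus 2.3, so confirm that your server version still supports it: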
```python
import time
from pymilvus import utility

# Get a Milvus hybrid timestamp before deletion
# (a raw epoch value is not a valid travel_timestamp)
ts_before = utility.mkts_from_unixtime(time.time())

# Delete some data
collection.delete(expr="id in [1, 2, 3]")

# Query data as it was before deletion
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=10,
    travel_timestamp=ts_before
)
```
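Milvus timestamps are hybrid values that pack a physical time in milliseconds together with a logical counter, which is why a plain epoch number cannot be used directly as a travel timestamp. The following sketch shows that encoding under the assumption of an 18-bit logical part, the layout used by pymilvus's timestamp helpers; the function names here are hypothetical.

```python
LOGICAL_BITS = 18  # assumption: low 18 bits hold the logical counter


def to_hybrid_ts(unix_seconds, logical=0):
    """Pack epoch seconds and a logical counter into one hybrid timestamp."""
    physical_ms = int(unix_seconds * 1000)
    return (physical_ms << LOGICAL_BITS) | logical


def to_unix_seconds(hybrid_ts):
    """Recover epoch seconds from a hybrid timestamp (logical part dropped)."""
    return (hybrid_ts >> LOGICAL_BITS) / 1000.0


ts = to_hybrid_ts(1700000000.5)
```

In practice, pymilvus's own `utility.mkts_from_unixtime` is the safer choice; this sketch only illustrates what such a value contains.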
Resource Groups
Isolate workloads on different nodes:
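```python
from pymilvus import utility

# Create resource groups
utility.create_resource_group("rg1")
utility.create_resource_group("rg2")

# Move query nodes out of the default group (2 for rg1, 1 for rg2)
utility.transfer_node("__default_resource_group", "rg1", 2)
utility.transfer_node("__default_resource_group", "rg2", 1)

# Assign collection to resource group when loading it
collection.load(_resource_groups=["rg1"])
```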
Monitoring
```python
from pymilvus import utility

# Collection stats
print(f"Row count: {collection.num_entities}")

# Index progress
progress = utility.index_building_progress("documents")
print(f"Indexed rows: {progress['indexed_rows']}/{progress['total_rows']}")

# Query metrics (Prometheus endpoint):
# http://localhost:9091/metrics
```
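The metrics endpoint returns the Prometheus text exposition format, which is simple enough to inspect without a full Prometheus stack. Below is a minimal parser sketch; it assumes samples carry no trailing timestamp, and the metric names in the sample text are made up for illustration rather than taken from Milvus.

```python
def parse_metrics(text):
    """Parse Prometheus text-format samples into a {series: value} dict."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip blanks, HELP and TYPE lines
            continue
        # Assumption: the value is the last whitespace-separated token.
        series, _, value = line.rpartition(" ")
        samples[series] = float(value)
    return samples


# Hypothetical sample payload; real metric names will differ.
sample = """
# HELP example_search_requests_total Total search requests
# TYPE example_search_requests_total counter
example_search_requests_total{node="1"} 42
example_search_latency_seconds_sum{node="1"} 12.5
"""
metrics = parse_metrics(sample)
```

The text itself could be fetched with `urllib.request.urlopen("http://localhost:9091/metrics")` against a running instance.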
Production RAG Pipeline
```python
from openai import OpenAI
from pymilvus import Collection, connections

connections.connect("default", host="milvus-proxy", port="19530")
collection = Collection("documents")
collection.load()

client = OpenAI()  # openai>=1.0; reads OPENAI_API_KEY from the environment

def milvus_rag(query):
    # Embed query
    query_emb = client.embeddings.create(
        input=query,
        model="text-embedding-3-small"
    ).data[0].embedding

    # Search Milvus
    results = collection.search(
        data=[query_emb],
        anns_field="embedding",
        param={"metric_type": "COSINE", "params": {"nprobe": 32}},
        limit=5,
        output_fields=["text"]
    )

    # Build context
    context = "\n\n".join([hit.entity.get('text') for hit in results[0]])

    # Generate answer
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{
            "role": "user",
            "content": f"Context: {context}\n\nQuestion: {query}"
        }]
    )

    return response.choices[0].message.content

# Usage
answer = milvus_rag("What is Milvus?")
```
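The pipeline joins every retrieved chunk into the prompt, which can overflow the model's context window when chunks are long. One possible guard is to cap the context by size before building the prompt. The sketch below uses a character budget as a rough stand-in for tokens; `build_prompt` and `max_context_chars` are hypothetical names, not part of either library.

```python
def build_prompt(chunks, query, max_context_chars=8000):
    """Join chunks in ranked order until the character budget is spent."""
    kept = []
    used = 0
    for chunk in chunks:
        # Account for the "\n\n" separator between chunks.
        cost = len(chunk) + (2 if kept else 0)
        if used + cost > max_context_chars:
            break
        kept.append(chunk)
        used += cost
    context = "\n\n".join(kept)
    return f"Context: {context}\n\nQuestion: {query}"


prompt = build_prompt(["aaaa", "bbbb", "cccc"], "What is Milvus?", max_context_chars=10)
```

Because search results arrive ordered by score, truncating from the end drops the least relevant chunks first.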
With horizontal scaling, GPU indexes, and partitioning, Milvus is built to handle billion-vector workloads, which makes it a strong fit for enterprise RAG deployments.