RAG Security and Compliance: GDPR, AI Act, and Best Practices
Complete guide to securing your RAG system: GDPR compliance, European AI Act, sensitive data management, and security auditing.
Deploying a RAG system in the enterprise means processing potentially sensitive data. Between the GDPR, the European AI Act, and security requirements, compliance has become a major concern. This guide walks you through setting up a secure and compliant RAG architecture.
The Regulatory Landscape
GDPR: Fundamentals for RAG
The General Data Protection Regulation applies whenever your RAG system processes personal data of European residents.
What is considered personal data:
- Names, emails, addresses
- Order numbers linked to a person
- Conversation histories
- Preferences and behaviors
- Connection data (IP, device)
GDPR principles applicable to RAG:
| Principle | RAG Application |
|---|---|
| Minimization | Only index necessary data |
| Purpose limitation | Use data only for declared purpose |
| Accuracy | Update obsolete documents |
| Storage limitation | Delete data after defined period |
| Integrity and confidentiality | Secure access to indexed data |
| Accountability | Document processing and measures |
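The minimization and storage-limitation principles can be enforced mechanically at ingestion time. A minimal sketch, assuming an illustrative field allow-list and a 24-month retention period (both are assumptions, not part of this guide's reference implementation):

```python
from datetime import datetime, timedelta, timezone

# Hypothetical allow-list: only fields actually needed for retrieval get indexed
INDEXED_FIELDS = {"title", "body", "product_id"}
RETENTION = timedelta(days=730)  # illustrative 24-month retention

def minimize_for_indexing(raw_doc: dict) -> dict:
    """Keep only allow-listed fields (GDPR minimization) and stamp an expiry."""
    doc = {k: v for k, v in raw_doc.items() if k in INDEXED_FIELDS}
    # Record an expiry date so a scheduled job can enforce storage limitation
    doc["_expires_at"] = (datetime.now(timezone.utc) + RETENTION).isoformat()
    return doc

def is_expired(doc: dict) -> bool:
    """True once the document has outlived its retention period."""
    return datetime.fromisoformat(doc["_expires_at"]) <= datetime.now(timezone.utc)
```

A nightly job that deletes every chunk where `is_expired` returns true is usually enough to satisfy storage limitation for indexed content.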
AI Act: New European Regulation
The AI Act classifies AI systems by risk level:
High risk (strict obligations):
- HR systems (recruitment, evaluation)
- Credit/insurance systems
- Medical applications
- Legal systems
Limited risk (transparency obligations):
- Chatbots (obligation to inform users they are interacting with an AI)
- Recommendation systems
Minimal risk (no specific obligation):
- Anti-spam filters
- Internal search
For a standard RAG chatbot:
- Transparency obligation: Clearly indicate the user is interacting with an AI
- Documentation: Maintain a register of automated decisions
- Human oversight: Allow escalation to a human
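The three obligations above can be wired directly into the chatbot's response path. A minimal sketch, where the disclosure wording and the escalation triggers are illustrative assumptions (not legal advice):

```python
AI_DISCLOSURE = "You are chatting with an AI assistant."
ESCALATION_KEYWORDS = {"human", "agent", "complaint"}  # illustrative triggers

def build_response(answer: str, user_message: str, first_turn: bool) -> dict:
    """Wrap a model answer with AI Act transparency and escalation metadata."""
    wants_human = any(k in user_message.lower() for k in ESCALATION_KEYWORDS)
    return {
        # Transparency: inform the user once per session
        "disclosure": AI_DISCLOSURE if first_turn else None,
        "answer": answer,
        # Human oversight: flag the turn for routing to a human agent
        "escalate_to_human": wants_human,
    }
```

Logging each returned dict to your decision register covers the documentation obligation at the same time.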
Secure Architecture
Security by Design Principles
┌─────────────────────────────────────────────────────────────┐
│ SECURE PERIMETER │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Authentication │ │ Authorization │ │
│ │ (OAuth2/SAML) │───▶│ (RBAC/ABAC) │ │
│ └─────────────────────┘ └──────────┬──────────┘ │
│ │ │
│ ┌─────────────────────────────────────▼──────────────────┐ │
│ │ API Gateway │ │
│ │ - Rate limiting - Input validation - Logging │ │
│ └─────────────────────────────────────┬──────────────────┘ │
│ │ │
│ ┌──────────────────┬─────────────────┬┴────────────────┐ │
│ │ │ │ │ │
│ ▼ ▼ ▼ │ │
│ ┌────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │RAG │ │ Vector DB │ │ LLM │ │ │
│ │Pipeline│──▶│(encrypted) │──▶│(sandboxed) │ │ │
│ └────────┘ └────────────┘ └────────────┘ │ │
│ │ │
│ ┌─────────────────────────────────────────────────────┐ │ │
│ │ Audit Logs │ │ │
│ │ - Access - Queries - Responses - Anomalies │ │ │
│ └─────────────────────────────────────────────────────┘ │ │
│ │
└─────────────────────────────────────────────────────────────┘
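Each gateway responsibility in the diagram (rate limiting, input validation, logging) can be sketched in a few lines. A minimal in-memory fixed-window rate limiter, for illustration only — a production gateway would back this with a shared store such as Redis so limits hold across instances:

```python
import time
from collections import defaultdict

class RateLimiter:
    """Fixed-window counter per client; illustrative, not distributed-safe."""

    def __init__(self, max_requests: int = 60, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window = window_seconds
        self.counters = defaultdict(int)  # (client_id, window index) -> count

    def allow(self, client_id: str, now: float = None) -> bool:
        """Return True while the client stays under the per-window quota."""
        now = time.time() if now is None else now
        key = (client_id, int(now // self.window))
        self.counters[key] += 1
        return self.counters[key] <= self.max_requests
```

Called from gateway middleware, a `False` result maps naturally to an HTTP 429 before the query ever reaches the RAG pipeline.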
Data Encryption
```python
import base64
import copy
import os

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

class SecureDataHandler:
    def __init__(self, encryption_key: bytes = None):
        if encryption_key is None:
            encryption_key = os.environ.get("ENCRYPTION_KEY", "").encode()

        # Derive a robust key
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=b"ailog_salt_v1",  # In production: unique salt per tenant
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(encryption_key))
        self.cipher = Fernet(key)

    def encrypt_document(self, document: dict) -> dict:
        """Encrypt sensitive fields of a document"""
        sensitive_fields = ["content", "metadata.author", "metadata.email"]
        # Deep copy so nested fields of the original are not mutated
        encrypted_doc = copy.deepcopy(document)
        for field in sensitive_fields:
            value = self._get_nested(encrypted_doc, field)
            if value:
                encrypted_value = self.cipher.encrypt(str(value).encode())
                self._set_nested(encrypted_doc, field, encrypted_value.decode())
        return encrypted_doc

    def decrypt_document(self, document: dict) -> dict:
        """Decrypt sensitive fields"""
        sensitive_fields = ["content", "metadata.author", "metadata.email"]
        decrypted_doc = copy.deepcopy(document)
        for field in sensitive_fields:
            value = self._get_nested(decrypted_doc, field)
            if value:
                try:
                    decrypted_value = self.cipher.decrypt(value.encode())
                    self._set_nested(decrypted_doc, field, decrypted_value.decode())
                except Exception:
                    pass  # Field was not encrypted
        return decrypted_doc

    def _get_nested(self, d: dict, path: str):
        keys = path.split(".")
        for key in keys:
            if isinstance(d, dict) and key in d:
                d = d[key]
            else:
                return None
        return d

    def _set_nested(self, d: dict, path: str, value):
        keys = path.split(".")
        for key in keys[:-1]:
            d = d.setdefault(key, {})
        d[keys[-1]] = value
```
RBAC Access Control
```python
from enum import Enum
from dataclasses import dataclass
from typing import Set

class Permission(Enum):
    READ_DOCUMENTS = "read_documents"
    WRITE_DOCUMENTS = "write_documents"
    DELETE_DOCUMENTS = "delete_documents"
    MANAGE_USERS = "manage_users"
    VIEW_ANALYTICS = "view_analytics"
    ADMIN = "admin"

class Role(Enum):
    VIEWER = "viewer"
    EDITOR = "editor"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"

ROLE_PERMISSIONS = {
    Role.VIEWER: {Permission.READ_DOCUMENTS},
    Role.EDITOR: {Permission.READ_DOCUMENTS, Permission.WRITE_DOCUMENTS},
    Role.ADMIN: {
        Permission.READ_DOCUMENTS,
        Permission.WRITE_DOCUMENTS,
        Permission.DELETE_DOCUMENTS,
        Permission.VIEW_ANALYTICS,
        Permission.MANAGE_USERS,
    },
    Role.SUPER_ADMIN: {p for p in Permission},
}

@dataclass
class User:
    id: str
    email: str
    role: Role
    tenant_id: str
    allowed_collections: Set[str] = None  # None = all

class RBACManager:
    def __init__(self, user_service):
        self.user_service = user_service

    async def check_permission(
        self,
        user_id: str,
        permission: Permission,
        resource_id: str = None,
    ) -> bool:
        """Check if user has required permission"""
        user = await self.user_service.get_user(user_id)
        if not user:
            return False

        # Role permissions
        role_permissions = ROLE_PERMISSIONS.get(user.role, set())
        if permission not in role_permissions:
            return False

        # Resource-level verification if needed
        if resource_id and permission in {Permission.READ_DOCUMENTS, Permission.WRITE_DOCUMENTS}:
            return self._can_access_resource(user, resource_id)

        return True

    def _can_access_resource(self, user: User, resource_id: str) -> bool:
        """Check access to a specific resource"""
        # If no collection restriction
        if user.allowed_collections is None:
            return True
        # Extract collection from resource_id
        collection = resource_id.split("_")[0] if "_" in resource_id else resource_id
        return collection in user.allowed_collections

class SecureRAGPipeline:
    def __init__(self, rag_pipeline, rbac_manager, audit_logger):
        self.rag = rag_pipeline
        self.rbac = rbac_manager
        self.audit = audit_logger

    async def query(
        self,
        user_id: str,
        query: str,
        collection: str = None,
    ) -> dict:
        """Execute a RAG query with access control"""
        # Check permission
        if not await self.rbac.check_permission(
            user_id, Permission.READ_DOCUMENTS, collection
        ):
            self.audit.log_unauthorized_access(user_id, query, collection)
            raise PermissionError("Unauthorized access to this collection")

        # Log query
        self.audit.log_query(user_id, query, collection)

        # Execute RAG query with filtering
        user = await self.rbac.user_service.get_user(user_id)
        filters = self._build_access_filters(user)
        result = await self.rag.query(query, filters=filters)

        # Log response
        self.audit.log_response(user_id, query, result)
        return result

    def _build_access_filters(self, user: User) -> dict:
        """Build access filters based on user"""
        filters = {"tenant_id": user.tenant_id}
        if user.allowed_collections:
            filters["collection"] = {"$in": list(user.allowed_collections)}
        return filters
```
Personal Data Protection
Anonymization and Pseudonymization
```python
import hashlib
import json
import os
import re

class DataAnonymizer:
    def __init__(self, salt: str = None):
        self.salt = salt or os.environ.get("ANONYMIZATION_SALT", "default_salt")
        self.patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            "phone_us": r'\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
            "iban": r'\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}([A-Z0-9]?){0,16}\b',
            "ssn_us": r'\b\d{3}-\d{2}-\d{4}\b',
            "credit_card": r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
        }

    def anonymize_text(self, text: str, strategy: str = "mask") -> str:
        """Anonymize personal data in text"""
        strategies = {
            "mask": self._mask,
            "hash": self._hash,
            "remove": self._remove,
        }
        anonymizer = strategies.get(strategy, self._mask)
        for pattern_name, pattern in self.patterns.items():
            text = re.sub(pattern, lambda m: anonymizer(m.group(), pattern_name), text)
        return text

    def _mask(self, value: str, pattern_type: str) -> str:
        """Mask value keeping first/last characters"""
        if pattern_type == "email":
            parts = value.split("@")
            return f"{parts[0][:2]}***@{parts[1]}"
        elif pattern_type == "phone_us":
            return value[:4] + "*" * (len(value) - 6) + value[-2:]
        elif pattern_type == "credit_card":
            return "**** **** **** " + value[-4:]
        else:
            return "*" * len(value)

    def _hash(self, value: str, pattern_type: str) -> str:
        """Replace with deterministic hash (pseudonymization)"""
        hash_input = f"{self.salt}:{value}".encode()
        return f"[{pattern_type.upper()}_{hashlib.sha256(hash_input).hexdigest()[:8]}]"

    def _remove(self, value: str, pattern_type: str) -> str:
        """Completely remove the value"""
        return f"[{pattern_type.upper()}_REMOVED]"

class PIIDetector:
    def __init__(self, llm=None):
        self.llm = llm
        self.anonymizer = DataAnonymizer()

    async def detect_pii(self, text: str) -> list[dict]:
        """Detect PII in text"""
        pii_found = []

        # Regex detection
        for pattern_name, pattern in self.anonymizer.patterns.items():
            for match in re.finditer(pattern, text):
                pii_found.append({
                    "type": pattern_name,
                    "value": match.group(),
                    "position": (match.start(), match.end()),
                    "method": "regex",
                })

        # LLM detection for complex cases
        if self.llm:
            llm_pii = await self._detect_with_llm(text)
            pii_found.extend(llm_pii)

        return pii_found

    async def _detect_with_llm(self, text: str) -> list[dict]:
        """Use LLM to detect PII not covered by regex"""
        prompt = f"""
Analyze this text and identify any personally identifiable information (PII):
- People's names
- Physical addresses
- Dates of birth
- Medical information
- Financial data

Text: {text[:2000]}

Respond in JSON format: [{{"type": "...", "value": "...", "reason": "..."}}]
"""
        response = await self.llm.generate(prompt, temperature=0)
        return json.loads(response)
```
Consent Management
```python
from datetime import datetime
from enum import Enum

class ConsentPurpose(Enum):
    RAG_INDEXING = "rag_indexing"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"
    MARKETING = "marketing"

class ConsentManager:
    def __init__(self, db):
        self.db = db

    async def record_consent(
        self,
        user_id: str,
        purpose: ConsentPurpose,
        granted: bool,
        metadata: dict = None,
    ):
        """Record a consent"""
        consent = {
            "user_id": user_id,
            "purpose": purpose.value,
            "granted": granted,
            "timestamp": datetime.utcnow(),
            "ip_address": metadata.get("ip") if metadata else None,
            "user_agent": metadata.get("user_agent") if metadata else None,
            "version": "v1.0",  # Terms version
        }
        await self.db.consents.insert(consent)

        # If consent withdrawal, trigger deletion
        if not granted and purpose == ConsentPurpose.RAG_INDEXING:
            await self._trigger_data_deletion(user_id)

    async def check_consent(self, user_id: str, purpose: ConsentPurpose) -> bool:
        """Check if consent is active"""
        consent = await self.db.consents.find_one(
            {"user_id": user_id, "purpose": purpose.value},
            sort=[("timestamp", -1)],
        )
        return consent and consent.get("granted", False)

    async def get_consent_history(self, user_id: str) -> list[dict]:
        """Consent history for audit"""
        return await self.db.consents.find(
            {"user_id": user_id}
        ).sort("timestamp", -1).to_list(100)

    async def _trigger_data_deletion(self, user_id: str):
        """Trigger user data deletion"""
        # Delete from RAG indexes
        await self._delete_from_rag(user_id)
        # Anonymize conversation history
        await self._anonymize_conversations(user_id)
        # Log for audit
        await self.db.audit_logs.insert({
            "action": "consent_withdrawal_processed",
            "user_id": user_id,
            "timestamp": datetime.utcnow(),
        })

    async def _delete_from_rag(self, user_id: str):
        """Remove the user's documents from RAG indexes (store-specific)."""
        ...

    async def _anonymize_conversations(self, user_id: str):
        """Anonymize the user's conversation history (store-specific)."""
        ...
```
Right to Erasure (GDPR Article 17)
```python
from datetime import datetime

class RightToErasure:
    def __init__(self, rag_service, conversation_service, audit_service):
        self.rag = rag_service
        self.conversations = conversation_service
        self.audit = audit_service

    async def process_erasure_request(
        self,
        user_id: str,
        request_id: str,
        scope: str = "all",
    ) -> dict:
        """Process a right to erasure request"""
        result = {
            "request_id": request_id,
            "user_id": user_id,
            "status": "processing",
            "actions": [],
        }

        try:
            # 1. Delete from RAG indexes
            if scope in ["all", "rag"]:
                rag_result = await self._delete_from_rag(user_id)
                result["actions"].append({
                    "type": "rag_deletion",
                    "documents_deleted": rag_result["count"],
                })

            # 2. Anonymize conversations
            if scope in ["all", "conversations"]:
                conv_result = await self._anonymize_conversations(user_id)
                result["actions"].append({
                    "type": "conversation_anonymization",
                    "conversations_processed": conv_result["count"],
                })

            # 3. Delete profile data
            if scope in ["all", "profile"]:
                await self._delete_profile(user_id)
                result["actions"].append({
                    "type": "profile_deletion",
                    "status": "completed",
                })

            # 4. Audit log
            await self.audit.log({
                "action": "erasure_request_completed",
                "request_id": request_id,
                "user_id": user_id,
                "scope": scope,
                "actions": result["actions"],
            })

            result["status"] = "completed"
            result["completed_at"] = datetime.utcnow().isoformat()

        except Exception as e:
            result["status"] = "failed"
            result["error"] = str(e)
            await self.audit.log({
                "action": "erasure_request_failed",
                "request_id": request_id,
                "error": str(e),
            })

        return result

    async def _delete_from_rag(self, user_id: str) -> dict:
        """Delete user's documents from RAG indexes"""
        # Find all user's documents
        documents = await self.rag.find_documents_by_user(user_id)
        # Delete from vector database
        for doc in documents:
            await self.rag.delete_document(doc["id"])
        return {"count": len(documents)}

    async def _anonymize_conversations(self, user_id: str) -> dict:
        """Anonymize user's conversations"""
        conversations = await self.conversations.find_by_user(user_id)
        anonymizer = DataAnonymizer()  # defined in the anonymization section above
        for conv in conversations:
            for message in conv["messages"]:
                message["content"] = anonymizer.anonymize_text(
                    message["content"], strategy="remove"
                )
                message["user_id"] = "[DELETED_USER]"
            await self.conversations.update(conv["id"], conv)
        return {"count": len(conversations)}

    async def _delete_profile(self, user_id: str):
        """Delete profile data (user-store specific)."""
        ...
```
Audit and Traceability
Secure Logging System
```python
import hashlib
import json
from datetime import datetime

class AuditLogger:
    def __init__(self, storage, integrity_checker=None):
        self.storage = storage
        self.integrity = integrity_checker
        self.previous_hash = None

    async def log(self, event: dict):
        """Log an event with cryptographic integrity"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event.get("action", "unknown"),
            "user_id": event.get("user_id"),
            "tenant_id": event.get("tenant_id"),
            "resource_id": event.get("resource_id"),
            "details": event,
            "ip_address": event.get("ip"),
            "user_agent": event.get("user_agent"),
        }

        # Hash chain for integrity
        log_entry["previous_hash"] = self.previous_hash
        log_entry["hash"] = self._compute_hash(log_entry)
        self.previous_hash = log_entry["hash"]

        await self.storage.insert("audit_logs", log_entry)

        # Alert if critical event
        if self._is_critical(event):
            await self._alert_security_team(log_entry)

    def _compute_hash(self, entry: dict) -> str:
        """Compute hash for integrity chain"""
        data = json.dumps({
            "timestamp": entry["timestamp"],
            "event_type": entry["event_type"],
            "details": entry["details"],
            "previous_hash": entry["previous_hash"],
        }, sort_keys=True)
        return hashlib.sha256(data.encode()).hexdigest()

    def _is_critical(self, event: dict) -> bool:
        """Determine if event is critical"""
        critical_actions = [
            "unauthorized_access",
            "data_export",
            "bulk_deletion",
            "permission_escalation",
            "failed_authentication",
        ]
        return event.get("action") in critical_actions

    async def _alert_security_team(self, log_entry: dict):
        """Notify security (pager, SIEM, ...); implementation is deployment-specific."""
        ...

    async def verify_integrity(self, start_date: datetime, end_date: datetime) -> dict:
        """Verify log chain integrity"""
        logs = await self.storage.find(
            "audit_logs",
            {"timestamp": {"$gte": start_date.isoformat(), "$lte": end_date.isoformat()}},
            sort=[("timestamp", 1)],
        )

        previous_hash = None
        integrity_ok = True
        issues = []

        for log in logs:
            # Verify previous hash
            if log.get("previous_hash") != previous_hash:
                integrity_ok = False
                issues.append({
                    "log_id": log["_id"],
                    "issue": "previous_hash_mismatch",
                })

            # Verify current hash
            expected_hash = self._compute_hash({
                "timestamp": log["timestamp"],
                "event_type": log["event_type"],
                "details": log["details"],
                "previous_hash": log["previous_hash"],
            })
            if log.get("hash") != expected_hash:
                integrity_ok = False
                issues.append({
                    "log_id": log["_id"],
                    "issue": "hash_mismatch",
                })

            previous_hash = log.get("hash")

        return {
            "integrity_ok": integrity_ok,
            "logs_checked": len(logs),
            "issues": issues,
        }
```
Compliance Report
```python
from datetime import datetime

class ComplianceReporter:
    def __init__(self, audit_logger, consent_manager, data_service):
        self.audit = audit_logger
        self.consent = consent_manager
        self.data = data_service

    async def generate_gdpr_report(
        self,
        tenant_id: str,
        period_start: datetime,
        period_end: datetime,
    ) -> dict:
        """Generate a GDPR compliance report"""
        return {
            "report_type": "GDPR_COMPLIANCE",
            "tenant_id": tenant_id,
            "period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat(),
            },
            "data_inventory": await self._data_inventory(tenant_id),
            "consent_summary": await self._consent_summary(tenant_id, period_start, period_end),
            "access_requests": await self._access_requests(tenant_id, period_start, period_end),
            "erasure_requests": await self._erasure_requests(tenant_id, period_start, period_end),
            "security_incidents": await self._security_incidents(tenant_id, period_start, period_end),
            "data_breaches": await self._data_breaches(tenant_id, period_start, period_end),
            "generated_at": datetime.utcnow().isoformat(),
        }

    async def _data_inventory(self, tenant_id: str) -> dict:
        """Inventory of processed data"""
        return {
            "document_collections": await self.data.count_collections(tenant_id),
            "total_documents": await self.data.count_documents(tenant_id),
            "data_categories": ["support_docs", "product_catalog", "faq"],
            "pii_present": True,
            "pii_types": ["email", "name", "order_history"],
            "retention_policy": "24_months",
            "encryption_status": "encrypted_at_rest",
        }

    async def _consent_summary(
        self,
        tenant_id: str,
        start: datetime,
        end: datetime,
    ) -> dict:
        """Consent summary"""
        consents = await self.consent.get_statistics(tenant_id, start, end)
        return {
            "total_users": consents["total_users"],
            "consent_rate": consents["consent_rate"],
            "withdrawals": consents["withdrawals"],
            "by_purpose": consents["by_purpose"],
        }

    # _access_requests, _erasure_requests, _security_incidents and _data_breaches
    # follow the same pattern, querying the audit log over the reporting period.
```
Compliance Checklist
Before Deployment
- Impact assessment (DPIA) completed if processing sensitive data
- Legal basis identified (consent, legitimate interest, contract)
- Record of processing activities updated
- Privacy policy updated to mention AI usage
- DPO consulted where applicable
Technical
- Encryption at rest and in transit
- RBAC/ABAC access control implemented
- Sensitive data anonymized in logs
- Automated retention with scheduled deletion
- Audit logs with cryptographic integrity
- Backups encrypted and restore-tested
Organizational
- Data breach procedure documented
- GDPR training for teams
- Data processing agreements (DPAs) with processors
- Contact point for data subject rights requests
Learn More
- Introduction to RAG - Understand the fundamentals
- Sovereign RAG - Hosting in Europe
- Evaluating a RAG System - Quality metrics
Simplified Compliance with Ailog
Making a RAG system compliant is complex and time-consuming. With Ailog, benefit from a compliant-by-design infrastructure:
- European hosting (OVH, Scaleway) certified
- AES-256 encryption at rest and TLS 1.3 in transit
- Native RBAC with SSO (SAML, OAuth2)
- Automatic anonymization of detected PII
- Audit logs with configurable retention
- DPA included in contract
- Data subject rights: self-service portal for your users
Discover Ailog and deploy a compliant RAG with peace of mind.