RAG Security and Compliance: GDPR, AI Act, and Best Practices
Complete guide to securing your RAG system: GDPR compliance, European AI Act, sensitive data management, and security auditing.
- Author: Ailog Team
- Published
- Reading time: 24 min read
- Level: advanced
Deploying a RAG system in the enterprise means processing potentially sensitive data. Between GDPR, the European AI Act, and security requirements, compliance has become a major concern. This guide walks you through setting up a secure and compliant RAG architecture.
The Regulatory Landscape
GDPR: Fundamentals for RAG
The General Data Protection Regulation applies whenever your RAG system processes personal data of European residents.
What is considered personal data:
- Names, emails, addresses
- Order numbers linked to a person
- Conversation histories
- Preferences and behaviors
- Connection data (IP address, device)
GDPR principles applicable to RAG:
| Principle | RAG Application |
|-----------|-----------------|
| Minimization | Only index necessary data (see sketch below) |
| Purpose limitation | Use data only for the declared purpose |
| Accuracy | Update obsolete documents |
| Storage limitation | Delete data after a defined period |
| Integrity and confidentiality | Secure access to indexed data |
| Accountability | Document processing activities and measures |
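To make minimization and storage limitation concrete, here is a minimal sketch of a pre-indexing filter. The field names and the 24-month retention value are illustrative assumptions, not a prescribed configuration:

```python
from datetime import datetime, timedelta

# Fields actually needed for retrieval; everything else is dropped (minimization)
INDEXED_FIELDS = {"title", "content", "category"}
RETENTION = timedelta(days=730)  # storage limitation: e.g. 24 months

def prepare_for_indexing(raw_doc: dict) -> dict:
    """Keep only the fields required for the declared purpose."""
    doc = {k: v for k, v in raw_doc.items() if k in INDEXED_FIELDS}
    # Record when this chunk must be deleted (enforced by a scheduled job)
    doc["delete_after"] = (datetime.utcnow() + RETENTION).isoformat()
    return doc
```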
AI Act: New European Regulation
The AI Act classifies AI systems by risk level:
High risk (strict obligations):
- HR systems (recruitment, evaluation)
- Credit/insurance systems
- Medical applications
- Legal systems
Limited risk (transparency obligations):
- Chatbots (obligation to inform users they are talking to an AI)
- Recommendation systems
Minimal risk (no specific obligations):
- Anti-spam filters
- Internal search
For a standard RAG chatbot:
- Transparency obligation: clearly indicate that the user is interacting with an AI
- Documentation: maintain a register of automated decisions
- Human oversight: allow escalation to a human agent (see the sketch below)
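A rough illustration of how these obligations can wrap a chatbot's responses; the wording, confidence threshold, and response structure are assumptions, not AI Act requirements:

```python
# Hypothetical response wrapper covering transparency and human oversight.
AI_DISCLOSURE = "You are chatting with an AI assistant."

def wrap_response(answer: str, confidence: float, first_turn: bool) -> dict:
    response = {"answer": answer}
    if first_turn:
        response["disclosure"] = AI_DISCLOSURE  # inform the user it's an AI
    # Human oversight: offer escalation when the system is unsure
    if confidence < 0.5:
        response["escalation_offer"] = "Would you like to talk to a human agent?"
    return response
```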
Secure Architecture
Security by Design Principles
```
┌──────────────────────────────────────────────────────────────┐
│                       SECURE PERIMETER                       │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────────────┐      ┌─────────────────────┐        │
│  │   Authentication    │      │    Authorization    │        │
│  │   (OAuth2/SAML)     │─────▶│     (RBAC/ABAC)     │        │
│  └─────────────────────┘      └──────────┬──────────┘        │
│                                          │                   │
│  ┌───────────────────────────────────────▼─────────────────┐ │
│  │                      API Gateway                         │ │
│  │   - Rate limiting   - Input validation   - Logging      │ │
│  └───────────────────────────────────────┬─────────────────┘ │
│                                          │                   │
│         ┌────────────────────┬───────────┴───────┐           │
│         ▼                    ▼                   ▼           │
│    ┌────────┐         ┌────────────┐      ┌────────────┐     │
│    │  RAG   │         │ Vector DB  │      │    LLM     │     │
│    │Pipeline│────────▶│(encrypted) │─────▶│(sandboxed) │     │
│    └────────┘         └────────────┘      └────────────┘     │
│                                                              │
│  ┌──────────────────────────────────────────────────────────┐│
│  │                       Audit Logs                         ││
│  │   - Access   - Queries   - Responses   - Anomalies       ││
│  └──────────────────────────────────────────────────────────┘│
│                                                              │
└──────────────────────────────────────────────────────────────┘
```
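The API Gateway layer can be sketched in plain Python. The limits and the injection patterns below are illustrative placeholders; a production deployment would typically rely on dedicated gateway or middleware components:

```python
import re
import time
from collections import defaultdict

class TokenBucketLimiter:
    """Per-client token bucket: `rate` tokens/second, burst up to `capacity`."""
    def __init__(self, rate: float = 5.0, capacity: int = 20):
        self.rate, self.capacity = rate, capacity
        self.buckets = defaultdict(lambda: {"tokens": capacity, "last": time.monotonic()})

    def allow(self, client_id: str) -> bool:
        bucket = self.buckets[client_id]
        now = time.monotonic()
        # Refill tokens based on elapsed time, capped at capacity
        bucket["tokens"] = min(self.capacity, bucket["tokens"] + (now - bucket["last"]) * self.rate)
        bucket["last"] = now
        if bucket["tokens"] >= 1:
            bucket["tokens"] -= 1
            return True
        return False

MAX_QUERY_LENGTH = 2000
INJECTION_HINTS = re.compile(r"(ignore previous instructions|system prompt)", re.I)

def validate_query(query: str) -> str:
    """Reject oversized or suspicious inputs before they reach the RAG pipeline."""
    if len(query) > MAX_QUERY_LENGTH:
        raise ValueError("Query too long")
    if INJECTION_HINTS.search(query):
        raise ValueError("Query rejected by input validation")
    return query.strip()
```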
Data Encryption
```python
import base64
import copy
import os

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

class SecureDataHandler:
    def __init__(self, encryption_key: bytes = None):
        if encryption_key is None:
            encryption_key = os.environ.get("ENCRYPTION_KEY", "").encode()

        # Derive a robust key
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=b"ailog_salt_v1",  # In production: unique salt per tenant
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(encryption_key))
        self.cipher = Fernet(key)

    def encrypt_document(self, document: dict) -> dict:
        """Encrypt sensitive fields of a document"""
        sensitive_fields = ["content", "metadata.author", "metadata.email"]
        encrypted_doc = copy.deepcopy(document)  # deep copy so nested fields aren't mutated

        for field in sensitive_fields:
            value = self._get_nested(encrypted_doc, field)
            if value:
                encrypted_value = self.cipher.encrypt(str(value).encode())
                self._set_nested(encrypted_doc, field, encrypted_value.decode())

        return encrypted_doc

    def decrypt_document(self, document: dict) -> dict:
        """Decrypt sensitive fields"""
        sensitive_fields = ["content", "metadata.author", "metadata.email"]
        decrypted_doc = copy.deepcopy(document)

        for field in sensitive_fields:
            value = self._get_nested(decrypted_doc, field)
            if value:
                try:
                    decrypted_value = self.cipher.decrypt(value.encode())
                    self._set_nested(decrypted_doc, field, decrypted_value.decode())
                except Exception:
                    pass  # Field not encrypted

        return decrypted_doc

    def _get_nested(self, d: dict, path: str):
        keys = path.split(".")
        for key in keys:
            if isinstance(d, dict) and key in d:
                d = d[key]
            else:
                return None
        return d

    def _set_nested(self, d: dict, path: str, value):
        keys = path.split(".")
        for key in keys[:-1]:
            d = d.setdefault(key, {})
        d[keys[-1]] = value
```
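Assuming an `ENCRYPTION_KEY` environment variable is set, usage looks like this:

```python
handler = SecureDataHandler()

doc = {"content": "Order #1234 for jane@example.com", "metadata": {"author": "Jane Doe"}}
encrypted = handler.encrypt_document(doc)       # sensitive fields are now ciphertext
restored = handler.decrypt_document(encrypted)  # round-trips back to the original
assert restored["content"] == doc["content"]
```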
RBAC Access Control
```python
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Set

class Permission(Enum):
    READ_DOCUMENTS = "read_documents"
    WRITE_DOCUMENTS = "write_documents"
    DELETE_DOCUMENTS = "delete_documents"
    MANAGE_USERS = "manage_users"
    VIEW_ANALYTICS = "view_analytics"
    ADMIN = "admin"

class Role(Enum):
    VIEWER = "viewer"
    EDITOR = "editor"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"

ROLE_PERMISSIONS = {
    Role.VIEWER: {Permission.READ_DOCUMENTS},
    Role.EDITOR: {Permission.READ_DOCUMENTS, Permission.WRITE_DOCUMENTS},
    Role.ADMIN: {
        Permission.READ_DOCUMENTS,
        Permission.WRITE_DOCUMENTS,
        Permission.DELETE_DOCUMENTS,
        Permission.VIEW_ANALYTICS,
        Permission.MANAGE_USERS,
    },
    Role.SUPER_ADMIN: {p for p in Permission},
}

@dataclass
class User:
    id: str
    email: str
    role: Role
    tenant_id: str
    allowed_collections: Optional[Set[str]] = None  # None = all collections

class RBACManager:
    def __init__(self, user_service):
        self.user_service = user_service

    async def check_permission(
        self,
        user_id: str,
        permission: Permission,
        resource_id: str = None
    ) -> bool:
        """Check if user has required permission"""
        user = await self.user_service.get_user(user_id)
        if not user:
            return False

        # Role permissions
        role_permissions = ROLE_PERMISSIONS.get(user.role, set())
        if permission not in role_permissions:
            return False

        # Resource-level verification if needed
        if resource_id and permission in {Permission.READ_DOCUMENTS, Permission.WRITE_DOCUMENTS}:
            return self._can_access_resource(user, resource_id)

        return True

    def _can_access_resource(self, user: User, resource_id: str) -> bool:
        """Check access to a specific resource"""
        # If no collection restriction
        if user.allowed_collections is None:
            return True

        # Extract collection from resource_id
        collection = resource_id.split("_")[0] if "_" in resource_id else resource_id
        return collection in user.allowed_collections

class SecureRAGPipeline:
    def __init__(self, rag_pipeline, rbac_manager, audit_logger):
        self.rag = rag_pipeline
        self.rbac = rbac_manager
        self.audit = audit_logger

    async def query(
        self,
        user_id: str,
        query: str,
        collection: str = None
    ) -> dict:
        """Execute a RAG query with access control"""
        # Check permission
        if not await self.rbac.check_permission(
            user_id, Permission.READ_DOCUMENTS, collection
        ):
            self.audit.log_unauthorized_access(user_id, query, collection)
            raise PermissionError("Unauthorized access to this collection")

        # Log query
        self.audit.log_query(user_id, query, collection)

        # Execute RAG query with filtering
        user = await self.rbac.user_service.get_user(user_id)
        filters = self._build_access_filters(user)

        result = await self.rag.query(query, filters=filters)

        # Log response
        self.audit.log_response(user_id, query, result)

        return result

    def _build_access_filters(self, user: User) -> dict:
        """Build access filters based on user"""
        filters = {"tenant_id": user.tenant_id}

        if user.allowed_collections:
            filters["collection"] = {"$in": list(user.allowed_collections)}

        return filters
```
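A minimal wiring sketch, assuming `user_service`, `rag_pipeline`, and `audit_logger` are provided elsewhere in your application:

```python
import asyncio

async def main():
    rbac = RBACManager(user_service)  # user_service assumed to exist
    secure_rag = SecureRAGPipeline(rag_pipeline, rbac, audit_logger)
    # Raises PermissionError (and logs the attempt) if access is denied
    result = await secure_rag.query("user_42", "What is our refund policy?", "support")
    print(result)

asyncio.run(main())
```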
Personal Data Protection
Anonymization and Pseudonymization
```python
import hashlib
import json
import os
import re
from typing import Callable

class DataAnonymizer:
    def __init__(self, salt: str = None):
        self.salt = salt or os.environ.get("ANONYMIZATION_SALT", "default_salt")
        self.patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            "phone_us": r'\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
            "iban": r'\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}([A-Z0-9]?){0,16}\b',
            "ssn_us": r'\b\d{3}-\d{2}-\d{4}\b',
            "credit_card": r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
        }

    def anonymize_text(self, text: str, strategy: str = "mask") -> str:
        """Anonymize personal data in text"""
        strategies = {
            "mask": self._mask,
            "hash": self._hash,
            "remove": self._remove,
        }

        anonymizer = strategies.get(strategy, self._mask)

        for pattern_name, pattern in self.patterns.items():
            text = re.sub(pattern, lambda m: anonymizer(m.group(), pattern_name), text)

        return text

    def _mask(self, value: str, pattern_type: str) -> str:
        """Mask value keeping first/last characters"""
        if pattern_type == "email":
            parts = value.split("@")
            return f"{parts[0][:2]}**@{parts[1]}"
        elif pattern_type == "phone_us":
            return value[:4] + "*" * (len(value) - 6) + value[-2:]
        elif pattern_type == "credit_card":
            return "**** **** **** " + value[-4:]
        else:
            return "*" * len(value)

    def _hash(self, value: str, pattern_type: str) -> str:
        """Replace with deterministic hash (pseudonymization)"""
        hash_input = f"{self.salt}:{value}".encode()
        return f"[{pattern_type.upper()}_" + hashlib.sha256(hash_input).hexdigest()[:8] + "]"

    def _remove(self, value: str, pattern_type: str) -> str:
        """Completely remove the value"""
        return f"[{pattern_type.upper()}_REMOVED]"

class PIIDetector:
    def __init__(self, llm=None):
        self.llm = llm
        self.anonymizer = DataAnonymizer()

    async def detect_pii(self, text: str) -> list[dict]:
        """Detect PII in text"""
        pii_found = []

        # Regex detection
        for pattern_name, pattern in self.anonymizer.patterns.items():
            matches = re.finditer(pattern, text)
            for match in matches:
                pii_found.append({
                    "type": pattern_name,
                    "value": match.group(),
                    "position": (match.start(), match.end()),
                    "method": "regex",
                })

        # LLM detection for complex cases
        if self.llm:
            llm_pii = await self._detect_with_llm(text)
            pii_found.extend(llm_pii)

        return pii_found

    async def _detect_with_llm(self, text: str) -> list[dict]:
        """Use LLM to detect PII not covered by regex"""
        prompt = f"""
        Analyze this text and identify any personally identifiable information (PII):
        • People's names
        • Physical addresses
        • Dates of birth
        • Medical information
        • Financial data

        Text: {text[:2000]}

        Respond in JSON format: [{{"type": "...", "value": "...", "reason": "..."}}]
        """

        response = await self.llm.generate(prompt, temperature=0)
        return json.loads(response)
```
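Usage sketch (the exact masked output depends on the pattern type matched):

```python
anonymizer = DataAnonymizer()

text = "Contact John at john.doe@acme.com or (555) 123-4567."
print(anonymizer.anonymize_text(text, strategy="mask"))
# Emails keep their first characters and domain; phone numbers are starred out
print(anonymizer.anonymize_text(text, strategy="hash"))
# Deterministic: the same email always maps to the same [EMAIL_xxxxxxxx] token
```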
Consent Management
```python
from datetime import datetime
from enum import Enum

class ConsentPurpose(Enum):
    RAG_INDEXING = "rag_indexing"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"
    MARKETING = "marketing"

class ConsentManager:
    def __init__(self, db):
        self.db = db

    async def record_consent(
        self,
        user_id: str,
        purpose: ConsentPurpose,
        granted: bool,
        metadata: dict = None
    ):
        """Record a consent"""
        consent = {
            "user_id": user_id,
            "purpose": purpose.value,
            "granted": granted,
            "timestamp": datetime.utcnow(),
            "ip_address": metadata.get("ip") if metadata else None,
            "user_agent": metadata.get("user_agent") if metadata else None,
            "version": "v1.0",  # Terms version
        }

        await self.db.consents.insert(consent)

        # If consent withdrawal, trigger deletion
        if not granted and purpose == ConsentPurpose.RAG_INDEXING:
            await self._trigger_data_deletion(user_id)

    async def check_consent(
        self,
        user_id: str,
        purpose: ConsentPurpose
    ) -> bool:
        """Check if consent is active"""
        consent = await self.db.consents.find_one(
            {"user_id": user_id, "purpose": purpose.value},
            sort=[("timestamp", -1)]
        )

        return bool(consent and consent.get("granted", False))

    async def get_consent_history(self, user_id: str) -> list[dict]:
        """Consent history for audit"""
        return await self.db.consents.find(
            {"user_id": user_id}
        ).sort("timestamp", -1).to_list(100)

    async def _trigger_data_deletion(self, user_id: str):
        """Trigger user data deletion"""
        # Delete from RAG indexes
        await self._delete_from_rag(user_id)

        # Anonymize conversation history
        await self._anonymize_conversations(user_id)

        # Log for audit
        await self.db.audit_logs.insert({
            "action": "consent_withdrawal_processed",
            "user_id": user_id,
            "timestamp": datetime.utcnow(),
        })
```
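A sketch of how this gate can protect the indexing path; `rag.index_document` is a hypothetical interface:

```python
# Illustrative gate: index user-linked content only with an active consent.
async def index_if_consented(consent_mgr: ConsentManager, rag, user_id: str, doc: dict):
    if not await consent_mgr.check_consent(user_id, ConsentPurpose.RAG_INDEXING):
        return {"indexed": False, "reason": "no_active_consent"}
    await rag.index_document(doc)  # assumed RAG service method
    return {"indexed": True}
```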
Right to Erasure (GDPR Article 17)
```python
from datetime import datetime

class RightToErasure:
    def __init__(self, rag_service, conversation_service, audit_service):
        self.rag = rag_service
        self.conversations = conversation_service
        self.audit = audit_service

    async def process_erasure_request(
        self,
        user_id: str,
        request_id: str,
        scope: str = "all"
    ) -> dict:
        """Process a right to erasure request"""
        result = {
            "request_id": request_id,
            "user_id": user_id,
            "status": "processing",
            "actions": [],
        }

        try:
            # Delete from RAG indexes
            if scope in ["all", "rag"]:
                rag_result = await self._delete_from_rag(user_id)
                result["actions"].append({
                    "type": "rag_deletion",
                    "documents_deleted": rag_result["count"],
                })

            # Anonymize conversations
            if scope in ["all", "conversations"]:
                conv_result = await self._anonymize_conversations(user_id)
                result["actions"].append({
                    "type": "conversation_anonymization",
                    "conversations_processed": conv_result["count"],
                })

            # Delete profile data
            if scope in ["all", "profile"]:
                await self._delete_profile(user_id)
                result["actions"].append({
                    "type": "profile_deletion",
                    "status": "completed",
                })

            # Audit log
            await self.audit.log({
                "action": "erasure_request_completed",
                "request_id": request_id,
                "user_id": user_id,
                "scope": scope,
                "actions": result["actions"],
            })

            result["status"] = "completed"
            result["completed_at"] = datetime.utcnow().isoformat()

        except Exception as e:
            result["status"] = "failed"
            result["error"] = str(e)
            await self.audit.log({
                "action": "erasure_request_failed",
                "request_id": request_id,
                "error": str(e),
            })

        return result

    async def _delete_from_rag(self, user_id: str) -> dict:
        """Delete user's documents from RAG indexes"""
        # Find all user's documents
        documents = await self.rag.find_documents_by_user(user_id)

        # Delete from vector database
        for doc in documents:
            await self.rag.delete_document(doc["id"])

        return {"count": len(documents)}

    async def _anonymize_conversations(self, user_id: str) -> dict:
        """Anonymize user's conversations"""
        conversations = await self.conversations.find_by_user(user_id)
        anonymizer = DataAnonymizer()

        for conv in conversations:
            for message in conv["messages"]:
                message["content"] = anonymizer.anonymize_text(
                    message["content"], strategy="remove"
                )
                message["user_id"] = "[DELETED_USER]"

            await self.conversations.update(conv["id"], conv)

        return {"count": len(conversations)}
```
Audit and Traceability
Secure Logging System
```python
import json
from datetime import datetime
import hashlib

class AuditLogger:
    def __init__(self, storage, integrity_checker=None):
        self.storage = storage
        self.integrity = integrity_checker
        self.previous_hash = None

    async def log(self, event: dict):
        """Log an event with cryptographic integrity"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event.get("action", "unknown"),
            "user_id": event.get("user_id"),
            "tenant_id": event.get("tenant_id"),
            "resource_id": event.get("resource_id"),
            "details": event,
            "ip_address": event.get("ip"),
            "user_agent": event.get("user_agent"),
        }

        # Hash chain for integrity
        log_entry["previous_hash"] = self.previous_hash
        log_entry["hash"] = self._compute_hash(log_entry)
        self.previous_hash = log_entry["hash"]

        await self.storage.insert("audit_logs", log_entry)

        # Alert if critical event
        if self._is_critical(event):
            await self._alert_security_team(log_entry)

    def _compute_hash(self, entry: dict) -> str:
        """Compute hash for integrity chain"""
        data = json.dumps({
            "timestamp": entry["timestamp"],
            "event_type": entry["event_type"],
            "details": entry["details"],
            "previous_hash": entry["previous_hash"],
        }, sort_keys=True)

        return hashlib.sha256(data.encode()).hexdigest()

    def _is_critical(self, event: dict) -> bool:
        """Determine if event is critical"""
        critical_actions = [
            "unauthorized_access",
            "data_export",
            "bulk_deletion",
            "permission_escalation",
            "failed_authentication",
        ]

        return event.get("action") in critical_actions

    async def verify_integrity(self, start_date: datetime, end_date: datetime) -> dict:
        """Verify log chain integrity"""
        logs = await self.storage.find(
            "audit_logs",
            {"timestamp": {"$gte": start_date.isoformat(), "$lte": end_date.isoformat()}},
            sort=[("timestamp", 1)]
        )

        previous_hash = None
        integrity_ok = True
        issues = []

        for log in logs:
            # Verify previous hash
            if log.get("previous_hash") != previous_hash:
                integrity_ok = False
                issues.append({
                    "log_id": log["_id"],
                    "issue": "previous_hash_mismatch",
                })

            # Verify current hash
            expected_hash = self._compute_hash({
                "timestamp": log["timestamp"],
                "event_type": log["event_type"],
                "details": log["details"],
                "previous_hash": log["previous_hash"],
            })

            if log.get("hash") != expected_hash:
                integrity_ok = False
                issues.append({
                    "log_id": log["_id"],
                    "issue": "hash_mismatch",
                })

            previous_hash = log.get("hash")

        return {
            "integrity_ok": integrity_ok,
            "logs_checked": len(logs),
            "issues": issues,
        }
```
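Usage sketch, inside an async context and with a `storage` backend matching the interface above:

```python
from datetime import datetime, timedelta

audit = AuditLogger(storage)  # storage assumed to implement insert/find
await audit.log({"action": "data_export", "user_id": "user_42"})

# Periodically verify that nobody has tampered with the chain
report = await audit.verify_integrity(
    start_date=datetime.utcnow() - timedelta(days=30),
    end_date=datetime.utcnow(),
)
assert report["integrity_ok"], report["issues"]
```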
Compliance Report
```python
from datetime import datetime

class ComplianceReporter:
    def __init__(self, audit_logger, consent_manager, data_service):
        self.audit = audit_logger
        self.consent = consent_manager
        self.data = data_service

    async def generate_gdpr_report(
        self,
        tenant_id: str,
        period_start: datetime,
        period_end: datetime
    ) -> dict:
        """Generate a GDPR compliance report"""
        return {
            "report_type": "GDPR_COMPLIANCE",
            "tenant_id": tenant_id,
            "period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat(),
            },
            "data_inventory": await self._data_inventory(tenant_id),
            "consent_summary": await self._consent_summary(tenant_id, period_start, period_end),
            "access_requests": await self._access_requests(tenant_id, period_start, period_end),
            "erasure_requests": await self._erasure_requests(tenant_id, period_start, period_end),
            "security_incidents": await self._security_incidents(tenant_id, period_start, period_end),
            "data_breaches": await self._data_breaches(tenant_id, period_start, period_end),
            "generated_at": datetime.utcnow().isoformat(),
        }

    async def _data_inventory(self, tenant_id: str) -> dict:
        """Inventory of processed data"""
        return {
            "document_collections": await self.data.count_collections(tenant_id),
            "total_documents": await self.data.count_documents(tenant_id),
            "data_categories": ["support_docs", "product_catalog", "faq"],
            "pii_present": True,
            "pii_types": ["email", "name", "order_history"],
            "retention_policy": "24_months",
            "encryption_status": "encrypted_at_rest",
        }

    async def _consent_summary(
        self,
        tenant_id: str,
        start: datetime,
        end: datetime
    ) -> dict:
        """Consent summary"""
        consents = await self.consent.get_statistics(tenant_id, start, end)

        return {
            "total_users": consents["total_users"],
            "consent_rate": consents["consent_rate"],
            "withdrawals": consents["withdrawals"],
            "by_purpose": consents["by_purpose"],
        }
```
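Usage sketch, inside an async context; the services passed to the constructor are assumed to exist elsewhere:

```python
reporter = ComplianceReporter(audit_logger, consent_manager, data_service)

report = await reporter.generate_gdpr_report(
    tenant_id="acme",
    period_start=datetime(2024, 1, 1),
    period_end=datetime(2024, 3, 31),
)
# Archive the report: it documents accountability (GDPR Art. 5(2))
```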
Compliance Checklist
Before Deployment
- [ ] Impact assessment (DPIA) completed if sensitive data is processed
- [ ] Legal basis identified (consent, legitimate interest, contract)
- [ ] Processing registry updated
- [ ] Privacy policy mentions AI usage
- [ ] DPO consulted if applicable
Technical
- [ ] Encryption at rest and in transit
- [ ] RBAC/ABAC access control implemented
- [ ] Anonymization of sensitive data in logs
- [ ] Automated retention with deletion (see the sketch below)
- [ ] Audit logs with cryptographic integrity
- [ ] Encrypted and tested backups
Organizational
- [ ] Breach notification procedure documented
- [ ] Teams trained on GDPR
- [ ] Contracts with processors (DPA)
- [ ] Contact point for rights exercise requests
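For the automated-retention item above, a minimal sketch of a scheduled purge job; `delete_where` is a hypothetical interface to adapt to your vector database:

```python
from datetime import datetime

# Run daily via cron or a task scheduler; deletes chunks whose retention
# deadline (the delete_after field stamped at indexing time) has passed.
async def purge_expired_documents(vector_store):
    now = datetime.utcnow().isoformat()
    deleted = await vector_store.delete_where({"delete_after": {"$lt": now}})
    return {"deleted": deleted, "ran_at": now}
```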
Learn More
- Introduction to RAG - Understand the fundamentals
- Sovereign RAG - Hosting in Europe
- Evaluating a RAG System - Quality metrics
---
Simplified Compliance with Ailog
Making a RAG system compliant is complex and time-consuming. With Ailog, you benefit from compliant-by-design infrastructure:
- Certified European hosting (OVH, Scaleway)
- AES-256 encryption at rest and TLS 1.3 in transit
- Native RBAC with SSO (SAML, OAuth2)
- Automatic anonymization of detected PII
- Audit logs with configurable retention
- DPA included in the contract
- Rights exercise: self-service portal for your users
Discover Ailog and deploy a compliant RAG with peace of mind.