RAG Security and Compliance: GDPR, AI Act, and Best Practices

Complete guide to securing your RAG system: GDPR compliance, European AI Act, sensitive data management, and security auditing.

Author
Ailog Team
Published
January 25, 2026
Reading time
24 min read
Level
advanced

RAG Security and Compliance: GDPR, AI Act, and Best Practices

Deploying a RAG system in an enterprise setting means processing potentially sensitive data. With GDPR, the European AI Act, and security requirements all in play, compliance has become a major concern. This guide walks you through setting up a secure and compliant RAG architecture.

The Regulatory Landscape

GDPR: Fundamentals for RAG

The General Data Protection Regulation applies whenever your RAG system processes personal data of European residents.

What is considered personal data:

  • Names, emails, addresses
  • Order numbers linked to a person
  • Conversation histories
  • Preferences and behaviors
  • Connection data (IP, device)

GDPR principles applicable to RAG:

| Principle | RAG Application |
|-----------|-----------------|
| Minimization | Only index necessary data |
| Purpose limitation | Use data only for the declared purpose |
| Accuracy | Update obsolete documents |
| Storage limitation | Delete data after a defined period |
| Integrity and confidentiality | Secure access to indexed data |
| Accountability | Document processing and measures |
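To make the minimization and storage-limitation principles concrete, here is a minimal sketch of an indexing filter. The field allow-list and the `prepare_for_indexing` helper are illustrative assumptions, not part of any specific framework:

```python
from datetime import datetime, timedelta, timezone

# Minimization: explicitly allow-list the fields the RAG purpose needs, drop everything else
INDEXABLE_FIELDS = {"title", "body", "product_id", "language"}

def prepare_for_indexing(record: dict, retention_days: int = 730) -> dict:
    """Keep only the necessary fields and attach a deletion deadline (storage limitation)."""
    doc = {k: v for k, v in record.items() if k in INDEXABLE_FIELDS}
    doc["delete_after"] = (datetime.now(timezone.utc) + timedelta(days=retention_days)).isoformat()
    return doc
```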

AI Act: New European Regulation

The AI Act classifies AI systems by risk level:

High risk (strict obligations):

  • HR systems (recruitment, evaluation)
  • Credit/insurance systems
  • Medical applications
  • Legal systems

Limited risk (transparency obligations):

  • Chatbots (obligation to inform users they are talking to an AI)
  • Recommendation systems

Minimal risk (no specific obligation):

  • Anti-spam filters
  • Internal search

For a standard RAG chatbot:

  • Transparency obligation: clearly indicate that the user is interacting with an AI (see the sketch below)
  • Documentation: maintain a register of automated decisions
  • Human oversight: allow escalation to a human
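As a rough illustration of these obligations, a chatbot response can carry an explicit AI disclosure and be recorded in a decision register. The wording and register structure below are assumptions made for the sketch, not prescribed by the AI Act:

```python
from datetime import datetime, timezone

AI_DISCLOSURE = "You are talking to an AI assistant. You can ask for a human agent at any time."

def wrap_answer(answer: str, decision_register: list[dict]) -> str:
    """Prefix the response with the AI disclosure and log it in the decision register."""
    decision_register.append({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "type": "automated_response",
        "human_escalation_available": True,
    })
    return f"{AI_DISCLOSURE}\n\n{answer}"
```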

Secure Architecture

Security by Design Principles

```
┌─────────────────────────────────────────────────────────────┐
│                    SECURE PERIMETER                          │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────────────┐    ┌─────────────────────┐         │
│  │   Authentication    │    │    Authorization    │         │
│  │   (OAuth2/SAML)     │───▶│   (RBAC/ABAC)       │         │
│  └─────────────────────┘    └──────────┬──────────┘         │
│                                        │                     │
│  ┌─────────────────────────────────────▼──────────────────┐ │
│  │                     API Gateway                         │ │
│  │  - Rate limiting  - Input validation  - Logging         │ │
│  └─────────────────────────────────────┬──────────────────┘ │
│                                        │                     │
│  ┌──────────────────┬─────────────────┬┴────────────────┐   │
│  │                  │                 │                  │   │
│  ▼                  ▼                 ▼                  │   │
│ ┌────────┐   ┌────────────┐   ┌────────────┐            │   │
│ │RAG     │   │ Vector DB  │   │ LLM        │            │   │
│ │Pipeline│──▶│(encrypted) │──▶│(sandboxed) │            │   │
│ └────────┘   └────────────┘   └────────────┘            │   │
│                                                          │   │
│  ┌─────────────────────────────────────────────────────┐ │   │
│  │                    Audit Logs                        │ │   │
│  │  - Access  - Queries  - Responses  - Anomalies       │ │   │
│  └─────────────────────────────────────────────────────┘ │   │
│                                                              │
└─────────────────────────────────────────────────────────────┘
```
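A minimal sketch of the API-gateway layer from the diagram, covering rate limiting, basic input validation, and request logging. FastAPI and the specific limits are assumptions; any gateway or middleware stack can play the same role:

```python
import logging
import time
from collections import defaultdict

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()
logger = logging.getLogger("gateway")

_requests: dict[str, list[float]] = defaultdict(list)
RATE_LIMIT = 30          # requests
WINDOW_SECONDS = 60      # per minute, per client IP
MAX_BODY_BYTES = 2000    # basic input validation on payload size

@app.middleware("http")
async def gateway(request: Request, call_next):
    client = request.client.host if request.client else "unknown"
    now = time.time()

    # Sliding-window rate limiting per client
    _requests[client] = [t for t in _requests[client] if now - t < WINDOW_SECONDS]
    if len(_requests[client]) >= RATE_LIMIT:
        return JSONResponse({"detail": "rate limit exceeded"}, status_code=429)
    _requests[client].append(now)

    # Reject oversized payloads before they reach the RAG pipeline
    if int(request.headers.get("content-length", 0)) > MAX_BODY_BYTES:
        return JSONResponse({"detail": "payload too large"}, status_code=413)

    logger.info("request path=%s client=%s", request.url.path, client)
    response = await call_next(request)
    logger.info("response path=%s status=%s", request.url.path, response.status_code)
    return response
```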

Data Encryption

```python
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import copy
import os


class SecureDataHandler:
    def __init__(self, encryption_key: bytes = None):
        if encryption_key is None:
            encryption_key = os.environ.get("ENCRYPTION_KEY", "").encode()

        # Derive a robust key
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=b"ailog_salt_v1",  # In production: unique salt per tenant
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(encryption_key))
        self.cipher = Fernet(key)

    def encrypt_document(self, document: dict) -> dict:
        """Encrypt sensitive fields of a document"""
        sensitive_fields = ["content", "metadata.author", "metadata.email"]
        # Deep copy so nested metadata of the original document is not mutated
        encrypted_doc = copy.deepcopy(document)

        for field in sensitive_fields:
            value = self._get_nested(encrypted_doc, field)
            if value:
                encrypted_value = self.cipher.encrypt(str(value).encode())
                self._set_nested(encrypted_doc, field, encrypted_value.decode())

        return encrypted_doc

    def decrypt_document(self, document: dict) -> dict:
        """Decrypt sensitive fields"""
        sensitive_fields = ["content", "metadata.author", "metadata.email"]
        decrypted_doc = copy.deepcopy(document)

        for field in sensitive_fields:
            value = self._get_nested(decrypted_doc, field)
            if value:
                try:
                    decrypted_value = self.cipher.decrypt(value.encode())
                    self._set_nested(decrypted_doc, field, decrypted_value.decode())
                except Exception:
                    pass  # Field not encrypted

        return decrypted_doc

    def _get_nested(self, d: dict, path: str):
        keys = path.split(".")
        for key in keys:
            if isinstance(d, dict) and key in d:
                d = d[key]
            else:
                return None
        return d

    def _set_nested(self, d: dict, path: str, value):
        keys = path.split(".")
        for key in keys[:-1]:
            d = d.setdefault(key, {})
        d[keys[-1]] = value
```
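A quick usage sketch of the handler above. The passphrase and document are placeholders; in production the key comes from a secrets manager, never from source code:

```python
handler = SecureDataHandler(encryption_key=b"demo-passphrase-rotate-me")

doc = {"content": "Order #1234 for jane@example.com", "metadata": {"author": "Jane Doe"}}
encrypted = handler.encrypt_document(doc)

print(encrypted["content"][:20])                        # opaque Fernet token, not the plaintext
print(handler.decrypt_document(encrypted)["content"])   # round-trips to the original text
```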

RBAC Access Control

```python
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Set


class Permission(Enum):
    READ_DOCUMENTS = "read_documents"
    WRITE_DOCUMENTS = "write_documents"
    DELETE_DOCUMENTS = "delete_documents"
    MANAGE_USERS = "manage_users"
    VIEW_ANALYTICS = "view_analytics"
    ADMIN = "admin"


class Role(Enum):
    VIEWER = "viewer"
    EDITOR = "editor"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"


ROLE_PERMISSIONS = {
    Role.VIEWER: {Permission.READ_DOCUMENTS},
    Role.EDITOR: {Permission.READ_DOCUMENTS, Permission.WRITE_DOCUMENTS},
    Role.ADMIN: {
        Permission.READ_DOCUMENTS,
        Permission.WRITE_DOCUMENTS,
        Permission.DELETE_DOCUMENTS,
        Permission.VIEW_ANALYTICS,
        Permission.MANAGE_USERS,
    },
    Role.SUPER_ADMIN: {p for p in Permission},
}


@dataclass
class User:
    id: str
    email: str
    role: Role
    tenant_id: str
    allowed_collections: Optional[Set[str]] = None  # None = all


class RBACManager:
    def __init__(self, user_service):
        self.user_service = user_service

    async def check_permission(
        self,
        user_id: str,
        permission: Permission,
        resource_id: str = None
    ) -> bool:
        """Check if user has required permission"""
        user = await self.user_service.get_user(user_id)
        if not user:
            return False

        # Role permissions
        role_permissions = ROLE_PERMISSIONS.get(user.role, set())
        if permission not in role_permissions:
            return False

        # Resource-level verification if needed
        if resource_id and permission in {Permission.READ_DOCUMENTS, Permission.WRITE_DOCUMENTS}:
            return self._can_access_resource(user, resource_id)

        return True

    def _can_access_resource(self, user: User, resource_id: str) -> bool:
        """Check access to a specific resource"""
        # If no collection restriction
        if user.allowed_collections is None:
            return True

        # Extract collection from resource_id
        collection = resource_id.split("_")[0] if "_" in resource_id else resource_id
        return collection in user.allowed_collections


class SecureRAGPipeline:
    def __init__(self, rag_pipeline, rbac_manager, audit_logger):
        self.rag = rag_pipeline
        self.rbac = rbac_manager
        self.audit = audit_logger

    async def query(
        self,
        user_id: str,
        query: str,
        collection: str = None
    ) -> dict:
        """Execute a RAG query with access control"""
        # Check permission
        if not await self.rbac.check_permission(
            user_id, Permission.READ_DOCUMENTS, collection
        ):
            self.audit.log_unauthorized_access(user_id, query, collection)
            raise PermissionError("Unauthorized access to this collection")

        # Log query
        self.audit.log_query(user_id, query, collection)

        # Execute RAG query with filtering
        user = await self.rbac.user_service.get_user(user_id)
        filters = self._build_access_filters(user)

        result = await self.rag.query(query, filters=filters)

        # Log response
        self.audit.log_response(user_id, query, result)

        return result

    def _build_access_filters(self, user: User) -> dict:
        """Build access filters based on user"""
        filters = {"tenant_id": user.tenant_id}

        if user.allowed_collections:
            filters["collection"] = {"$in": list(user.allowed_collections)}

        return filters
```

Personal Data Protection

Anonymization and Pseudonymization

```python
import hashlib
import json
import os
import re


class DataAnonymizer:
    def __init__(self, salt: str = None):
        self.salt = salt or os.environ.get("ANONYMIZATION_SALT", "default_salt")
        self.patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            "phone_us": r'\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
            "iban": r'\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}([A-Z0-9]?){0,16}\b',
            "ssn_us": r'\b\d{3}-\d{2}-\d{4}\b',
            "credit_card": r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
        }

    def anonymize_text(self, text: str, strategy: str = "mask") -> str:
        """Anonymize personal data in text"""
        strategies = {
            "mask": self._mask,
            "hash": self._hash,
            "remove": self._remove
        }

        anonymizer = strategies.get(strategy, self._mask)

        for pattern_name, pattern in self.patterns.items():
            text = re.sub(pattern, lambda m: anonymizer(m.group(), pattern_name), text)

        return text

    def _mask(self, value: str, pattern_type: str) -> str:
        """Mask value keeping first/last characters"""
        if pattern_type == "email":
            parts = value.split("@")
            return f"{parts[0][:2]}***@{parts[1]}"
        elif pattern_type == "phone_us":
            return value[:4] + "*" * (len(value) - 6) + value[-2:]
        elif pattern_type == "credit_card":
            return "**** **** **** " + value[-4:]
        else:
            return "*" * len(value)

    def _hash(self, value: str, pattern_type: str) -> str:
        """Replace with deterministic hash (pseudonymization)"""
        hash_input = f"{self.salt}:{value}".encode()
        return f"[{pattern_type.upper()}_" + hashlib.sha256(hash_input).hexdigest()[:8] + "]"

    def _remove(self, value: str, pattern_type: str) -> str:
        """Completely remove the value"""
        return f"[{pattern_type.upper()}_REMOVED]"


class PIIDetector:
    def __init__(self, llm=None):
        self.llm = llm
        self.anonymizer = DataAnonymizer()

    async def detect_pii(self, text: str) -> list[dict]:
        """Detect PII in text"""
        pii_found = []

        # Regex detection
        for pattern_name, pattern in self.anonymizer.patterns.items():
            matches = re.finditer(pattern, text)
            for match in matches:
                pii_found.append({
                    "type": pattern_name,
                    "value": match.group(),
                    "position": (match.start(), match.end()),
                    "method": "regex"
                })

        # LLM detection for complex cases
        if self.llm:
            llm_pii = await self._detect_with_llm(text)
            pii_found.extend(llm_pii)

        return pii_found

    async def _detect_with_llm(self, text: str) -> list[dict]:
        """Use LLM to detect PII not covered by regex"""
        prompt = f"""
Analyze this text and identify any personally identifiable information (PII):
- People's names
- Physical addresses
- Dates of birth
- Medical information
- Financial data

Text: {text[:2000]}

Respond in JSON format: [{{"type": "...", "value": "...", "reason": "..."}}]
"""

        response = await self.llm.generate(prompt, temperature=0)
        return json.loads(response)
```
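A short usage sketch of the anonymizer on a made-up sentence (outputs shown approximately):

```python
anonymizer = DataAnonymizer(salt="demo_salt")  # placeholder salt
text = "Contact john.doe@example.com or call 555-123-4567."

print(anonymizer.anonymize_text(text, strategy="mask"))
# roughly: Contact jo***@example.com or call 555-******67.

print(anonymizer.anonymize_text(text, strategy="hash"))
# roughly: Contact [EMAIL_xxxxxxxx] or call [PHONE_US_xxxxxxxx] — deterministic for a given salt
```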

Consent Management

```python
from datetime import datetime
from enum import Enum


class ConsentPurpose(Enum):
    RAG_INDEXING = "rag_indexing"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"
    MARKETING = "marketing"


class ConsentManager:
    def __init__(self, db):
        self.db = db

    async def record_consent(
        self,
        user_id: str,
        purpose: ConsentPurpose,
        granted: bool,
        metadata: dict = None
    ):
        """Record a consent"""
        consent = {
            "user_id": user_id,
            "purpose": purpose.value,
            "granted": granted,
            "timestamp": datetime.utcnow(),
            "ip_address": metadata.get("ip") if metadata else None,
            "user_agent": metadata.get("user_agent") if metadata else None,
            "version": "v1.0"  # Terms version
        }

        await self.db.consents.insert(consent)

        # If consent withdrawal, trigger deletion
        if not granted and purpose == ConsentPurpose.RAG_INDEXING:
            await self._trigger_data_deletion(user_id)

    async def check_consent(
        self,
        user_id: str,
        purpose: ConsentPurpose
    ) -> bool:
        """Check if consent is active"""
        consent = await self.db.consents.find_one(
            {"user_id": user_id, "purpose": purpose.value},
            sort=[("timestamp", -1)]
        )

        return consent and consent.get("granted", False)

    async def get_consent_history(self, user_id: str) -> list[dict]:
        """Consent history for audit"""
        return await self.db.consents.find(
            {"user_id": user_id}
        ).sort("timestamp", -1).to_list(100)

    async def _trigger_data_deletion(self, user_id: str):
        """Trigger user data deletion"""
        # Delete from RAG indexes
        await self._delete_from_rag(user_id)

        # Anonymize conversation history
        await self._anonymize_conversations(user_id)

        # Log for audit
        await self.db.audit_logs.insert({
            "action": "consent_withdrawal_processed",
            "user_id": user_id,
            "timestamp": datetime.utcnow()
        })
```

Right to Erasure (GDPR Article 17)

```python
from datetime import datetime


class RightToErasure:
    def __init__(self, rag_service, conversation_service, audit_service):
        self.rag = rag_service
        self.conversations = conversation_service
        self.audit = audit_service

    async def process_erasure_request(
        self,
        user_id: str,
        request_id: str,
        scope: str = "all"
    ) -> dict:
        """Process a right to erasure request"""
        result = {
            "request_id": request_id,
            "user_id": user_id,
            "status": "processing",
            "actions": []
        }

        try:
            # 1. Delete from RAG indexes
            if scope in ["all", "rag"]:
                rag_result = await self._delete_from_rag(user_id)
                result["actions"].append({
                    "type": "rag_deletion",
                    "documents_deleted": rag_result["count"]
                })

            # 2. Anonymize conversations
            if scope in ["all", "conversations"]:
                conv_result = await self._anonymize_conversations(user_id)
                result["actions"].append({
                    "type": "conversation_anonymization",
                    "conversations_processed": conv_result["count"]
                })

            # 3. Delete profile data
            if scope in ["all", "profile"]:
                await self._delete_profile(user_id)
                result["actions"].append({
                    "type": "profile_deletion",
                    "status": "completed"
                })

            # 4. Audit log
            await self.audit.log({
                "action": "erasure_request_completed",
                "request_id": request_id,
                "user_id": user_id,
                "scope": scope,
                "actions": result["actions"]
            })

            result["status"] = "completed"
            result["completed_at"] = datetime.utcnow().isoformat()

        except Exception as e:
            result["status"] = "failed"
            result["error"] = str(e)
            await self.audit.log({
                "action": "erasure_request_failed",
                "request_id": request_id,
                "error": str(e)
            })

        return result

    async def _delete_from_rag(self, user_id: str) -> dict:
        """Delete user's documents from RAG indexes"""
        # Find all user's documents
        documents = await self.rag.find_documents_by_user(user_id)

        # Delete from vector database
        for doc in documents:
            await self.rag.delete_document(doc["id"])

        return {"count": len(documents)}

    async def _anonymize_conversations(self, user_id: str) -> dict:
        """Anonymize user's conversations"""
        conversations = await self.conversations.find_by_user(user_id)
        anonymizer = DataAnonymizer()

        for conv in conversations:
            for message in conv["messages"]:
                message["content"] = anonymizer.anonymize_text(
                    message["content"], strategy="remove"
                )
                message["user_id"] = "[DELETED_USER]"

            await self.conversations.update(conv["id"], conv)

        return {"count": len(conversations)}
```

Audit and Traceability

Secure Logging System

```python
import json
from datetime import datetime
import hashlib


class AuditLogger:
    def __init__(self, storage, integrity_checker=None):
        self.storage = storage
        self.integrity = integrity_checker
        self.previous_hash = None

    async def log(self, event: dict):
        """Log an event with cryptographic integrity"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event.get("action", "unknown"),
            "user_id": event.get("user_id"),
            "tenant_id": event.get("tenant_id"),
            "resource_id": event.get("resource_id"),
            "details": event,
            "ip_address": event.get("ip"),
            "user_agent": event.get("user_agent")
        }

        # Hash chain for integrity
        log_entry["previous_hash"] = self.previous_hash
        log_entry["hash"] = self._compute_hash(log_entry)
        self.previous_hash = log_entry["hash"]

        await self.storage.insert("audit_logs", log_entry)

        # Alert if critical event
        if self._is_critical(event):
            await self._alert_security_team(log_entry)

    def _compute_hash(self, entry: dict) -> str:
        """Compute hash for integrity chain"""
        data = json.dumps({
            "timestamp": entry["timestamp"],
            "event_type": entry["event_type"],
            "details": entry["details"],
            "previous_hash": entry["previous_hash"]
        }, sort_keys=True)

        return hashlib.sha256(data.encode()).hexdigest()

    def _is_critical(self, event: dict) -> bool:
        """Determine if event is critical"""
        critical_actions = [
            "unauthorized_access",
            "data_export",
            "bulk_deletion",
            "permission_escalation",
            "failed_authentication"
        ]

        return event.get("action") in critical_actions

    async def verify_integrity(self, start_date: datetime, end_date: datetime) -> dict:
        """Verify log chain integrity"""
        logs = await self.storage.find(
            "audit_logs",
            {"timestamp": {"$gte": start_date.isoformat(), "$lte": end_date.isoformat()}},
            sort=[("timestamp", 1)]
        )

        previous_hash = None
        integrity_ok = True
        issues = []

        for log in logs:
            # Verify previous hash
            if log.get("previous_hash") != previous_hash:
                integrity_ok = False
                issues.append({
                    "log_id": log["_id"],
                    "issue": "previous_hash_mismatch"
                })

            # Verify current hash
            expected_hash = self._compute_hash({
                "timestamp": log["timestamp"],
                "event_type": log["event_type"],
                "details": log["details"],
                "previous_hash": log["previous_hash"]
            })

            if log.get("hash") != expected_hash:
                integrity_ok = False
                issues.append({
                    "log_id": log["_id"],
                    "issue": "hash_mismatch"
                })

            previous_hash = log.get("hash")

        return {
            "integrity_ok": integrity_ok,
            "logs_checked": len(logs),
            "issues": issues
        }
```
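A runnable sketch of the hash chain using a throwaway in-memory store. The `InMemoryStore` stub and the event names exist only for the demo; the real `storage` backend is whatever database you use:

```python
import asyncio
from datetime import datetime

class InMemoryStore:
    """Minimal stand-in for the storage backend used by AuditLogger."""
    def __init__(self):
        self.rows = {"audit_logs": []}

    async def insert(self, table: str, entry: dict):
        self.rows[table].append(dict(entry))

    async def find(self, table: str, query: dict, sort=None):
        return list(self.rows[table])  # filtering and sorting omitted for the demo

async def demo():
    audit = AuditLogger(storage=InMemoryStore())
    await audit.log({"action": "rag_query", "user_id": "u_42"})
    await audit.log({"action": "document_indexed", "user_id": "u_42"})

    report = await audit.verify_integrity(datetime(2020, 1, 1), datetime(2100, 1, 1))
    print(report["integrity_ok"], report["logs_checked"])  # True 2

asyncio.run(demo())
```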

Compliance Report

```python
from datetime import datetime


class ComplianceReporter:
    def __init__(self, audit_logger, consent_manager, data_service):
        self.audit = audit_logger
        self.consent = consent_manager
        self.data = data_service

    async def generate_gdpr_report(
        self,
        tenant_id: str,
        period_start: datetime,
        period_end: datetime
    ) -> dict:
        """Generate a GDPR compliance report"""
        return {
            "report_type": "GDPR_COMPLIANCE",
            "tenant_id": tenant_id,
            "period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat()
            },
            "data_inventory": await self._data_inventory(tenant_id),
            "consent_summary": await self._consent_summary(tenant_id, period_start, period_end),
            "access_requests": await self._access_requests(tenant_id, period_start, period_end),
            "erasure_requests": await self._erasure_requests(tenant_id, period_start, period_end),
            "security_incidents": await self._security_incidents(tenant_id, period_start, period_end),
            "data_breaches": await self._data_breaches(tenant_id, period_start, period_end),
            "generated_at": datetime.utcnow().isoformat()
        }

    async def _data_inventory(self, tenant_id: str) -> dict:
        """Inventory of processed data"""
        return {
            "document_collections": await self.data.count_collections(tenant_id),
            "total_documents": await self.data.count_documents(tenant_id),
            "data_categories": ["support_docs", "product_catalog", "faq"],
            "pii_present": True,
            "pii_types": ["email", "name", "order_history"],
            "retention_policy": "24_months",
            "encryption_status": "encrypted_at_rest"
        }

    async def _consent_summary(
        self,
        tenant_id: str,
        start: datetime,
        end: datetime
    ) -> dict:
        """Consent summary"""
        consents = await self.consent.get_statistics(tenant_id, start, end)

        return {
            "total_users": consents["total_users"],
            "consent_rate": consents["consent_rate"],
            "withdrawals": consents["withdrawals"],
            "by_purpose": consents["by_purpose"]
        }
```

Compliance Checklist

Before Deployment

  • [ ] Impact assessment (DPIA) completed if sensitive data
  • [ ] Legal basis identified (consent, legitimate interest, contract)
  • [ ] Processing registry updated
  • [ ] Privacy policy mentioning AI usage
  • [ ] DPO consulted if applicable

Technical

  • [ ] Encryption at rest and in transit
  • [ ] RBAC/ABAC access control implemented
  • [ ] Anonymization of sensitive data in logs
  • [ ] Automated retention with deletion (see the sketch after this list)
  • [ ] Audit logs with cryptographic integrity
  • [ ] Backups encrypted and tested
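For the retention item, a minimal sketch of an automated purge job; `find_documents` and `delete_document` are assumed methods of your vector store client, not a specific API:

```python
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=730)  # e.g. a 24-month retention policy

async def purge_expired_documents(vector_store, audit_logger) -> int:
    """Delete indexed documents older than the retention period and log the purge."""
    cutoff = (datetime.now(timezone.utc) - RETENTION).isoformat()
    expired = await vector_store.find_documents({"indexed_at": {"$lt": cutoff}})

    for doc in expired:
        await vector_store.delete_document(doc["id"])

    await audit_logger.log({"action": "retention_purge", "documents_deleted": len(expired)})
    return len(expired)
```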

Organizational

  • [ ] Breach procedure documented
  • [ ] Teams trained on GDPR
  • [ ] Contracts with processors (DPA)
  • [ ] Contact point for data subject rights requests

Learn More

  • Introduction to RAG - Understand the fundamentals
  • Sovereign RAG - Hosting in Europe
  • Evaluating a RAG System - Quality metrics

---

Simplified Compliance with Ailog

Making a RAG system compliant is complex and time-consuming. With Ailog, you benefit from a compliant-by-design infrastructure:

  • Certified European hosting (OVH, Scaleway)
  • AES-256 encryption at rest and TLS 1.3 in transit
  • Native RBAC with SSO (SAML, OAuth2)
  • Automatic anonymization of detected PII
  • Audit logs with configurable retention
  • DPA included in the contract
  • Rights exercise: self-service portal for your users

Discover Ailog and deploy a compliant RAG with peace of mind.

Tags

  • RAG
  • security
  • GDPR
  • AI Act
  • compliance
  • personal data