Intelligent Knowledge Base: Centralizing Enterprise Knowledge

Create an AI knowledge base for your company: technical documentation, onboarding, and business expertise accessible instantly.

Author
Ailog Team
Published
January 21, 2026
Reading time
19 min read
Level
Intermediate

Intelligent Knowledge Base: Centralizing Enterprise Knowledge

In every company, knowledge is scattered: Confluence documents, Google Drive, emails, expertise in employees' heads. An intelligent knowledge base powered by RAG transforms this chaos into an assistant capable of instantly answering any business question.

The Problem of Scattered Knowledge

The Reality in Companies

A McKinsey study reveals that an employee spends an average of 1.8 hours per day searching for information. That's 9.3 hours per week, or 23% of work time lost in search.

Classic symptoms:

• "Where's the document about procedure X?"
• "Who knows how to configure Y?"
• "We already solved this problem, but I can't find the solution"
• "I'm new, I don't know where to look"

Limitations of Traditional Solutions

| Solution | Problem |
|----------|---------|
| Wiki/Confluence | Complex navigation, search limited to exact keywords |
| Google Drive | No structure, impossible to search within content |
| Slack/Teams | Scattered messages, no knowledge capture |
| Internal experts | Single point of failure, cognitive load |
| Static FAQs | Never up to date, don't cover all cases |

The RAG KB Advantage

A RAG knowledge base enables:

• Semantic search: find info even without knowing the exact terms
• Synthesis: get a consolidated answer from multiple sources
• Contextualization: answers adapted to the user's profile
• Continuous updates: automatic synchronization with the sources
• Capitalization: every answer enriches the base

Intelligent KB Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                       DATA SOURCES                           │
├──────────┬──────────┬──────────┬──────────┬────────────────┤
│Confluence│  Google  │  Notion  │  Slack   │   PDF          │
│          │  Drive   │          │(archived)│  Documents     │
└────┬─────┴────┬─────┴────┬─────┴────┬─────┴───────┬────────┘
     │          │          │          │             │
     └──────────┴──────────┴──────────┴─────────────┘
                           │
                           ▼
              ┌─────────────────────────┐
              │   Ingestion Pipeline    │
              │  - Text extraction      │
              │  - Intelligent chunking │
              │  - Metadata             │
              └───────────┬─────────────┘
                          ▼
              ┌─────────────────────────┐
              │     Embedding +         │
              │     Indexing            │
              └───────────┬─────────────┘
                          ▼
              ┌─────────────────────────┐
              │    Vector Database      │
              │      (Qdrant)           │
              └───────────┬─────────────┘
                          │
┌─────────────────────────┴──────────────────────────────────┐
│                       INTERFACE                             │
├─────────────┬─────────────┬─────────────┬─────────────────┤
│    Web      │    Slack    │  Internal   │   Advanced      │
│  Chatbot    │    Bot      │    API      │   Search        │
└─────────────┴─────────────┴─────────────┴─────────────────┘
```
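To make the diagram concrete, here is a minimal ingestion sketch assuming the connectors and AdaptiveChunker defined later in this article, a hypothetical `embed()` callable, and a local Qdrant instance; it pulls documents, chunks them, embeds each chunk, and upserts everything into a collection.

```python
# Minimal ingestion sketch: connector -> chunking -> embeddings -> Qdrant.
# Assumes the ConfluenceConnector and AdaptiveChunker classes shown below,
# plus a hypothetical embed() callable returning one vector per text.
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

def ingest(connector, chunker, embed, collection: str = "knowledge_base"):
    client = QdrantClient(url="http://localhost:6333")  # assumed local instance
    client.recreate_collection(
        collection_name=collection,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),  # size depends on the embedding model
    )

    points = []
    for doc in connector.get_all_pages():
        for chunk in chunker.chunk(doc):
            points.append(PointStruct(
                id=abs(hash(chunk["id"])) % (2 ** 63),  # Qdrant expects int or UUID ids
                vector=embed(chunk["content"]),
                payload={"content": chunk["content"], **chunk["metadata"]},
            ))

    client.upsert(collection_name=collection, points=points)
```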

Source Connectors

Confluence Connector

```python
from atlassian import Confluence
import html2text

class ConfluenceConnector:
    def __init__(self, url: str, username: str, api_token: str):
        self.confluence = Confluence(
            url=url,
            username=username,
            password=api_token,
            cloud=True
        )
        self.html_converter = html2text.HTML2Text()
        self.html_converter.ignore_links = False

    def get_all_pages(self, space_keys: list[str] = None) -> list[dict]:
        """Retrieve all pages from specified spaces"""
        documents = []

        if space_keys is None:
            spaces = self.confluence.get_all_spaces()
            space_keys = [s['key'] for s in spaces['results']]

        for space_key in space_keys:
            pages = self._get_space_pages(space_key)
            documents.extend(pages)

        return documents

    def _get_space_pages(self, space_key: str) -> list[dict]:
        """Retrieve all pages from a space"""
        pages = []
        start = 0
        limit = 50

        while True:
            result = self.confluence.get_all_pages_from_space(
                space_key,
                start=start,
                limit=limit,
                expand='body.storage,ancestors,version'
            )

            for page in result:
                pages.append(self._format_page(page, space_key))

            if len(result) < limit:
                break
            start += limit

        return pages

    def _format_page(self, page: dict, space_key: str) -> dict:
        """Format a Confluence page for RAG"""
        # Convert HTML to text
        html_content = page.get('body', {}).get('storage', {}).get('value', '')
        text_content = self.html_converter.handle(html_content)

        # Build hierarchical path
        ancestors = page.get('ancestors', [])
        path = ' > '.join([a['title'] for a in ancestors] + [page['title']])

        return {
            "id": f"confluence_{page['id']}",
            "title": page['title'],
            "content": f"# {page['title']}\n\n{text_content}",
            "metadata": {
                "type": "confluence",
                "source": "confluence",
                "space": space_key,
                "page_id": page['id'],
                "path": path,
                "url": f"{self.confluence.url}/wiki/spaces/{space_key}/pages/{page['id']}",
                "author": page.get('version', {}).get('by', {}).get('displayName'),
                "last_updated": page.get('version', {}).get('when'),
                "version": page.get('version', {}).get('number')
            }
        }
```
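As a quick usage sketch (the URL, account, and token are placeholders), pulling every page from a single space looks like this:

```python
# Hypothetical usage: retrieve all pages from the "ENG" Confluence space.
connector = ConfluenceConnector(
    url="https://your-company.atlassian.net",
    username="kb-bot@your-company.com",
    api_token="YOUR_API_TOKEN",
)
docs = connector.get_all_pages(space_keys=["ENG"])
print(f"{len(docs)} pages retrieved")
```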

Google Drive Connector

```python
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import io

class GoogleDriveConnector:
    def __init__(self, credentials_path: str, folder_ids: list[str] = None):
        credentials = service_account.Credentials.from_service_account_file(
            credentials_path,
            scopes=['https://www.googleapis.com/auth/drive.readonly']
        )
        self.service = build('drive', 'v3', credentials=credentials)
        self.folder_ids = folder_ids or ['root']

    def get_all_documents(self) -> list[dict]:
        """Retrieve all documents from specified folders"""
        documents = []

        for folder_id in self.folder_ids:
            docs = self._get_folder_documents(folder_id)
            documents.extend(docs)

        return documents

    def _get_folder_documents(self, folder_id: str, path: str = "") -> list[dict]:
        """Recursively retrieve documents from a folder"""
        documents = []

        # List files
        query = f"'{folder_id}' in parents and trashed = false"
        results = self.service.files().list(
            q=query,
            fields="files(id, name, mimeType, modifiedTime, owners, webViewLink)",
            pageSize=100
        ).execute()

        for file in results.get('files', []):
            current_path = f"{path}/{file['name']}" if path else file['name']

            if file['mimeType'] == 'application/vnd.google-apps.folder':
                # Recurse into subfolders
                sub_docs = self._get_folder_documents(file['id'], current_path)
                documents.extend(sub_docs)
            else:
                # Extract file content
                content = self._extract_content(file)
                if content:
                    documents.append({
                        "id": f"gdrive_{file['id']}",
                        "title": file['name'],
                        "content": content,
                        "metadata": {
                            "type": "google_drive",
                            "source": "google_drive",
                            "file_id": file['id'],
                            "path": current_path,
                            "mime_type": file['mimeType'],
                            "url": file.get('webViewLink'),
                            "author": file.get('owners', [{}])[0].get('displayName'),
                            "last_updated": file.get('modifiedTime')
                        }
                    })

        return documents

    def _extract_content(self, file: dict) -> str:
        """Extract text content from a file"""
        mime_type = file['mimeType']
        file_id = file['id']

        # Google Docs - export as text
        if mime_type == 'application/vnd.google-apps.document':
            return self._export_google_doc(file_id)

        # Google Sheets - export as CSV
        elif mime_type == 'application/vnd.google-apps.spreadsheet':
            return self._export_google_sheet(file_id)

        # PDF - download and extract
        elif mime_type == 'application/pdf':
            return self._extract_pdf(file_id)

        # Text files
        elif mime_type.startswith('text/'):
            return self._download_text_file(file_id)

        return None

    def _export_google_doc(self, file_id: str) -> str:
        """Export a Google Doc as text"""
        request = self.service.files().export_media(
            fileId=file_id,
            mimeType='text/plain'
        )
        content = io.BytesIO()
        downloader = MediaIoBaseDownload(content, request)

        done = False
        while not done:
            _, done = downloader.next_chunk()

        return content.getvalue().decode('utf-8')
```
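The `_export_google_sheet`, `_extract_pdf`, and `_download_text_file` helpers referenced above are not shown. A minimal sketch of the download-based ones, assuming the pypdf library for PDF parsing (these would be additional methods on GoogleDriveConnector; `io` and `MediaIoBaseDownload` are already imported above):

```python
# Sketch of the helpers referenced in _extract_content (methods on GoogleDriveConnector).
# Assumes pypdf is installed for PDF text extraction.
from pypdf import PdfReader

def _download_bytes(self, file_id: str) -> bytes:
    request = self.service.files().get_media(fileId=file_id)
    buffer = io.BytesIO()
    downloader = MediaIoBaseDownload(buffer, request)
    done = False
    while not done:
        _, done = downloader.next_chunk()
    return buffer.getvalue()

def _extract_pdf(self, file_id: str) -> str:
    reader = PdfReader(io.BytesIO(self._download_bytes(file_id)))
    return "\n\n".join(page.extract_text() or "" for page in reader.pages)

def _download_text_file(self, file_id: str) -> str:
    return self._download_bytes(file_id).decode('utf-8', errors='ignore')
```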

Notion Connector

```python
from notion_client import Client

class NotionConnector:
    def __init__(self, token: str, database_ids: list[str] = None):
        self.client = Client(auth=token)
        self.database_ids = database_ids

    def get_all_pages(self) -> list[dict]:
        """Retrieve all Notion pages"""
        documents = []

        if self.database_ids:
            for db_id in self.database_ids:
                pages = self._get_database_pages(db_id)
                documents.extend(pages)
        else:
            # Search all accessible pages
            results = self.client.search(filter={"property": "object", "value": "page"})
            for page in results['results']:
                doc = self._format_page(page)
                if doc:
                    documents.append(doc)

        return documents

    def _get_database_pages(self, database_id: str) -> list[dict]:
        """Retrieve pages from a Notion database"""
        documents = []
        has_more = True
        start_cursor = None

        while has_more:
            response = self.client.databases.query(
                database_id=database_id,
                start_cursor=start_cursor
            )

            for page in response['results']:
                doc = self._format_page(page)
                if doc:
                    documents.append(doc)

            has_more = response['has_more']
            start_cursor = response.get('next_cursor')

        return documents

    def _format_page(self, page: dict) -> dict:
        """Format a Notion page for RAG"""
        # Extract title
        title = self._extract_title(page)
        if not title:
            return None

        # Extract content
        content = self._extract_page_content(page['id'])

        return {
            "id": f"notion_{page['id']}",
            "title": title,
            "content": f"# {title}\n\n{content}",
            "metadata": {
                "type": "notion",
                "source": "notion",
                "page_id": page['id'],
                "url": page['url'],
                "created_time": page['created_time'],
                "last_updated": page['last_edited_time']
            }
        }

    def _extract_page_content(self, page_id: str) -> str:
        """Extract text content from a Notion page"""
        blocks = self.client.blocks.children.list(block_id=page_id)
        content_parts = []

        for block in blocks['results']:
            text = self._block_to_text(block)
            if text:
                content_parts.append(text)

        return "\n\n".join(content_parts)

    def _block_to_text(self, block: dict) -> str:
        """Convert a Notion block to text"""
        block_type = block['type']

        if block_type == 'paragraph':
            return self._rich_text_to_string(block['paragraph']['rich_text'])

        elif block_type in ['heading_1', 'heading_2', 'heading_3']:
            prefix = '#' * int(block_type[-1])
            text = self._rich_text_to_string(block[block_type]['rich_text'])
            return f"{prefix} {text}"

        elif block_type == 'bulleted_list_item':
            return f"- {self._rich_text_to_string(block['bulleted_list_item']['rich_text'])}"

        elif block_type == 'numbered_list_item':
            return f"1. {self._rich_text_to_string(block['numbered_list_item']['rich_text'])}"

        elif block_type == 'code':
            code = self._rich_text_to_string(block['code']['rich_text'])
            lang = block['code'].get('language', '')
            return f"```{lang}\n{code}\n```"

        return ""

    def _rich_text_to_string(self, rich_text: list) -> str:
        return "".join([rt['plain_text'] for rt in rich_text])
```
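`_extract_title` is referenced in `_format_page` but not shown; a minimal sketch that reads the title property of a Notion page (a method on NotionConnector):

```python
# Sketch of the missing _extract_title helper (a method on NotionConnector).
def _extract_title(self, page: dict) -> str:
    for prop in page.get('properties', {}).values():
        if prop.get('type') == 'title':
            rich_text = prop.get('title', [])
            if rich_text:
                return "".join(rt['plain_text'] for rt in rich_text)
    return None
```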

Intelligent Ingestion Pipeline

Adaptive Chunking by Document Type

```python
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    MarkdownTextSplitter,
    Language
)

class AdaptiveChunker:
    def __init__(self):
        self.chunkers = {
            "markdown": MarkdownTextSplitter(
                chunk_size=1000,
                chunk_overlap=100
            ),
            "code": RecursiveCharacterTextSplitter.from_language(
                language=Language.PYTHON,
                chunk_size=500,
                chunk_overlap=50
            ),
            "prose": RecursiveCharacterTextSplitter(
                chunk_size=800,
                chunk_overlap=100,
                separators=["\n\n", "\n", ". ", " ", ""]
            ),
            "faq": RecursiveCharacterTextSplitter(
                chunk_size=400,
                chunk_overlap=0,
                separators=["Q:", "Question:", "---"]
            )
        }

    def chunk(self, document: dict) -> list[dict]:
        """Split a document with adapted strategy"""
        doc_type = self._detect_type(document)
        chunker = self.chunkers.get(doc_type, self.chunkers["prose"])

        chunks = chunker.split_text(document["content"])

        return [
            {
                "id": f"{document['id']}_chunk_{i}",
                "content": chunk,
                "metadata": {
                    **document["metadata"],
                    "parent_id": document["id"],
                    "parent_title": document["title"],
                    "chunk_index": i,
                    "total_chunks": len(chunks)
                }
            }
            for i, chunk in enumerate(chunks)
        ]

    def _detect_type(self, document: dict) -> str:
        """Detect document type to adapt chunking"""
        content = document["content"]
        title = document["title"].lower()

        # FAQ
        if any(kw in title for kw in ["faq", "questions", "q&a"]):
            return "faq"

        # Code
        code_indicators = ["```", "def ", "class ", "function ", "import "]
        if sum(1 for ind in code_indicators if ind in content) >= 2:
            return "code"

        # Structured markdown
        if content.count("#") >= 3 or content.count("##") >= 2:
            return "markdown"

        return "prose"
```

Metadata Enrichment

```python
class MetadataEnricher:
    def __init__(self, llm=None):
        self.llm = llm

    async def enrich(self, document: dict) -> dict:
        """Enrich document metadata"""
        enriched = document.copy()

        # Automatic categorization
        enriched["metadata"]["category"] = await self._categorize(document)

        # Entity extraction
        enriched["metadata"]["entities"] = await self._extract_entities(document)

        # Quality score
        enriched["metadata"]["quality_score"] = self._calculate_quality(document)

        # Automatic tags
        enriched["metadata"]["auto_tags"] = await self._generate_tags(document)

        return enriched

    async def _categorize(self, document: dict) -> str:
        """Automatically categorize the document"""
        if not self.llm:
            return "uncategorized"

        prompt = f"""
        Categorize this document among the following categories:
        - procedure: Step-by-step procedure or guide
        - reference: Technical reference documentation
        - policy: Company policy or regulation
        - tutorial: Tutorial or training
        - faq: Frequently asked questions
        - other: Other

        Title: {document['title']}
        Content start: {document['content'][:500]}

        Respond only with the category name.
        """

        response = await self.llm.generate(prompt, temperature=0)
        return response.strip().lower()

    async def _extract_entities(self, document: dict) -> dict:
        """Extract named entities"""
        # Basic regex extraction
        import re

        content = document["content"]

        return {
            "tools": re.findall(r'\b(?:Jira|Confluence|Slack|GitHub|GitLab|AWS|Azure|GCP)\b', content, re.I),
            "teams": re.findall(r'team\s+(\w+)', content, re.I),
            "people": [],  # Requires more advanced NER
            "versions": re.findall(r'v?\d+\.\d+(?:\.\d+)?', content)
        }

    def _calculate_quality(self, document: dict) -> float:
        """Calculate document quality score"""
        score = 1.0
        content = document["content"]

        # Penalties
        if len(content) < 100:
            score -= 0.3  # Too short
        if "TODO" in content or "WIP" in content:
            score -= 0.2  # Not finalized
        if document["metadata"].get("last_updated"):
            # Old document
            from datetime import datetime, timedelta
            try:
                updated = datetime.fromisoformat(document["metadata"]["last_updated"].replace("Z", "+00:00"))
                if datetime.now(updated.tzinfo) - updated > timedelta(days=365):
                    score -= 0.2
            except:
                pass

        # Bonuses
        if content.count("#") >= 2:
            score += 0.1  # Well structured
        if "example" in content.lower():
            score += 0.1  # Contains examples

        return max(0, min(1, score))

    async def _generate_tags(self, document: dict) -> list[str]:
        """Generate automatic tags"""
        if not self.llm:
            return []

        prompt = f"""
        Generate 3 to 5 relevant tags for this document.

        Title: {document['title']}
        Content: {document['content'][:1000]}

        Respond with tags separated by commas.
        """

        response = await self.llm.generate(prompt, temperature=0.3)
        tags = [t.strip().lower() for t in response.split(",")]
        return tags[:5]
```

Intelligent Search

Hybrid Search with Personalization

```python
class KBSearch:
    def __init__(self, vector_db, bm25_index, user_service):
        self.vector_db = vector_db
        self.bm25_index = bm25_index
        self.user_service = user_service

    async def search(
        self,
        query: str,
        user_id: str,
        top_k: int = 10,
        filters: dict = None
    ) -> list[dict]:
        """Personalized hybrid search"""
        # User profile for personalization
        user_profile = await self.user_service.get_profile(user_id)

        # Vector search
        vector_results = await self._vector_search(query, top_k * 2, filters)

        # Lexical BM25 search
        bm25_results = await self._bm25_search(query, top_k * 2, filters)

        # RRF fusion (Reciprocal Rank Fusion)
        fused = self._rrf_fusion(vector_results, bm25_results)

        # Profile-based personalization
        personalized = self._personalize(fused, user_profile)

        return personalized[:top_k]

    def _rrf_fusion(
        self,
        results_1: list[dict],
        results_2: list[dict],
        k: int = 60
    ) -> list[dict]:
        """RRF fusion of two result lists"""
        scores = {}

        for rank, doc in enumerate(results_1):
            doc_id = doc["id"]
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)

        for rank, doc in enumerate(results_2):
            doc_id = doc["id"]
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)

        # Create merged list
        all_docs = {d["id"]: d for d in results_1 + results_2}
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)

        return [
            {**all_docs[doc_id], "fusion_score": scores[doc_id]}
            for doc_id in sorted_ids
            if doc_id in all_docs
        ]

    def _personalize(self, results: list[dict], user_profile: dict) -> list[dict]:
        """Personalize results based on user profile"""
        user_team = user_profile.get("team")
        user_role = user_profile.get("role")
        user_history = set(user_profile.get("viewed_docs", []))

        for result in results:
            boost = 0

            # Boost for team documents
            if result["metadata"].get("team") == user_team:
                boost += 0.1

            # Boost for role-appropriate documents
            if result["metadata"].get("target_role") == user_role:
                boost += 0.05

            # Slight penalty for already viewed (favor discovery)
            if result["id"] in user_history:
                boost -= 0.02

            result["personalized_score"] = result.get("fusion_score", 0.5) + boost

        # Re-sort by personalized score
        return sorted(results, key=lambda x: x["personalized_score"], reverse=True)
```
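`_vector_search` and `_bm25_search` are assumed above; a sketch of what they might look like with a Qdrant client and the rank_bm25 package (the `embed()` helper and the shape of `bm25_index` are assumptions, adapt them to your stack):

```python
# Sketch of the two retrieval backends assumed by KBSearch (methods on the class).
# Assumes self.vector_db is a qdrant_client.QdrantClient, self.embed() returns a
# query vector, and self.bm25_index holds a rank_bm25.BM25Okapi plus its chunk list.
async def _vector_search(self, query: str, top_k: int, filters: dict = None) -> list[dict]:
    hits = self.vector_db.search(
        collection_name="knowledge_base",
        query_vector=self.embed(query),
        limit=top_k,
        query_filter=filters,  # a qdrant Filter object, or None
    )
    return [{"id": str(h.id), **h.payload, "score": h.score} for h in hits]

async def _bm25_search(self, query: str, top_k: int, filters: dict = None) -> list[dict]:
    scores = self.bm25_index.model.get_scores(query.lower().split())
    ranked = sorted(zip(self.bm25_index.chunks, scores), key=lambda x: x[1], reverse=True)
    return [{**chunk, "score": score} for chunk, score in ranked[:top_k]]
```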

Intelligent Suggestions

```python
class KBSuggestions:
    def __init__(self, search, analytics):
        self.search = search
        self.analytics = analytics

    async def get_suggestions(self, user_id: str, context: str = None) -> list[dict]:
        """Generate personalized suggestions"""
        suggestions = []

        # 1. Popular documents
        popular = await self._get_popular_docs()
        suggestions.extend([{"type": "popular", **d} for d in popular[:3]])

        # 2. Recently updated documents
        recent = await self._get_recently_updated()
        suggestions.extend([{"type": "updated", **d} for d in recent[:3]])

        # 3. Based on user history
        if user_id:
            related = await self._get_related_to_history(user_id)
            suggestions.extend([{"type": "for_you", **d} for d in related[:3]])

        # 4. Based on current context (page, project, etc.)
        if context:
            contextual = await self.search.search(context, user_id, top_k=3)
            suggestions.extend([{"type": "contextual", **d} for d in contextual])

        # Deduplicate
        seen = set()
        unique_suggestions = []
        for s in suggestions:
            if s["id"] not in seen:
                seen.add(s["id"])
                unique_suggestions.append(s)

        return unique_suggestions[:10]

    async def _get_popular_docs(self) -> list[dict]:
        """Most viewed documents"""
        return await self.analytics.get_top_documents(period_days=30, limit=5)

    async def _get_recently_updated(self) -> list[dict]:
        """Recently updated documents"""
        return await self.search.vector_db.query(
            sort_by="metadata.last_updated",
            sort_order="desc",
            limit=5
        )
```
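`_get_related_to_history` is left out above; one simple approach, sketched here assuming the analytics service can return a user's recently viewed documents, is to search the KB with the recent titles as a pseudo-query:

```python
# Sketch: recommend documents related to the user's recent reading history.
async def _get_related_to_history(self, user_id: str) -> list[dict]:
    # Assumed analytics helper returning the last viewed documents (id + title).
    recent_docs = await self.analytics.get_recent_views(user_id, limit=3)
    if not recent_docs:
        return []

    # Use the recent titles as a pseudo-query against the KB.
    pseudo_query = " ".join(d["title"] for d in recent_docs)
    results = await self.search.search(pseudo_query, user_id, top_k=5)

    # Exclude what the user has already read.
    seen_ids = {d["id"] for d in recent_docs}
    return [r for r in results if r["id"] not in seen_ids]
```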

System Prompt for Internal KB

```python
KB_ASSISTANT_PROMPT = """You are the knowledge base assistant for {company_name}.

USER CONTEXT:
- Name: {user_name}
- Team: {user_team}
- Role: {user_role}

RULES:
1. Answer ONLY from the provided documentation
2. If info is not available, say so clearly and suggest who to contact
3. Always cite your sources with links to documents
4. Adapt your technical level to the user's role

RESPONSE FORMAT:
- Start with a direct, concise answer
- Then detail if necessary
- End with sources used

AVAILABLE DOCUMENTS:
{context}

QUESTION: {question}
"""
```
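A usage sketch: the prompt is filled with the retrieved chunks and the user's profile before calling the LLM (same assumed `llm.generate` interface as in MetadataEnricher; the company name and user fields are placeholders):

```python
# Sketch: assemble the prompt from retrieved chunks and ask the LLM.
async def answer_question(llm, question: str, chunks: list[dict], user: dict) -> str:
    context = "\n\n---\n\n".join(
        f"[{c['metadata'].get('parent_title', '')}]({c['metadata'].get('url', '')})\n{c['content']}"
        for c in chunks
    )
    prompt = KB_ASSISTANT_PROMPT.format(
        company_name="Acme Corp",  # placeholder
        user_name=user["name"],
        user_team=user["team"],
        user_role=user["role"],
        context=context,
        question=question,
    )
    return await llm.generate(prompt, temperature=0.2)
```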

Permission Management

```python
class KBPermissions:
    def __init__(self, user_service, document_service):
        self.user_service = user_service
        self.document_service = document_service

    async def filter_accessible(
        self,
        documents: list[dict],
        user_id: str
    ) -> list[dict]:
        """Filter documents by user permissions"""
        user = await self.user_service.get_user(user_id)
        user_groups = set(user.get("groups", []))
        user_team = user.get("team")

        accessible = []
        for doc in documents:
            if self._can_access(doc, user_groups, user_team):
                accessible.append(doc)

        return accessible

    def _can_access(
        self,
        document: dict,
        user_groups: set,
        user_team: str
    ) -> bool:
        """Check if a user can access a document"""
        doc_meta = document.get("metadata", {})

        # Public documents
        if doc_meta.get("visibility") == "public":
            return True

        # Team documents
        if doc_meta.get("visibility") == "team":
            return doc_meta.get("team") == user_team

        # Documents restricted to certain groups
        allowed_groups = set(doc_meta.get("allowed_groups", []))
        if allowed_groups and not allowed_groups.intersection(user_groups):
            return False

        return True
```
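Permission filtering slots in between retrieval and generation. A sketch of the full query path, reusing KBSearch, KBPermissions, and the `answer_question` helper sketched earlier:

```python
# Sketch: retrieve, filter by permissions, then generate the answer.
async def ask_kb(question: str, user_id: str, search: KBSearch,
                 permissions: KBPermissions, llm) -> str:
    results = await search.search(question, user_id, top_k=10)
    allowed = await permissions.filter_accessible(results, user_id)
    if not allowed:
        return "No accessible document answers this question; try asking the relevant team directly."
    user = await permissions.user_service.get_user(user_id)
    return await answer_question(llm, question, allowed[:5], user)
```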

Metrics and Analytics

```python
from datetime import datetime

class KBAnalytics:
    def __init__(self, db):
        self.db = db

    def track_search(
        self,
        user_id: str,
        query: str,
        results_count: int,
        clicked_doc_id: str = None
    ):
        """Track a search"""
        self.db.insert("kb_searches", {
            "user_id": user_id,
            "query": query,
            "results_count": results_count,
            "clicked_doc_id": clicked_doc_id,
            "timestamp": datetime.now()
        })

    def get_search_metrics(self, period_days: int = 30) -> dict:
        """Search metrics"""
        return {
            "total_searches": self._count_searches(period_days),
            "unique_users": self._count_unique_searchers(period_days),
            "avg_results_per_search": self._avg_results(period_days),
            "zero_result_rate": self._zero_result_rate(period_days),
            "click_through_rate": self._ctr(period_days),
            "top_queries": self._top_queries(period_days),
            "top_documents": self._top_documents(period_days)
        }

    def identify_gaps(self) -> list[dict]:
        """Identify KB gaps (zero-result queries)"""
        return self.db.query("""
            SELECT query, COUNT(*) as count
            FROM kb_searches
            WHERE results_count = 0
              AND timestamp > NOW() - INTERVAL '30 days'
            GROUP BY query
            ORDER BY count DESC
            LIMIT 20
        """)
```
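The helper metrics called in get_search_metrics (`_zero_result_rate`, `_ctr`, and so on) are not shown; a sketch of two of them, assuming a PostgreSQL-backed `kb_searches` table and a hypothetical `db.query_one` wrapper returning a single row:

```python
# Sketch of two helper metrics used in get_search_metrics (methods on KBAnalytics).
# Assumes PostgreSQL and a db.query_one(sql, params) helper returning one row as a dict.
def _zero_result_rate(self, period_days: int) -> float:
    row = self.db.query_one("""
        SELECT COUNT(*) FILTER (WHERE results_count = 0)::float
               / NULLIF(COUNT(*), 0) AS rate
        FROM kb_searches
        WHERE timestamp > NOW() - INTERVAL '1 day' * %s
    """, (period_days,))
    return row["rate"] or 0.0

def _ctr(self, period_days: int) -> float:
    row = self.db.query_one("""
        SELECT COUNT(clicked_doc_id)::float / NULLIF(COUNT(*), 0) AS ctr
        FROM kb_searches
        WHERE timestamp > NOW() - INTERVAL '1 day' * %s
    """, (period_days,))
    return row["ctr"] or 0.0
```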

Best Practices

1. Maintain Freshness

• Daily automatic synchronization
• Alerts on stale documents (> 6 months without update)
• Periodic review of the least viewed documents

2. Encourage Contribution

• Simple interface to report errors
• Update suggestions after consultation
• Gamification: badges for contributors

3. Measure Adoption

| Metric | Target | Action if Below Target |
|--------|--------|------------------------|
| DAU / MAU | > 60% | Communication campaign |
| Zero-result queries | < 5% | Enrich KB |
| Avg resolution time | < 30s | Improve ranking |
| Satisfaction (CSAT) | > 4.2/5 | Analyze feedback |

Learn More

• Retrieval Fundamentals - Optimize search
• Multimodal RAG - Index images and PDFs
• Introduction to RAG - Understand the basics

---

Deploy Your Intelligent KB with Ailog

Creating a RAG knowledge base requires integrating multiple sources and managing complex permissions. With Ailog, deployment is simpler:

• Native connectors: Confluence, Notion, Google Drive, SharePoint
• Real-time automatic synchronization
• Built-in permission management (SSO, groups, teams)
• Customizable search interface
• Analytics to measure adoption and identify gaps
• GDPR-compliant European hosting

Try Ailog for free and centralize your company's knowledge in days.

Tags

  • RAG
  • knowledge base
  • enterprise
  • documentation
  • onboarding