Intelligent Knowledge Base: Centralizing Enterprise Knowledge
Create an AI knowledge base for your company: technical documentation, onboarding, and business expertise accessible instantly.
- Author
- Ailog Team
- Published
- Reading time
- 19 min read
- Level
- intermediate
Intelligent Knowledge Base: Centralizing Enterprise Knowledge
In every company, knowledge is scattered: Confluence documents, Google Drive, emails, expertise in employees' heads. An intelligent knowledge base powered by RAG transforms this chaos into an assistant capable of instantly answering any business question.
The Problem of Scattered Knowledge
The Reality in Companies
A McKinsey study reveals that an employee spends an average of 1.8 hours per day searching for information. That's 9.3 hours per week, or 23% of work time lost in search.
Classic symptoms:
• "Where's the document about procedure X?"
• "Who knows how to configure Y?"
• "We already solved this problem, but I can't find the solution"
• "I'm new, I don't know where to look"
Limitations of Traditional Solutions
| Solution | Problem |
|----------|---------|
| Wiki/Confluence | Complex navigation, search limited to exact keywords |
| Google Drive | No structure, impossible to search within content |
| Slack/Teams | Scattered messages, no knowledge capture |
| Internal experts | Single point of failure, cognitive load |
| Static FAQs | Never up to date, don't cover all cases |
The RAG KB Advantage
A RAG knowledge base enables:
• Semantic search: find information even without knowing the exact terms
• Synthesis: get a consolidated answer drawn from multiple sources
• Contextualization: answers adapted to the user's profile
• Continuous updates: automatic synchronization with the sources
• Knowledge capture: every answer enriches the base
Intelligent KB Architecture
```
┌────────────────────────────────────────────────────────────────────────┐
│                              DATA SOURCES                              │
│  Confluence · Google Drive · Notion · Slack (archived) · PDF documents │
└────────────────────────────────────┬───────────────────────────────────┘
                                     ▼
                        ┌─────────────────────────┐
                        │   Ingestion Pipeline    │
                        │   - Text extraction     │
                        │   - Intelligent chunking│
                        │   - Metadata            │
                        └────────────┬────────────┘
                                     ▼
                        ┌─────────────────────────┐
                        │  Embedding + Indexing   │
                        └────────────┬────────────┘
                                     ▼
                        ┌─────────────────────────┐
                        │ Vector Database (Qdrant)│
                        └────────────┬────────────┘
                                     │
┌────────────────────────────────────┴───────────────────────────────────┐
│                               INTERFACE                                │
│       Web Chatbot · Slack Bot · Internal API · Advanced Search         │
└────────────────────────────────────────────────────────────────────────┘
```
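A minimal ingestion sketch tying these stages together. It assumes the connectors and chunker shown later in this article, the `qdrant-client` and `sentence-transformers` packages, and a local Qdrant instance; the collection name and embedding model are illustrative choices, not requirements.

```python
import uuid

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer

COLLECTION = "company_kb"   # illustrative collection name

def ingest(documents: list[dict], chunker) -> None:
    """Embed chunked documents and index them in a local Qdrant instance."""
    model = SentenceTransformer("all-MiniLM-L6-v2")   # any embedding model works
    client = QdrantClient(url="http://localhost:6333")

    client.recreate_collection(
        collection_name=COLLECTION,
        vectors_config=VectorParams(
            size=model.get_sentence_embedding_dimension(),
            distance=Distance.COSINE,
        ),
    )

    points = []
    for doc in documents:
        for chunk in chunker.chunk(doc):
            points.append(PointStruct(
                id=str(uuid.uuid5(uuid.NAMESPACE_URL, chunk["id"])),  # Qdrant expects int or UUID ids
                vector=model.encode(chunk["content"]).tolist(),
                payload={"content": chunk["content"], **chunk["metadata"]},
            ))

    client.upsert(collection_name=COLLECTION, points=points)
```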
Source Connectors
Confluence Connector
```python
from atlassian import Confluence
import html2text

class ConfluenceConnector:
    def __init__(self, url: str, username: str, api_token: str):
        self.confluence = Confluence(
            url=url,
            username=username,
            password=api_token,
            cloud=True
        )
        self.html_converter = html2text.HTML2Text()
        self.html_converter.ignore_links = False

    def get_all_pages(self, space_keys: list[str] = None) -> list[dict]:
        """
        Retrieve all pages from specified spaces
        """
        documents = []

        if space_keys is None:
            spaces = self.confluence.get_all_spaces()
            space_keys = [s['key'] for s in spaces['results']]

        for space_key in space_keys:
            pages = self._get_space_pages(space_key)
            documents.extend(pages)

        return documents

    def _get_space_pages(self, space_key: str) -> list[dict]:
        """
        Retrieve all pages from a space
        """
        pages = []
        start = 0
        limit = 50

        while True:
            result = self.confluence.get_all_pages_from_space(
                space_key,
                start=start,
                limit=limit,
                expand='body.storage,ancestors,version'
            )

            for page in result:
                pages.append(self._format_page(page, space_key))

            if len(result) < limit:
                break
            start += limit

        return pages

    def _format_page(self, page: dict, space_key: str) -> dict:
        """
        Format a Confluence page for RAG
        """
        # Convert HTML to text
        html_content = page.get('body', {}).get('storage', {}).get('value', '')
        text_content = self.html_converter.handle(html_content)

        # Build hierarchical path
        ancestors = page.get('ancestors', [])
        path = ' > '.join([a['title'] for a in ancestors] + [page['title']])

        return {
            "id": f"confluence_{page['id']}",
            "title": page['title'],
            "content": f"{page['title']}\n\n{text_content}",
            "metadata": {
                "type": "confluence",
                "source": "confluence",
                "space": space_key,
                "page_id": page['id'],
                "path": path,
                "url": f"{self.confluence.url}/wiki/spaces/{space_key}/pages/{page['id']}",
                "author": page.get('version', {}).get('by', {}).get('displayName'),
                "last_updated": page.get('version', {}).get('when'),
                "version": page.get('version', {}).get('number')
            }
        }
```
Google Drive Connector
```python
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import io

class GoogleDriveConnector:
    def __init__(self, credentials_path: str, folder_ids: list[str] = None):
        credentials = service_account.Credentials.from_service_account_file(
            credentials_path,
            scopes=['https://www.googleapis.com/auth/drive.readonly']
        )
        self.service = build('drive', 'v3', credentials=credentials)
        self.folder_ids = folder_ids or ['root']

    def get_all_documents(self) -> list[dict]:
        """
        Retrieve all documents from specified folders
        """
        documents = []

        for folder_id in self.folder_ids:
            docs = self._get_folder_documents(folder_id)
            documents.extend(docs)

        return documents

    def _get_folder_documents(self, folder_id: str, path: str = "") -> list[dict]:
        """
        Recursively retrieve documents from a folder
        """
        documents = []

        # List files
        query = f"'{folder_id}' in parents and trashed = false"
        results = self.service.files().list(
            q=query,
            fields="files(id, name, mimeType, modifiedTime, owners, webViewLink)",
            pageSize=100
        ).execute()

        for file in results.get('files', []):
            current_path = f"{path}/{file['name']}" if path else file['name']

            if file['mimeType'] == 'application/vnd.google-apps.folder':
                # Recurse into subfolders
                sub_docs = self._get_folder_documents(file['id'], current_path)
                documents.extend(sub_docs)
            else:
                # Extract file content
                content = self._extract_content(file)
                if content:
                    documents.append({
                        "id": f"gdrive_{file['id']}",
                        "title": file['name'],
                        "content": content,
                        "metadata": {
                            "type": "google_drive",
                            "source": "google_drive",
                            "file_id": file['id'],
                            "path": current_path,
                            "mime_type": file['mimeType'],
                            "url": file.get('webViewLink'),
                            "author": file.get('owners', [{}])[0].get('displayName'),
                            "last_updated": file.get('modifiedTime')
                        }
                    })

        return documents

    def _extract_content(self, file: dict) -> str:
        """
        Extract text content from a file
        """
        mime_type = file['mimeType']
        file_id = file['id']

        # Google Docs - export as text
        if mime_type == 'application/vnd.google-apps.document':
            return self._export_google_doc(file_id)

        # Google Sheets - export as CSV
        elif mime_type == 'application/vnd.google-apps.spreadsheet':
            return self._export_google_sheet(file_id)

        # PDF - download and extract
        elif mime_type == 'application/pdf':
            return self._extract_pdf(file_id)

        # Text files
        elif mime_type.startswith('text/'):
            return self._download_text_file(file_id)

        return None

    def _export_google_doc(self, file_id: str) -> str:
        """
        Export a Google Doc as text
        """
        request = self.service.files().export_media(
            fileId=file_id,
            mimeType='text/plain'
        )
        content = io.BytesIO()
        downloader = MediaIoBaseDownload(content, request)

        done = False
        while not done:
            _, done = downloader.next_chunk()

        return content.getvalue().decode('utf-8')
```
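The connector also calls `_export_google_sheet`, `_extract_pdf` and `_download_text_file`, which are not shown above. Here is one possible implementation of each, assuming the `pypdf` package for PDF text extraction; they are meant to be added as methods of `GoogleDriveConnector`.

```python
import io
from pypdf import PdfReader

# Intended as methods of GoogleDriveConnector; small files are fetched in one call.
def _export_google_sheet(self, file_id: str) -> str:
    # Sheets can be exported directly as CSV text
    data = self.service.files().export_media(fileId=file_id, mimeType='text/csv').execute()
    return data.decode('utf-8')

def _download_text_file(self, file_id: str) -> str:
    data = self.service.files().get_media(fileId=file_id).execute()
    return data.decode('utf-8', errors='replace')

def _extract_pdf(self, file_id: str) -> str:
    data = self.service.files().get_media(fileId=file_id).execute()
    reader = PdfReader(io.BytesIO(data))
    return "\n\n".join(page.extract_text() or "" for page in reader.pages)
```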
Notion Connector
```python
from notion_client import Client

class NotionConnector:
    def __init__(self, token: str, database_ids: list[str] = None):
        self.client = Client(auth=token)
        self.database_ids = database_ids

    def get_all_pages(self) -> list[dict]:
        """
        Retrieve all Notion pages
        """
        documents = []

        if self.database_ids:
            for db_id in self.database_ids:
                pages = self._get_database_pages(db_id)
                documents.extend(pages)
        else:
            # Search all accessible pages
            results = self.client.search(filter={"property": "object", "value": "page"})
            for page in results['results']:
                doc = self._format_page(page)
                if doc:
                    documents.append(doc)

        return documents

    def _get_database_pages(self, database_id: str) -> list[dict]:
        """
        Retrieve pages from a Notion database
        """
        documents = []
        has_more = True
        start_cursor = None

        while has_more:
            response = self.client.databases.query(
                database_id=database_id,
                start_cursor=start_cursor
            )

            for page in response['results']:
                doc = self._format_page(page)
                if doc:
                    documents.append(doc)

            has_more = response['has_more']
            start_cursor = response.get('next_cursor')

        return documents

    def _format_page(self, page: dict) -> dict:
        """
        Format a Notion page for RAG
        """
        # Extract title
        title = self._extract_title(page)
        if not title:
            return None

        # Extract content
        content = self._extract_page_content(page['id'])

        return {
            "id": f"notion_{page['id']}",
            "title": title,
            "content": f"{title}\n\n{content}",
            "metadata": {
                "type": "notion",
                "source": "notion",
                "page_id": page['id'],
                "url": page['url'],
                "created_time": page['created_time'],
                "last_updated": page['last_edited_time']
            }
        }

    def _extract_page_content(self, page_id: str) -> str:
        """
        Extract text content from a Notion page
        """
        blocks = self.client.blocks.children.list(block_id=page_id)
        content_parts = []

        for block in blocks['results']:
            text = self._block_to_text(block)
            if text:
                content_parts.append(text)

        return "\n\n".join(content_parts)

    def _block_to_text(self, block: dict) -> str:
        """
        Convert a Notion block to text
        """
        block_type = block['type']

        if block_type == 'paragraph':
            return self._rich_text_to_string(block['paragraph']['rich_text'])

        elif block_type in ['heading_1', 'heading_2', 'heading_3']:
            prefix = '#' * int(block_type[-1])
            text = self._rich_text_to_string(block[block_type]['rich_text'])
            return f"{prefix} {text}"

        elif block_type == 'bulleted_list_item':
            return f"- {self._rich_text_to_string(block['bulleted_list_item']['rich_text'])}"

        elif block_type == 'numbered_list_item':
            return f"1. {self._rich_text_to_string(block['numbered_list_item']['rich_text'])}"

        elif block_type == 'code':
            code = self._rich_text_to_string(block['code']['rich_text'])
            lang = block['code'].get('language', '')
            return f"```{lang}\n{code}\n```"

        return ""

    def _rich_text_to_string(self, rich_text: list) -> str:
        return "".join([rt['plain_text'] for rt in rich_text])
```
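The `_extract_title` helper used in `_format_page` is not shown above. One possible implementation: Notion stores the title in the property whose type is `title`, under a name that varies per database, so we scan the properties instead of hard-coding a key.

```python
# One possible _extract_title implementation (assumption, not part of the
# original class): find the property of type "title" and join its rich text.
def _extract_title(self, page: dict) -> str:
    for prop in page.get('properties', {}).values():
        if prop.get('type') == 'title' and prop.get('title'):
            return "".join(rt['plain_text'] for rt in prop['title'])
    return ""
```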
Intelligent Ingestion Pipeline
Adaptive Chunking by Document Type
```python
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    MarkdownTextSplitter,
    Language
)

class AdaptiveChunker:
    def __init__(self):
        self.chunkers = {
            "markdown": MarkdownTextSplitter(
                chunk_size=1000,
                chunk_overlap=100
            ),
            "code": RecursiveCharacterTextSplitter.from_language(
                language=Language.PYTHON,
                chunk_size=500,
                chunk_overlap=50
            ),
            "prose": RecursiveCharacterTextSplitter(
                chunk_size=800,
                chunk_overlap=100,
                separators=["\n\n", "\n", ". ", " ", ""]
            ),
            "faq": RecursiveCharacterTextSplitter(
                chunk_size=400,
                chunk_overlap=0,
                separators=["Q:", "Question:", "---"]
            )
        }

    def chunk(self, document: dict) -> list[dict]:
        """
        Split a document with adapted strategy
        """
        doc_type = self._detect_type(document)
        chunker = self.chunkers.get(doc_type, self.chunkers["prose"])

        chunks = chunker.split_text(document["content"])

        return [
            {
                "id": f"{document['id']}_chunk_{i}",
                "content": chunk,
                "metadata": {
                    **document["metadata"],
                    "parent_id": document["id"],
                    "parent_title": document["title"],
                    "chunk_index": i,
                    "total_chunks": len(chunks)
                }
            }
            for i, chunk in enumerate(chunks)
        ]

    def _detect_type(self, document: dict) -> str:
        """
        Detect document type to adapt chunking
        """
        content = document["content"]
        title = document["title"].lower()

        # FAQ
        if any(kw in title for kw in ["faq", "questions", "q&a"]):
            return "faq"

        # Code
        code_indicators = ["```", "def ", "class ", "function ", "import "]
        if sum(1 for ind in code_indicators if ind in content) >= 2:
            return "code"

        # Structured markdown
        if content.count("#") >= 3 or content.count("##") >= 2:
            return "markdown"

        return "prose"
```
Metadata Enrichment
```python
class MetadataEnricher:
    def __init__(self, llm=None):
        self.llm = llm

    async def enrich(self, document: dict) -> dict:
        """
        Enrich document metadata
        """
        enriched = document.copy()

        # Automatic categorization
        enriched["metadata"]["category"] = await self._categorize(document)

        # Entity extraction
        enriched["metadata"]["entities"] = await self._extract_entities(document)

        # Quality score
        enriched["metadata"]["quality_score"] = self._calculate_quality(document)

        # Automatic tags
        enriched["metadata"]["auto_tags"] = await self._generate_tags(document)

        return enriched

    async def _categorize(self, document: dict) -> str:
        """
        Automatically categorize the document
        """
        if not self.llm:
            return "uncategorized"

        prompt = f"""
        Categorize this document among the following categories:
        • procedure: Step-by-step procedure or guide
        • reference: Technical reference documentation
        • policy: Company policy or regulation
        • tutorial: Tutorial or training
        • faq: Frequently asked questions
        • other: Other

        Title: {document['title']}
        Content start: {document['content'][:500]}

        Respond only with the category name.
        """

        response = await self.llm.generate(prompt, temperature=0)
        return response.strip().lower()

    async def _extract_entities(self, document: dict) -> dict:
        """
        Extract named entities
        """
        # Basic regex extraction
        import re

        content = document["content"]

        return {
            "tools": re.findall(r'\b(?:Jira|Confluence|Slack|GitHub|GitLab|AWS|Azure|GCP)\b', content, re.I),
            "teams": re.findall(r'team\s+(\w+)', content, re.I),
            "people": [],  # Requires more advanced NER
            "versions": re.findall(r'v?\d+\.\d+(?:\.\d+)?', content)
        }

    def _calculate_quality(self, document: dict) -> float:
        """
        Calculate document quality score
        """
        score = 1.0
        content = document["content"]

        # Penalties
        if len(content) < 100:
            score -= 0.3  # Too short
        if "TODO" in content or "WIP" in content:
            score -= 0.2  # Not finalized
        if document["metadata"].get("last_updated"):
            # Old document
            from datetime import datetime, timedelta
            try:
                updated = datetime.fromisoformat(document["metadata"]["last_updated"].replace("Z", "+00:00"))
                if datetime.now(updated.tzinfo) - updated > timedelta(days=365):
                    score -= 0.2
            except (ValueError, TypeError):
                pass

        # Bonuses
        if content.count("#") >= 2:
            score += 0.1  # Well structured
        if "example" in content.lower():
            score += 0.1  # Contains examples

        return max(0, min(1, score))

    async def _generate_tags(self, document: dict) -> list[str]:
        """
        Generate automatic tags
        """
        if not self.llm:
            return []

        prompt = f"""
        Generate 3 to 5 relevant tags for this document.

        Title: {document['title']}
        Content: {document['content'][:1000]}

        Respond with tags separated by commas.
        """

        response = await self.llm.generate(prompt, temperature=0.3)
        tags = [t.strip().lower() for t in response.split(",")]
        return tags[:5]
```
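One possible way to run the enricher over a freshly ingested batch, assuming `llm` is any client exposing the async `generate(prompt, temperature)` method the class already calls:

```python
import asyncio

# Hypothetical batch enrichment; `llm` is any client with an async
# generate(prompt, temperature) method, as assumed by MetadataEnricher.
async def enrich_all(documents: list[dict], llm) -> list[dict]:
    enricher = MetadataEnricher(llm=llm)
    return list(await asyncio.gather(*(enricher.enrich(doc) for doc in documents)))

# enriched_docs = asyncio.run(enrich_all(documents, llm))
```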
Intelligent Search
Hybrid Search with Personalization
```python
class KBSearch:
    def __init__(self, vector_db, bm25_index, user_service):
        self.vector_db = vector_db
        self.bm25_index = bm25_index
        self.user_service = user_service

    async def search(
        self,
        query: str,
        user_id: str,
        top_k: int = 10,
        filters: dict = None
    ) -> list[dict]:
        """
        Personalized hybrid search
        """
        # User profile for personalization
        user_profile = await self.user_service.get_profile(user_id)

        # Vector search
        vector_results = await self._vector_search(query, top_k * 2, filters)

        # Lexical BM25 search
        bm25_results = await self._bm25_search(query, top_k * 2, filters)

        # RRF fusion (Reciprocal Rank Fusion)
        fused = self._rrf_fusion(vector_results, bm25_results)

        # Profile-based personalization
        personalized = self._personalize(fused, user_profile)

        return personalized[:top_k]

    def _rrf_fusion(
        self,
        results_1: list[dict],
        results_2: list[dict],
        k: int = 60
    ) -> list[dict]:
        """
        RRF fusion of two result lists
        """
        scores = {}

        for rank, doc in enumerate(results_1):
            doc_id = doc["id"]
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)

        for rank, doc in enumerate(results_2):
            doc_id = doc["id"]
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)

        # Create merged list
        all_docs = {d["id"]: d for d in results_1 + results_2}
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)

        return [
            {**all_docs[doc_id], "fusion_score": scores[doc_id]}
            for doc_id in sorted_ids
            if doc_id in all_docs
        ]

    def _personalize(self, results: list[dict], user_profile: dict) -> list[dict]:
        """
        Personalize results based on user profile
        """
        user_team = user_profile.get("team")
        user_role = user_profile.get("role")
        user_history = set(user_profile.get("viewed_docs", []))

        for result in results:
            boost = 0

            # Boost for team documents
            if result["metadata"].get("team") == user_team:
                boost += 0.1

            # Boost for role-appropriate documents
            if result["metadata"].get("target_role") == user_role:
                boost += 0.05

            # Slight penalty for already viewed (favor discovery)
            if result["id"] in user_history:
                boost -= 0.02

            result["personalized_score"] = result.get("fusion_score", 0.5) + boost

        # Re-sort by personalized score
        return sorted(results, key=lambda x: x["personalized_score"], reverse=True)
```
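The `_vector_search` and `_bm25_search` helpers called above are not shown. A minimal sketch follows, assuming a Qdrant collection named "company_kb", an embedding model stored on `self.embedder`, and a `rank_bm25` index whose corpus order matches `self.corpus`; none of these attributes are defined in the original class, they are illustration-only assumptions.

```python
# Sketch only: self.embedder, self.corpus and the "company_kb" collection name
# are assumptions added for illustration; metadata filters are omitted for brevity.
async def _vector_search(self, query: str, top_k: int, filters: dict = None) -> list[dict]:
    query_vector = self.embedder.encode(query).tolist()
    hits = self.vector_db.search(
        collection_name="company_kb",
        query_vector=query_vector,
        limit=top_k,
    )
    return [
        {"id": str(hit.id), "content": hit.payload.get("content", ""),
         "metadata": hit.payload, "score": hit.score}
        for hit in hits
    ]

async def _bm25_search(self, query: str, top_k: int, filters: dict = None) -> list[dict]:
    # rank_bm25's BM25Okapi.get_scores expects a tokenized query
    scores = self.bm25_index.get_scores(query.lower().split())
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]
    return [{**self.corpus[i], "score": float(scores[i])} for i in ranked]
```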
Intelligent Suggestions
```python
class KBSuggestions:
    def __init__(self, search, analytics):
        self.search = search
        self.analytics = analytics

    async def get_suggestions(self, user_id: str, context: str = None) -> list[dict]:
        """
        Generate personalized suggestions
        """
        suggestions = []

        # Popular documents
        popular = await self._get_popular_docs()
        suggestions.extend([{"type": "popular", **d} for d in popular[:3]])

        # Recently updated documents
        recent = await self._get_recently_updated()
        suggestions.extend([{"type": "updated", **d} for d in recent[:3]])

        # Based on user history
        if user_id:
            related = await self._get_related_to_history(user_id)
            suggestions.extend([{"type": "for_you", **d} for d in related[:3]])

        # Based on current context (page, project, etc.)
        if context:
            contextual = await self.search.search(context, user_id, top_k=3)
            suggestions.extend([{"type": "contextual", **d} for d in contextual])

        # Deduplicate
        seen = set()
        unique_suggestions = []
        for s in suggestions:
            if s["id"] not in seen:
                seen.add(s["id"])
                unique_suggestions.append(s)

        return unique_suggestions[:10]

    async def _get_popular_docs(self) -> list[dict]:
        """
        Most viewed documents
        """
        return await self.analytics.get_top_documents(period_days=30, limit=5)

    async def _get_recently_updated(self) -> list[dict]:
        """
        Recently updated documents
        """
        return await self.search.vector_db.query(
            sort_by="metadata.last_updated",
            sort_order="desc",
            limit=5
        )
```
System Prompt for Internal KB
```python
KB_ASSISTANT_PROMPT = """You are the knowledge base assistant for {company_name}.

USER CONTEXT:
• Name: {user_name}
• Team: {user_team}
• Role: {user_role}

RULES:
1. Answer ONLY from the provided documentation
2. If info is not available, say so clearly and suggest who to contact
3. Always cite your sources with links to documents
4. Adapt your technical level to the user's role

RESPONSE FORMAT:
• Start with a direct, concise answer
• Then detail if necessary
• End with sources used

AVAILABLE DOCUMENTS:
{context}

QUESTION: {question}
"""
```
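To turn this prompt into an answer, retrieval results are formatted into `{context}` and the filled prompt is sent to a chat model. A sketch under assumptions: `kb_search` is the KBSearch instance from the previous section, `llm.chat()` stands in for whichever chat-completion client you use, and "Acme Corp" is a placeholder.

```python
# Sketch of answer generation; kb_search comes from the search section above,
# llm.chat() is an assumed interface, and the company name is a placeholder.
async def answer_question(question: str, user: dict, kb_search, llm) -> str:
    results = await kb_search.search(question, user_id=user["id"], top_k=5)

    # Concatenate retrieved chunks with their source URLs so the model can cite them
    context = "\n\n---\n\n".join(
        f"Source: {r['metadata'].get('url', 'n/a')}\n{r.get('content', '')}"
        for r in results
    )

    prompt = KB_ASSISTANT_PROMPT.format(
        company_name="Acme Corp",
        user_name=user["name"],
        user_team=user["team"],
        user_role=user["role"],
        context=context,
        question=question,
    )

    return await llm.chat([{"role": "user", "content": prompt}])
```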
Permission Management
```python
class KBPermissions:
    def __init__(self, user_service, document_service):
        self.user_service = user_service
        self.document_service = document_service

    async def filter_accessible(
        self,
        documents: list[dict],
        user_id: str
    ) -> list[dict]:
        """
        Filter documents by user permissions
        """
        user = await self.user_service.get_user(user_id)
        user_groups = set(user.get("groups", []))
        user_team = user.get("team")

        accessible = []
        for doc in documents:
            if self._can_access(doc, user_groups, user_team):
                accessible.append(doc)

        return accessible

    def _can_access(
        self,
        document: dict,
        user_groups: set,
        user_team: str
    ) -> bool:
        """
        Check if a user can access a document
        """
        doc_meta = document.get("metadata", {})

        # Public documents
        if doc_meta.get("visibility") == "public":
            return True

        # Team documents
        if doc_meta.get("visibility") == "team":
            return doc_meta.get("team") == user_team

        # Documents restricted to certain groups
        allowed_groups = set(doc_meta.get("allowed_groups", []))
        if allowed_groups and not allowed_groups.intersection(user_groups):
            return False

        return True
```
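Permission filtering slots naturally between retrieval and answer generation. A possible wiring, using the KBSearch and KBPermissions classes above; over-fetching by a factor of 3 is an illustrative choice, not a fixed rule.

```python
# Illustrative wiring of permissions into the query path: over-fetch candidates,
# drop what the user cannot see, then truncate to the requested top_k.
async def secure_search(query: str, user_id: str, kb_search, permissions,
                        top_k: int = 10) -> list[dict]:
    candidates = await kb_search.search(query, user_id=user_id, top_k=top_k * 3)
    allowed = await permissions.filter_accessible(candidates, user_id)
    return allowed[:top_k]
```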
Metrics and Analytics
```python
from datetime import datetime

class KBAnalytics:
    def __init__(self, db):
        self.db = db

    def track_search(
        self,
        user_id: str,
        query: str,
        results_count: int,
        clicked_doc_id: str = None
    ):
        """
        Track a search
        """
        self.db.insert("kb_searches", {
            "user_id": user_id,
            "query": query,
            "results_count": results_count,
            "clicked_doc_id": clicked_doc_id,
            "timestamp": datetime.now()
        })

    def get_search_metrics(self, period_days: int = 30) -> dict:
        """
        Search metrics
        """
        return {
            "total_searches": self._count_searches(period_days),
            "unique_users": self._count_unique_searchers(period_days),
            "avg_results_per_search": self._avg_results(period_days),
            "zero_result_rate": self._zero_result_rate(period_days),
            "click_through_rate": self._ctr(period_days),
            "top_queries": self._top_queries(period_days),
            "top_documents": self._top_documents(period_days)
        }

    def identify_gaps(self) -> list[dict]:
        """
        Identify KB gaps (zero-result queries)
        """
        return self.db.query("""
            SELECT query, COUNT(*) AS count
            FROM kb_searches
            WHERE results_count = 0
              AND timestamp > NOW() - INTERVAL '30 days'
            GROUP BY query
            ORDER BY count DESC
            LIMIT 20
        """)
```
Best Practices
Maintain Freshness
• Daily automatic synchronization
• Alerts on stale documents (> 6 months without an update); a small sketch follows the adoption table below
• Periodic review of the least viewed documents
Encourage Contribution
• Simple interface to report errors
• Update suggestions after consultation
• Gamification: badges for contributors
Measure Adoption
| Metric | Target | Action if Below Target |
|--------|--------|------------------------|
| DAU / MAU | > 60% | Communication campaign |
| Zero-result queries | < 5% | Enrich KB |
| Avg resolution time | < 30s | Improve ranking |
| Satisfaction (CSAT) | > 4.2/5 | Analyze feedback |
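The stale-document alert mentioned under Maintain Freshness can be a simple scheduled job. A minimal sketch, assuming documents keep the `last_updated` metadata set by the connectors; `notify()` is a hypothetical placeholder for Slack or email.

```python
from datetime import datetime, timedelta, timezone

# Sketch of the stale-document alert; relies on the last_updated metadata set by
# the connectors, and notify() is a hypothetical placeholder for Slack or email.
def find_stale_documents(documents: list[dict], max_age_days: int = 180) -> list[dict]:
    cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)
    stale = []
    for doc in documents:
        raw = doc["metadata"].get("last_updated")
        if not raw:
            continue
        updated = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        if updated < cutoff:
            stale.append(doc)
    return stale

# for doc in find_stale_documents(all_documents):
#     notify(owner=doc["metadata"].get("author"), url=doc["metadata"].get("url"))
```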
Learn More
• Retrieval Fundamentals - Optimize search
• Multimodal RAG - Index images and PDFs
• Introduction to RAG - Understand the basics
---
Deploy Your Intelligent KB with Ailog
Creating a RAG knowledge base requires integrating multiple sources and managing complex permissions. With Ailog, deployment is simpler:
• Native connectors: Confluence, Notion, Google Drive, SharePoint
• Real-time automatic synchronization
• Built-in permission management (SSO, groups, teams)
• Customizable search interface
• Analytics to measure adoption and identify gaps
• GDPR-compliant European hosting
Try Ailog for free and centralize your company's knowledge in days.