internal_wiki_rag.py
import os
import subprocess
import sys
import json
from pathlib import Path

from txtai import Embeddings, LLM, RAG

# --- Handle the OpenMP duplicate-runtime error, especially on macOS ---
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

def install_dependencies():
    """Install required packages if not available."""
    required_packages = [
        "txtai[faiss]",
        "beautifulsoup4",
        "lxml"
    ]
    for package in required_packages:
        try:
            if package == "beautifulsoup4":
                import bs4
            elif package == "lxml":
                import lxml
            elif package == "txtai[faiss]":
                import txtai
        except ImportError:
            print(f"Installing {package}...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"{package} installed successfully.")

def check_ollama_status():
    """Checks if the Ollama server is running by attempting a connection."""
    print("Checking Ollama server status...")
    try:
        subprocess.run(["curl", "-s", "--connect-timeout", "5", "http://localhost:11434/api/tags"],
                       check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("Ollama server is running successfully at http://localhost:11434.")
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: Ollama server is not detected or not responding at http://localhost:11434.")
        print("Please ensure Ollama is installed and running. Download from: https://ollama.com/download")
        return False

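# Optional alternative (a sketch, not used by this script): the check above shells out
# to curl, which assumes curl is on PATH. A stdlib-only equivalent could look like the
# function below; the name check_ollama_status_urllib is hypothetical and nothing in
# this script calls it.
def check_ollama_status_urllib(url="http://localhost:11434/api/tags", timeout=5):
    """Return True if the Ollama HTTP API responds at the given URL."""
    import urllib.error
    import urllib.request
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        return False
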
def check_ollama_model(model_name):
    """Checks if a specific Ollama model is pulled, and pulls it if not."""
    print(f"Checking for Ollama model '{model_name}'...")
    try:
        result = subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
        if model_name not in result.stdout:
            print(f"Ollama model '{model_name}' not found locally. Attempting to pull it now...")
            subprocess.run(["ollama", "pull", model_name], check=True)
            print(f"Model '{model_name}' pulled successfully.")
        else:
            print(f"Ollama model '{model_name}' is already pulled.")
        return True
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"Error checking/pulling Ollama model '{model_name}': {e}")
        return False

def extract_text_from_html(html_content):
    """Extract clean text from HTML content."""
    from bs4 import BeautifulSoup
    try:
        soup = BeautifulSoup(html_content, 'lxml')
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()
        # Get text and clean it up
        text = soup.get_text()
        # Clean up whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)
        return text
    except Exception as e:
        print(f"Error extracting text from HTML: {e}")
        return ""

def extract_title_from_html(html_content):
    """Extract page title from HTML content."""
    from bs4 import BeautifulSoup
    try:
        soup = BeautifulSoup(html_content, 'lxml')
        title_tag = soup.find('title')
        if title_tag:
            return title_tag.get_text().strip()
        # Fallback to first h1 tag
        h1_tag = soup.find('h1')
        if h1_tag:
            return h1_tag.get_text().strip()
        return None
    except Exception:
        return None

def load_html_documents(folder_path, max_files=None, chunk_size=2000):
    """Load HTML files from a folder and extract text content with metadata."""
    folder = Path(folder_path)
    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")
    documents = []
    html_files = list(folder.glob("**/*.html"))[:max_files] if max_files else list(folder.glob("**/*.html"))
    print(f"Found {len(html_files)} HTML files to process...")
    for i, html_file in enumerate(html_files):
        if i % 50 == 0:  # Progress indicator
            print(f"Processing file {i+1}/{len(html_files)}: {html_file.name}")
        try:
            with open(html_file, 'r', encoding='utf-8', errors='ignore') as f:
                html_content = f.read()
            text = extract_text_from_html(html_content)
            title = extract_title_from_html(html_content)
            if len(text.strip()) < 100:  # Skip very short documents
                continue
            # Store full file path for direct access
            relative_path = html_file.relative_to(folder)
            full_path = str(html_file)
            # Split large documents into chunks
            if len(text) > chunk_size:
                chunks = [text[start:start + chunk_size] for start in range(0, len(text), chunk_size)]
                for j, chunk in enumerate(chunks):
                    documents.append({
                        'text': chunk,
                        'source': str(relative_path),
                        'title': title or html_file.stem,
                        'full_path': full_path,
                        'chunk_id': j,
                        'doc_index': len(documents)  # Unique index for each chunk
                    })
            else:
                documents.append({
                    'text': text,
                    'source': str(relative_path),
                    'title': title or html_file.stem,
                    'full_path': full_path,
                    'chunk_id': 0,
                    'doc_index': len(documents)  # Unique index for each chunk
                })
        except Exception as e:
            print(f"Error processing {html_file}: {e}")
            continue
    print(f"Successfully processed {len(documents)} document chunks from {len(html_files)} files.")
    return documents

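# Optional alternative (a sketch, not used by load_html_documents above): the chunker
# splits at hard character boundaries, so a sentence can be cut in half at a chunk edge.
# An overlapping variant keeps each boundary region intact in at least one chunk; the
# name chunk_text_with_overlap is hypothetical and nothing in this script calls it.
def chunk_text_with_overlap(text, chunk_size=1500, overlap=200):
    """Split text into fixed-size chunks where consecutive chunks share `overlap` characters."""
    step = max(chunk_size - overlap, 1)
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
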
def open_document(file_path):
    """Open a document using the system's default application."""
    import platform
    try:
        if platform.system() == 'Darwin':  # macOS
            subprocess.run(['open', file_path])
        elif platform.system() == 'Windows':
            subprocess.run(['start', file_path], shell=True)
        else:  # Linux
            subprocess.run(['xdg-open', file_path])
        print(f"Opened: {file_path}")
    except Exception as e:
        print(f"Could not open file: {e}")
        print(f"Manual path: {file_path}")

def search_documents_by_text(documents, query, limit=10):
    """Simple text-based search through documents as backup."""
    query_lower = query.lower()
    results = []
    for doc in documents:
        text_lower = doc['text'].lower()
        # Count query term matches
        query_words = query_lower.split()
        matches = sum(1 for word in query_words if word in text_lower)
        if matches > 0:
            # Simple scoring based on term frequency and document length
            score = matches / len(query_words) * (1000 / len(doc['text']))
            results.append({
                'doc': doc,
                'score': score,
                'matches': matches
            })
    # Sort by score and return top results
    results.sort(key=lambda x: x['score'], reverse=True)
    return results[:limit]

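# Usage example (a sketch, not executed by the script): the heuristic above favours
# short documents that contain many of the query words, so results surface small,
# focused pages first.
#
#   hits = search_documents_by_text(documents, "garibaldi server", limit=5)
#   for hit in hits:
#       print(hit['score'], hit['doc']['title'], hit['doc']['source'])
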
def main():
    # --- 0. Initial Setup & Checks ---
    install_dependencies()
    if not check_ollama_status():
        sys.exit(1)

    # Configure your Ollama LLM model
    ollama_llm_model = "mistral-small3.2:latest"  # Change as needed
    if not check_ollama_model(ollama_llm_model):
        sys.exit(1)

    # --- 1. Load HTML documents ---
    wiki_folder = os.path.expanduser("~/Desktop/wiki_content_clone_April12_2025")
    print(f"\n--- Loading HTML documents from {wiki_folder} ---")
    # Process ALL files (no max_files limit)
    documents = load_html_documents(wiki_folder, max_files=None, chunk_size=1500)
    if not documents:
        print("No documents found or processed successfully.")
        sys.exit(1)
    print(f"Loaded {len(documents)} document chunks.")
    print(f"Sample document preview: {documents[0]['text'][:200]}...")

    # Save documents with proper indexing
    with open('documents_index.json', 'w') as f:
        json.dump(documents, f, indent=2)
    print("Document index saved to documents_index.json")

    # --- 2. Create embeddings index ---
    print("\n--- Creating embeddings index ---")
    # Create embeddings (simpler approach - just index the text)
    embeddings = Embeddings(
        path="sentence-transformers/all-MiniLM-L6-v2",
        content=True
    )
    # Index just the text content; ids are assigned by position, so they map back
    # to indexes in the documents list
    texts = [doc['text'] for doc in documents]
    embeddings.index(texts)
    print("Embeddings index created.")

    # --- 3. Set up RAG pipeline ---
    print("\n--- Setting up RAG pipeline ---")
    citation_prompt = """You are a helpful assistant that answers questions based on provided wiki content.
Always include source information in your responses when possible.
Answer questions based on the provided wiki content. If the information is not in the context, say so clearly."""
    rag = RAG(
        embeddings,
        path=f"ollama/{ollama_llm_model}",
        system=citation_prompt,
        context=3  # Use top 3 relevant chunks
    )
    print("RAG pipeline ready.")

    # --- 4. Interactive query loop ---
    print("\n--- Interactive Wiki RAG Query System ---")
    print("Commands:")
    print(" - Ask any question about your wiki content")
    print(" - 'search [term]' - search using embeddings")
    print(" - 'textsearch [term]' - simple text search with full metadata")
    print(" - 'open [number]' - open the nth document from last search")
    print(" - 'test-garibaldi' - comprehensive Garibaldi content test")
    print(" - 'quit' - exit")

    last_search_results = []
    while True:
        try:
            query = input("\nYour question: ").strip()
            if query.lower() in ['quit', 'exit', 'q']:
                break
            if not query:
                continue

            if query.lower().startswith('open '):
                try:
                    doc_num = int(query.split()[1]) - 1
                    if 0 <= doc_num < len(last_search_results):
                        doc_info = last_search_results[doc_num]
                        file_path = doc_info.get('full_path')
                        if file_path:
                            open_document(file_path)
                        else:
                            print(f"No file path available for: {doc_info.get('source', 'Unknown')}")
                    else:
                        print(f"Invalid document number. Use 1-{len(last_search_results)}")
                except (ValueError, IndexError):
                    print("Usage: open [number] (e.g., 'open 1')")
                continue

            if query.lower() == 'test-garibaldi':
                print("\n--- Comprehensive Garibaldi Test ---")
                # Test 1: Text search
                print("1. Text-based search for Garibaldi content:")
                text_results = search_documents_by_text(documents, "garibaldi", limit=5)
                print(f"   Found {len(text_results)} documents containing 'garibaldi'")
                for i, result in enumerate(text_results):
                    doc = result['doc']
                    print(f"\n   Result {i+1} (Score: {result['score']:.3f}, Matches: {result['matches']}):")
                    print(f"   Title: {doc['title']}")
                    print(f"   Source: {doc['source']}")
                    print(f"   File: {doc['full_path']}")
                    print(f"   Text preview: {doc['text'][:200]}...")

                # Test 2: Look for specific Garibaldi terms
                print("\n2. Searching for specific Garibaldi identifiers:")
                specific_terms = ['142.103.55.5', 'files.birs.ca', 'educloud', 'UBC IT']
                for term in specific_terms:
                    term_results = search_documents_by_text(documents, term, limit=3)
                    print(f"\n   '{term}': {len(term_results)} documents found")
                    for result in term_results[:1]:  # Show top result
                        doc = result['doc']
                        print(f"   {doc['source']}: {doc['text'][:100]}...")

                # Test 3: Embedding search
                print("\n3. Embedding-based search:")
                embedding_results = embeddings.search("garibaldi server UBC", limit=5)
                print(f"   Found {len(embedding_results)} embedding results")
                for i, result in enumerate(embedding_results):
                    # With content=True, search results are dicts with 'id', 'text' and 'score';
                    # the positional id maps back to the documents list
                    doc_idx = int(result["id"]) if isinstance(result, dict) and "id" in result else None
                    if doc_idx is not None and doc_idx < len(documents):
                        doc = documents[doc_idx]
                        print(f"   Result {i+1}: {doc['source']} - {doc['text'][:100]}...")
                    else:
                        print(f"   Result {i+1}: {str(result)[:100]}...")
                continue

if query.lower().startswith('textsearch '): | |
search_term = query[11:] # Remove 'textsearch ' prefix | |
print(f"\n--- Text search for: {search_term} ---") | |
results = search_documents_by_text(documents, search_term, limit=10) | |
last_search_results = [result['doc'] for result in results] | |
print(f"Found {len(results)} documents containing '{search_term}':") | |
for i, result in enumerate(results): | |
doc = result['doc'] | |
print(f"\nResult {i+1} (Score: {result['score']:.3f}):") | |
print(f"Title: {doc['title']}") | |
print(f"Source: {doc['source']}") | |
print(f"File: {doc['full_path']}") | |
print(f"Text: {doc['text'][:300]}...") | |
if results: | |
print(f"\n💡 Use 'open [1-{len(results)}]' to open a document") | |
continue | |
if query.lower().startswith('search '): | |
search_term = query[7:] # Remove 'search ' prefix | |
print(f"\n--- Embedding search for: {search_term} ---") | |
results = embeddings.search(search_term, limit=5) | |
last_search_results = [] | |
for i, result in enumerate(results): | |
print(f"\nResult {i+1}:") | |
# Try to get document info if possible | |
if hasattr(result, 'id') and result.id < len(documents): | |
doc = documents[result.id] | |
last_search_results.append(doc) | |
print(f"Title: {doc['title']}") | |
print(f"Source: {doc['source']}") | |
print(f"File: {doc['full_path']}") | |
print(f"Text: {doc['text'][:300]}...") | |
else: | |
print(f"Text: {str(result)[:300]}...") | |
last_search_results.append({'title': 'Unknown', 'source': 'Unknown', 'full_path': None}) | |
continue | |
            # Regular RAG query
            print(f"\n--- Processing: {query} ---")
            # Get RAG response (depending on the txtai version/output setting this is
            # either a plain answer string or a dict with an 'answer' key, so handle both)
            response = rag(query)
            answer = response.get('answer', 'No answer found.') if isinstance(response, dict) else response
            print(f"\nAnswer: {answer}")

            # Also show text search results for comparison
            text_results = search_documents_by_text(documents, query, limit=3)
            if text_results:
                print("\n📚 Most relevant documents (text search):")
                for i, result in enumerate(text_results):
                    doc = result['doc']
                    print(f"  {i+1}. {doc['title']} - {doc['source']}")
                    print(f"     File: {doc['full_path']}")

        except KeyboardInterrupt:
            print("\n\nExiting...")
            break
        except Exception as e:
            print(f"Error: {e}")
            continue

    print("\n--- Session ended ---")


if __name__ == "__main__":
    main()

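# --- Usage sketch (not part of the original script) ---
# Assumptions: txtai and Ollama are installed, the Ollama server is running locally,
# and the exported wiki HTML lives at ~/Desktop/wiki_content_clone_April12_2025.
#
#   python internal_wiki_rag.py
#
# The indexing step can also be reused on its own, e.g. from a REPL; the snippet below
# is a sketch under those assumptions, not something the script itself runs:
#
#   import os
#   from txtai import Embeddings
#   from internal_wiki_rag import load_html_documents
#
#   docs = load_html_documents(os.path.expanduser("~/Desktop/wiki_content_clone_April12_2025"),
#                              max_files=10, chunk_size=1500)
#   emb = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2", content=True)
#   emb.index([d["text"] for d in docs])
#   print(emb.search("garibaldi", limit=3))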