internal_wiki_rag.py
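"""
Local RAG (retrieval-augmented generation) over a folder of HTML wiki pages:
extracts text with BeautifulSoup, indexes it with txtai sentence embeddings,
and answers questions through a locally running Ollama model. Also provides
plain text search, an embedding search command, and a document-opening helper.
"""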
import os
import subprocess
import sys
import json
from pathlib import Path

# --- Set this before heavyweight imports (txtai pulls in torch/faiss) to avoid
# --- the duplicate-OpenMP-runtime abort, especially on macOS.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# txtai is imported inside main(), after install_dependencies() has had a
# chance to install it, so a missing package does not crash at import time.

def install_dependencies():
    """Install required packages if not available."""
    required_packages = [
        "txtai[faiss]",
        "beautifulsoup4",
        "lxml"
    ]
    for package in required_packages:
        try:
            if package == "beautifulsoup4":
                import bs4
            elif package == "lxml":
                import lxml
            elif package == "txtai[faiss]":
                import txtai
        except ImportError:
            print(f"Installing {package}...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"{package} installed successfully.")

def check_ollama_status():
    """Checks if Ollama server is running by attempting a connection."""
    print("Checking Ollama server status...")
    try:
        subprocess.run(["curl", "-s", "--connect-timeout", "5", "http://localhost:11434/api/tags"],
                       check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("Ollama server is running successfully at http://localhost:11434.")
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: Ollama server is not detected or not responding at http://localhost:11434.")
        print("Please ensure Ollama is installed and running. Download from: https://ollama.com/download")
        return False
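
# A curl-free variant (a sketch, not wired in below): the same liveness check
# using only the standard library, against the /api/tags endpoint used above.
def check_ollama_status_stdlib(timeout=5):
    """Return True if the Ollama HTTP API answers within `timeout` seconds."""
    import urllib.request
    import urllib.error
    try:
        urllib.request.urlopen("http://localhost:11434/api/tags", timeout=timeout)
        return True
    except (urllib.error.URLError, OSError):
        return False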

def check_ollama_model(model_name):
    """Checks if a specific Ollama model is pulled, and pulls it if not."""
    print(f"Checking for Ollama model '{model_name}'...")
    try:
        result = subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
        if model_name not in result.stdout:
            print(f"Ollama model '{model_name}' not found locally. Attempting to pull it now...")
            subprocess.run(["ollama", "pull", model_name], check=True)
            print(f"Model '{model_name}' pulled successfully.")
        else:
            print(f"Ollama model '{model_name}' is already pulled.")
        return True
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"Error checking/pulling Ollama model '{model_name}': {e}")
        return False

def extract_text_from_html(html_content):
    """Extract clean text from HTML content."""
    from bs4 import BeautifulSoup
    try:
        soup = BeautifulSoup(html_content, 'lxml')
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()
        # Get text and clean it up
        text = soup.get_text()
        # Clean up whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)
        return text
    except Exception as e:
        print(f"Error extracting text from HTML: {e}")
        return ""

def extract_title_from_html(html_content):
    """Extract page title from HTML content."""
    from bs4 import BeautifulSoup
    try:
        soup = BeautifulSoup(html_content, 'lxml')
        title_tag = soup.find('title')
        if title_tag:
            return title_tag.get_text().strip()
        # Fall back to the first h1 tag
        h1_tag = soup.find('h1')
        if h1_tag:
            return h1_tag.get_text().strip()
        return None
    except Exception:
        return None

def load_html_documents(folder_path, max_files=None, chunk_size=2000):
    """Load HTML files from a folder and extract text content with metadata."""
    folder = Path(folder_path)
    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")
    documents = []
    html_files = list(folder.glob("**/*.html"))
    if max_files:
        html_files = html_files[:max_files]
    print(f"Found {len(html_files)} HTML files to process...")
    for i, html_file in enumerate(html_files):
        if i % 50 == 0:  # Progress indicator
            print(f"Processing file {i+1}/{len(html_files)}: {html_file.name}")
        try:
            with open(html_file, 'r', encoding='utf-8', errors='ignore') as f:
                html_content = f.read()
            text = extract_text_from_html(html_content)
            title = extract_title_from_html(html_content)
            if len(text.strip()) < 100:  # Skip very short documents
                continue
            # Store full file path for direct access
            relative_path = html_file.relative_to(folder)
            full_path = str(html_file)
            # Split large documents into fixed-size chunks
            if len(text) > chunk_size:
                chunks = [text[j:j+chunk_size] for j in range(0, len(text), chunk_size)]
            else:
                chunks = [text]
            for j, chunk in enumerate(chunks):
                documents.append({
                    'text': chunk,
                    'source': str(relative_path),
                    'title': title or html_file.stem,
                    'full_path': full_path,
                    'chunk_id': j,
                    'doc_index': len(documents)  # Unique index for each chunk
                })
        except Exception as e:
            print(f"Error processing {html_file}: {e}")
            continue
    print(f"Successfully processed {len(documents)} document chunks from {len(html_files)} files.")
    return documents
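
# The loader above splits on hard character boundaries, which can cut sentences
# in half. An overlapping variant (a sketch; not wired in above) keeps each
# boundary sentence intact in at least one chunk:
def chunk_with_overlap(text, size=1500, overlap=200):
    """Yield `size`-character chunks where consecutive chunks share `overlap` characters."""
    step = max(size - overlap, 1)
    for start in range(0, len(text), step):
        yield text[start:start + size]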

def open_document(file_path):
    """Open a document using the system's default application."""
    import platform
    try:
        system = platform.system()
        if system == 'Darwin':  # macOS
            subprocess.run(['open', file_path])
        elif system == 'Windows':
            os.startfile(file_path)  # More reliable than shelling out to 'start'
        else:  # Linux and other POSIX
            subprocess.run(['xdg-open', file_path])
        print(f"Opened: {file_path}")
    except Exception as e:
        print(f"Could not open file: {e}")
        print(f"Manual path: {file_path}")

def search_documents_by_text(documents, query, limit=10):
    """Simple text-based search through documents as a backup."""
    query_lower = query.lower()
    results = []
    for doc in documents:
        text_lower = doc['text'].lower()
        # Count query term matches
        query_words = query_lower.split()
        matches = sum(1 for word in query_words if word in text_lower)
        if matches > 0:
            # Simple scoring based on term frequency and document length
            score = matches / len(query_words) * (1000 / len(doc['text']))
            results.append({
                'doc': doc,
                'score': score,
                'matches': matches
            })
    # Sort by score and return top results
    results.sort(key=lambda x: x['score'], reverse=True)
    return results[:limit]
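
# Note on the score above: it multiplies query-term coverage by an inverse
# document-length factor, so short chunks containing every query word rank
# highest. It is a crude fallback, not a replacement for the embeddings index.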

def main():
    # --- 0. Initial Setup & Checks ---
    install_dependencies()

    # Imported here (not at module top) so install_dependencies() runs first
    from txtai import Embeddings, RAG

    if not check_ollama_status():
        sys.exit(1)

    # Configure your Ollama LLM model
    ollama_llm_model = "mistral-small3.2:latest"  # Change as needed
    if not check_ollama_model(ollama_llm_model):
        sys.exit(1)

    # --- 1. Load HTML documents ---
    wiki_folder = os.path.expanduser("~/Desktop/wiki_content_clone_April12_2025")
    print(f"\n--- Loading HTML documents from {wiki_folder} ---")
    # Process ALL files (no max_files limit)
    documents = load_html_documents(wiki_folder, max_files=None, chunk_size=1500)
    if not documents:
        print("No documents found or processed successfully.")
        sys.exit(1)
    print(f"Loaded {len(documents)} document chunks.")
    print(f"Sample document preview: {documents[0]['text'][:200]}...")

    # Save documents with proper indexing
    with open('documents_index.json', 'w') as f:
        json.dump(documents, f, indent=2)
    print("Document index saved to documents_index.json")

    # --- 2. Create embeddings index ---
    print("\n--- Creating embeddings index ---")
    # Create embeddings (simpler approach - just index the text)
    embeddings = Embeddings(
        path="sentence-transformers/all-MiniLM-L6-v2",
        content=True
    )
    # Index just the text content
    texts = [doc['text'] for doc in documents]
    embeddings.index(texts)
    print("Embeddings index created.")

    # --- 3. Set up RAG pipeline ---
    print("\n--- Setting up RAG pipeline ---")
    citation_prompt = """You are a helpful assistant that answers questions based on provided wiki content.
Always include source information in your responses when possible.
Answer questions based on the provided wiki content. If the information is not in the context, say so clearly."""
    rag = RAG(
        embeddings,
        path=f"ollama/{ollama_llm_model}",
        system=citation_prompt,
        context=3  # Use top 3 relevant chunks
    )
    print("RAG pipeline ready.")

    # --- 4. Interactive query loop ---
    print("\n--- Interactive Wiki RAG Query System ---")
    print("Commands:")
    print("  - Ask any question about your wiki content")
    print("  - 'search [term]' - search using embeddings")
    print("  - 'textsearch [term]' - simple text search with full metadata")
    print("  - 'open [number]' - open the nth document from last search")
    print("  - 'test-garibaldi' - comprehensive Garibaldi content test")
    print("  - 'quit' - exit")

    last_search_results = []
    while True:
        try:
            query = input("\nYour question: ").strip()
            if query.lower() in ['quit', 'exit', 'q']:
                break
            if not query:
                continue

            if query.lower().startswith('open '):
                try:
                    doc_num = int(query.split()[1]) - 1
                    if 0 <= doc_num < len(last_search_results):
                        doc_info = last_search_results[doc_num]
                        file_path = doc_info.get('full_path')
                        if file_path:
                            open_document(file_path)
                        else:
                            print(f"No file path available for: {doc_info.get('source', 'Unknown')}")
                    else:
                        print(f"Invalid document number. Use 1-{len(last_search_results)}")
                except (ValueError, IndexError):
                    print("Usage: open [number] (e.g., 'open 1')")
                continue

            if query.lower() == 'test-garibaldi':
                print("\n--- Comprehensive Garibaldi Test ---")
                # Test 1: Text search
                print("1. Text-based search for Garibaldi content:")
                text_results = search_documents_by_text(documents, "garibaldi", limit=5)
                print(f"   Found {len(text_results)} documents containing 'garibaldi'")
                for i, result in enumerate(text_results):
                    doc = result['doc']
                    print(f"\n   Result {i+1} (Score: {result['score']:.3f}, Matches: {result['matches']}):")
                    print(f"   Title: {doc['title']}")
                    print(f"   Source: {doc['source']}")
                    print(f"   File: {doc['full_path']}")
                    print(f"   Text preview: {doc['text'][:200]}...")

                # Test 2: Look for specific Garibaldi terms
                print("\n2. Searching for specific Garibaldi identifiers:")
                specific_terms = ['142.103.55.5', 'files.birs.ca', 'educloud', 'UBC IT']
                for term in specific_terms:
                    term_results = search_documents_by_text(documents, term, limit=3)
                    print(f"\n   '{term}': {len(term_results)} documents found")
                    for result in term_results[:1]:  # Show top result
                        doc = result['doc']
                        print(f"   {doc['source']}: {doc['text'][:100]}...")

                # Test 3: Embedding search
                print("\n3. Embedding-based search:")
                embedding_results = embeddings.search("garibaldi server UBC", limit=5)
                print(f"   Found {len(embedding_results)} embedding results")
                for i, result in enumerate(embedding_results):
                    # With content=True, results are dicts with 'id', 'text' and 'score'
                    if isinstance(result, dict) and 'id' in result:
                        doc_idx = int(result['id'])
                        if doc_idx < len(documents):
                            doc = documents[doc_idx]
                            print(f"   Result {i+1}: {doc['source']} - {doc['text'][:100]}...")
                        else:
                            print(f"   Result {i+1}: Index {doc_idx} out of range")
                    else:
                        print(f"   Result {i+1}: {str(result)[:100]}...")
                continue

            if query.lower().startswith('textsearch '):
                search_term = query[11:]  # Remove 'textsearch ' prefix
                print(f"\n--- Text search for: {search_term} ---")
                results = search_documents_by_text(documents, search_term, limit=10)
                last_search_results = [result['doc'] for result in results]
                print(f"Found {len(results)} documents containing '{search_term}':")
                for i, result in enumerate(results):
                    doc = result['doc']
                    print(f"\nResult {i+1} (Score: {result['score']:.3f}):")
                    print(f"Title: {doc['title']}")
                    print(f"Source: {doc['source']}")
                    print(f"File: {doc['full_path']}")
                    print(f"Text: {doc['text'][:300]}...")
                if results:
                    print(f"\n💡 Use 'open [1-{len(results)}]' to open a document")
                continue

            if query.lower().startswith('search '):
                search_term = query[7:]  # Remove 'search ' prefix
                print(f"\n--- Embedding search for: {search_term} ---")
                results = embeddings.search(search_term, limit=5)
                last_search_results = []
                for i, result in enumerate(results):
                    print(f"\nResult {i+1}:")
                    # Map the result id back to the loaded documents when possible
                    if isinstance(result, dict) and 'id' in result and int(result['id']) < len(documents):
                        doc = documents[int(result['id'])]
                        last_search_results.append(doc)
                        print(f"Title: {doc['title']}")
                        print(f"Source: {doc['source']}")
                        print(f"File: {doc['full_path']}")
                        print(f"Text: {doc['text'][:300]}...")
                    else:
                        print(f"Text: {str(result)[:300]}...")
                        last_search_results.append({'title': 'Unknown', 'source': 'Unknown', 'full_path': None})
                continue

            # Regular RAG query
            print(f"\n--- Processing: {query} ---")
            # Get RAG response (a dict with an 'answer' key, or a plain string,
            # depending on the txtai version)
            response = rag(query)
            answer = response.get('answer', 'No answer found.') if isinstance(response, dict) else response
            print(f"\nAnswer: {answer}")

            # Also show text search results for comparison
            text_results = search_documents_by_text(documents, query, limit=3)
            if text_results:
                print(f"\n📚 Most relevant documents (text search):")
                for i, result in enumerate(text_results):
                    doc = result['doc']
                    print(f"   {i+1}. {doc['title']} - {doc['source']}")
                    print(f"      File: {doc['full_path']}")
        except KeyboardInterrupt:
            print("\n\nExiting...")
            break
        except Exception as e:
            print(f"Error: {e}")
            continue
    print("\n--- Session ended ---")

if __name__ == "__main__":
    main()
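
# Example session (assuming Ollama is running and the wiki dump exists at
# ~/Desktop/wiki_content_clone_April12_2025):
#   $ python internal_wiki_rag.py
#   Your question: textsearch garibaldi
#   Your question: open 1
#   Your question: quit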