# Nanonets-OCR-Setup.ps1
# PowerShell script to set up and run the Nanonets OCR model locally
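
# Example invocations (illustrative paths; adjust to your environment):
#   .\Nanonets-OCR-Setup.ps1 -InstallDependencies
#   .\Nanonets-OCR-Setup.ps1 -PdfPath "C:\Docs\report.pdf" -OutputDir "gnosis/development"
#   .\Nanonets-OCR-Setup.ps1              # no -PdfPath: a file picker dialog opens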

param(
    [string]$PdfPath = "",
    [switch]$InstallDependencies,
    [switch]$UseVLLM = $false,
    [string]$OutputDir = "gnosis/development"
)

# Function to check if running as administrator
function Test-Administrator {
    $currentUser = [Security.Principal.WindowsIdentity]::GetCurrent()
    $principal = New-Object Security.Principal.WindowsPrincipal($currentUser)
    return $principal.IsInRole([Security.Principal.WindowsBuiltInRole]::Administrator)
}

# Function to install Python dependencies
function Install-PythonDependencies {
    Write-Host "Installing Python dependencies..." -ForegroundColor Green

    $packages = @(
        "transformers",
        "torch",
        "torchvision",
        "pillow",
        "accelerate",
        "pdf2image",
        "pymupdf",
        "opencv-python",
        "numpy"
    )

    if ($UseVLLM) {
        $packages += "vllm", "openai"
    }

    foreach ($package in $packages) {
        Write-Host "Installing $package..." -ForegroundColor Yellow
        python -m pip install $package --upgrade
    }

    # Try to install flash-attn (optional; only useful with a CUDA GPU).
    # A failing native command does not raise a PowerShell exception, so check the exit code
    # instead of relying on try/catch.
    python -m pip install flash-attn --no-build-isolation
    if ($LASTEXITCODE -eq 0) {
        Write-Host "Flash Attention installed successfully" -ForegroundColor Green
    } else {
        Write-Host "Flash Attention installation failed (optional, will use default attention)" -ForegroundColor Yellow
    }
}
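
# Note: a plain "pip install torch" may pull a CPU-only build on Windows. One way (a sketch,
# not wired into this script) to request a CUDA-enabled wheel is PyTorch's index URL; pick the
# URL matching your CUDA version at https://pytorch.org/get-started/locally/
#   python -m pip install torch torchvision --index-url https://download.pytorch.org/whl/cu121
# Likewise, -UseVLLM only installs the vllm/openai packages; actually serving the model
# (e.g. "vllm serve nanonets/Nanonets-OCR-s") is left to the user and is not invoked here.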

# Function to check Python installation
function Test-PythonInstallation {
    try {
        $pythonVersion = python --version 2>&1
        Write-Host "Python found: $pythonVersion" -ForegroundColor Green
        return $true
    } catch {
        Write-Host "Python not found. Please install Python 3.8 or higher." -ForegroundColor Red
        return $false
    }
}

# Function to create directory structure
function Initialize-DirectoryStructure {
    param([string]$BaseDir)

    if (-not (Test-Path $BaseDir)) {
        New-Item -ItemType Directory -Path $BaseDir -Force | Out-Null
        Write-Host "Created directory: $BaseDir" -ForegroundColor Green
    }

    $timestamp = Get-Date -Format "yyyy-MM-dd_HH-mm-ss"
    $sessionDir = Join-Path $BaseDir "ocr_session_$timestamp"
    New-Item -ItemType Directory -Path $sessionDir -Force | Out-Null

    # Create subdirectories
    $dirs = @("input", "output", "logs", "images")
    foreach ($dir in $dirs) {
        New-Item -ItemType Directory -Path (Join-Path $sessionDir $dir) -Force | Out-Null
    }

    return $sessionDir
}
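
# Resulting session layout (timestamp shown is illustrative):
#   gnosis/development/ocr_session_2024-01-15_10-30-00/
#       input/    - copy of the source PDF
#       images/   - one PNG per page
#       output/   - per-page .md files plus combined_output.md
#       logs/     - processing logs
#   with metadata.json written at the session root after processing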

# Function to select PDF file
function Select-PDFFile {
    Add-Type -AssemblyName System.Windows.Forms
    $dialog = New-Object System.Windows.Forms.OpenFileDialog
    $dialog.Filter = "PDF files (*.pdf)|*.pdf|All files (*.*)|*.*"
    $dialog.Title = "Select PDF file for OCR"
    $dialog.InitialDirectory = [Environment]::GetFolderPath("Desktop")

    if ($dialog.ShowDialog() -eq "OK") {
        return $dialog.FileName
    }
    return $null
}

# Create Python script for OCR processing
function Create-OCRScript {
    param([string]$ScriptPath)

    $pythonScript = @'
import os
import sys
import io
import json
import base64
from pathlib import Path
from datetime import datetime
from PIL import Image
import fitz  # PyMuPDF
import numpy as np
from transformers import AutoTokenizer, AutoProcessor, AutoModelForImageTextToText
import torch
import logging

# Set up logging
def setup_logging(log_dir):
    log_file = os.path.join(log_dir, f"ocr_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(sys.stdout)
        ]
    )
    return logging.getLogger(__name__)

# Convert PDF to images
def pdf_to_images(pdf_path, output_dir, dpi=300):
    """Convert PDF pages to images"""
    pdf_document = fitz.open(pdf_path)
    images = []

    for page_num in range(len(pdf_document)):
        page = pdf_document[page_num]
        mat = fitz.Matrix(dpi/72, dpi/72)
        pix = page.get_pixmap(matrix=mat)

        # Convert to PIL Image
        img_data = pix.pil_tobytes(format="PNG")
        img = Image.open(io.BytesIO(img_data))

        # Save image
        img_path = os.path.join(output_dir, f"page_{page_num + 1:03d}.png")
        img.save(img_path)
        images.append((img_path, img))

        print(f"Converted page {page_num + 1}/{len(pdf_document)}")

    pdf_document.close()
    return images

# OCR function
def ocr_page_with_nanonets_s(image_path, model, processor, max_new_tokens=4096):
    prompt = """Extract the text from the above document as if you were reading it naturally. Return the tables in html format. Return the equations in LaTeX representation. If there is an image in the document and image caption is not present, add a small description of the image inside the <img></img> tag; otherwise, add the image caption inside <img></img>. Watermarks should be wrapped in brackets. Ex: <watermark>OFFICIAL COPY</watermark>. Page numbers should be wrapped in brackets. Ex: <page_number>14</page_number> or <page_number>9/22</page_number>. Prefer using ☐ and ☑ for check boxes."""

    if isinstance(image_path, str):
        image = Image.open(image_path)
    else:
        image = image_path

    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": [
            {"type": "image"},
            {"type": "text", "text": prompt},
        ]},
    ]

    # Process the image
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(text=[text], images=[image], padding=True, return_tensors="pt")
    inputs = inputs.to(model.device)

    # Generate output
    with torch.no_grad():
        output_ids = model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=False)

    # Keep only the newly generated tokens (strip the prompt tokens from each sequence)
    generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(inputs.input_ids, output_ids)]
    output_text = processor.batch_decode(generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)

    return output_text[0]
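
# Illustrative (not exhaustive) sample of the markup the prompt above asks the model to emit:
#   plain running text, tables as <table>...</table> HTML, equations in LaTeX,
#   <img>short description or caption</img>, <watermark>OFFICIAL COPY</watermark>,
#   <page_number>9/22</page_number>, and ☐ / ☑ for check boxes.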

# Main processing function
def process_document(pdf_path, output_dir, use_flash_attention=True):
    logger = setup_logging(os.path.join(output_dir, "logs"))
    logger.info(f"Starting OCR processing for: {pdf_path}")

    # Paths to the subdirectories created by the PowerShell wrapper
    images_dir = os.path.join(output_dir, "images")
    output_text_dir = os.path.join(output_dir, "output")

    try:
        # Load model
        logger.info("Loading Nanonets OCR model...")
        model_path = "nanonets/Nanonets-OCR-s"

        # Check and log GPU availability
        if torch.cuda.is_available():
            gpu_name = torch.cuda.get_device_name(0)
            logger.info(f"GPU detected: {gpu_name}")
            print(f"✓ Using GPU acceleration: {gpu_name}")
        else:
            logger.info("No GPU detected, using CPU")
            print("✗ No GPU detected, using CPU (processing will be slower)")

        # Configure model loading
        model_kwargs = {
            "torch_dtype": torch.float16 if torch.cuda.is_available() else torch.float32,
            "device_map": "auto"
        }

        if use_flash_attention and torch.cuda.is_available():
            try:
                import flash_attn
                model_kwargs["attn_implementation"] = "flash_attention_2"
                logger.info("Flash Attention 2 enabled")
            except ImportError:
                # Don't set attn_implementation if flash_attn is not installed
                logger.warning("Flash Attention not available, using default attention")

        model = AutoModelForImageTextToText.from_pretrained(model_path, **model_kwargs)
        model.eval()

        tokenizer = AutoTokenizer.from_pretrained(model_path)
        processor = AutoProcessor.from_pretrained(model_path)

        logger.info("Model loaded successfully")

        # Convert PDF to images
        logger.info("Converting PDF to images...")
        images = pdf_to_images(pdf_path, images_dir)
        logger.info(f"Converted {len(images)} pages")

        # Process each page
        all_results = []
        for i, (img_path, img) in enumerate(images):
            logger.info(f"Processing page {i + 1}/{len(images)}...")
            try:
                result = ocr_page_with_nanonets_s(img_path, model, processor, max_new_tokens=8192)

                # Save individual page result
                page_output = os.path.join(output_text_dir, f"page_{i + 1:03d}.md")
                with open(page_output, 'w', encoding='utf-8') as f:
                    f.write(result)

                all_results.append({
                    "page": i + 1,
                    "image_path": img_path,
                    "text": result
                })

                logger.info(f"Page {i + 1} processed successfully")

            except Exception as e:
                logger.error(f"Error processing page {i + 1}: {str(e)}")
                all_results.append({
                    "page": i + 1,
                    "image_path": img_path,
                    "error": str(e)
                })

        # Save combined output
        combined_output = os.path.join(output_text_dir, "combined_output.md")
        with open(combined_output, 'w', encoding='utf-8') as f:
            f.write(f"# OCR Results for {os.path.basename(pdf_path)}\n\n")
            f.write(f"Processed on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write("---\n\n")

            for result in all_results:
                f.write(f"## Page {result['page']}\n\n")
                if 'error' in result:
                    f.write(f"**Error:** {result['error']}\n\n")
                else:
                    f.write(result['text'])
                f.write("\n\n---\n\n")

        # Save JSON metadata
        metadata = {
            "source_pdf": pdf_path,
            "processing_date": datetime.now().isoformat(),
            "total_pages": len(images),
            "model_used": model_path,
            "device": "cuda" if torch.cuda.is_available() else "cpu",
            "pages": all_results
        }

        metadata_path = os.path.join(output_dir, "metadata.json")
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        logger.info(f"Processing complete. Results saved to: {output_dir}")

        # Clean up GPU memory if used
        if torch.cuda.is_available():
            del model
            torch.cuda.empty_cache()

        return combined_output

    except Exception as e:
        logger.error(f"Fatal error: {str(e)}")
        raise

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python ocr_processor.py <pdf_path> <output_dir>")
        sys.exit(1)

    pdf_path = sys.argv[1]
    output_dir = sys.argv[2]

    if not os.path.exists(pdf_path):
        print(f"Error: PDF file not found: {pdf_path}")
        sys.exit(1)

    result_file = process_document(pdf_path, output_dir)
    print(f"\nOCR complete! Combined results saved to: {result_file}")
'@

    $pythonScript | Out-File -FilePath $ScriptPath -Encoding UTF8
    Write-Host "Created OCR processing script: $ScriptPath" -ForegroundColor Green
}
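
# The generated ocr_processor.py is self-contained and can also be run directly, e.g.
# (illustrative paths):
#   python ocr_processor.py "C:\Docs\report.pdf" "gnosis\development\ocr_session_<timestamp>"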

# Main execution
Write-Host "`nNanonets OCR Local Setup and Execution" -ForegroundColor Cyan
Write-Host "======================================`n" -ForegroundColor Cyan

# Check Python installation
if (-not (Test-PythonInstallation)) {
    Write-Host "Please install Python 3.8 or higher from https://www.python.org/" -ForegroundColor Red
    exit 1
}

# Install dependencies if requested (done before the GPU check below, which needs torch)
if ($InstallDependencies) {
    Install-PythonDependencies
}

# Check GPU availability (falls back to the CPU message if torch is not installed yet)
Write-Host "Checking hardware acceleration..." -ForegroundColor Yellow
$gpuCheck = python -c "import torch; print('CUDA' if torch.cuda.is_available() else 'CPU')" 2>$null
if ($gpuCheck -eq "CUDA") {
    $gpuName = python -c "import torch; print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'No GPU')"
    Write-Host "✓ GPU acceleration available: $gpuName" -ForegroundColor Green
} else {
    Write-Host "✗ GPU not available, will use CPU (slower processing)" -ForegroundColor Yellow
}
Write-Host ""

# Create base directory structure
$sessionDir = Initialize-DirectoryStructure -BaseDir $OutputDir

# Select or use provided PDF
if ([string]::IsNullOrEmpty($PdfPath)) {
    Write-Host "Please select a PDF file..." -ForegroundColor Yellow
    $PdfPath = Select-PDFFile
    if ([string]::IsNullOrEmpty($PdfPath)) {
        Write-Host "No file selected. Exiting." -ForegroundColor Red
        exit 1
    }
}

if (-not (Test-Path $PdfPath)) {
    Write-Host "Error: PDF file not found: $PdfPath" -ForegroundColor Red
    exit 1
}

Write-Host "`nSelected PDF: $PdfPath" -ForegroundColor Green

# Copy PDF to input directory (nested Join-Path keeps this compatible with Windows PowerShell 5.1,
# which does not accept a third positional path segment)
$inputPdf = Join-Path (Join-Path $sessionDir "input") (Split-Path $PdfPath -Leaf)
Copy-Item -Path $PdfPath -Destination $inputPdf -Force
Write-Host "Copied PDF to: $inputPdf" -ForegroundColor Green

# Create OCR script
$ocrScriptPath = Join-Path $sessionDir "ocr_processor.py"
Create-OCRScript -ScriptPath $ocrScriptPath

# Run OCR processing
Write-Host "`nStarting OCR processing..." -ForegroundColor Cyan
Write-Host "This may take several minutes depending on the PDF size and your hardware." -ForegroundColor Yellow
Write-Host "GPU will be used if available for faster processing.`n" -ForegroundColor Yellow

try {
    # Execute Python script (quote each argument so paths containing spaces survive Start-Process)
    $pythonArgs = @("`"$ocrScriptPath`"", "`"$inputPdf`"", "`"$sessionDir`"")
    $process = Start-Process -FilePath "python" -ArgumentList $pythonArgs -NoNewWindow -Wait -PassThru

    if ($process.ExitCode -eq 0) {
        Write-Host "`nOCR processing completed successfully!" -ForegroundColor Green
        Write-Host "Results saved to: $sessionDir" -ForegroundColor Green

        # Display output structure
        Write-Host "`nOutput structure:" -ForegroundColor Cyan
        Get-ChildItem -Path $sessionDir -Recurse | Where-Object { -not $_.PSIsContainer } | ForEach-Object {
            $relativePath = $_.FullName.Replace($sessionDir, "").TrimStart("\")
            Write-Host " - $relativePath" -ForegroundColor Gray
        }

        # Open results directory
        Start-Process explorer.exe -ArgumentList $sessionDir

        # Show combined output path
        $combinedOutput = Join-Path (Join-Path $sessionDir "output") "combined_output.md"
        if (Test-Path $combinedOutput) {
            Write-Host "`nCombined OCR output: $combinedOutput" -ForegroundColor Green
            Write-Host "Would you like to open the results? (Y/N)" -ForegroundColor Yellow
            $response = Read-Host
            if ($response -eq 'Y' -or $response -eq 'y') {
                Start-Process notepad.exe -ArgumentList $combinedOutput
            }
        }
    } else {
        Write-Host "OCR processing failed with exit code: $($process.ExitCode)" -ForegroundColor Red
        Write-Host "Check the logs in: $(Join-Path $sessionDir 'logs')" -ForegroundColor Yellow
    }
} catch {
    Write-Host "Error running OCR processing: $_" -ForegroundColor Red
}

Write-Host "`nPress any key to exit..." -ForegroundColor Gray
$null = $Host.UI.RawUI.ReadKey("NoEcho,IncludeKeyDown")