#!/usr/bin/env python3
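"""Convert ALTO OCR files to hOCR, MiniOCR and plain text.

Recursively finds every file named ``ocr.xml`` below the directory given as the
first command line argument and writes the converted outputs next to each input.
"""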
from __future__ import annotations

import html
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from enum import Enum
from itertools import chain
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any, Iterable, List, NamedTuple, Optional

import lxml.etree as etree
from click import progressbar
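# Static hOCR preamble; the ocr-capabilities meta tag lists the box classes
# emitted below (ocr_page, ocrx_block, ocr_line, ocrx_word).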
HOCR_HEADER = """ | |
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> | |
<html xmlns="http://www.w3.org/1999/xhtml"> | |
<head> | |
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> | |
<meta name="ocr-capabilities" content="ocr_page ocrx_block ocr_line ocrx_word"/> | |
</head> | |
<body> | |
""" | |
MINIOCR_ALTMARKER = "⇿"
class BoxType(Enum):
    PAGE = 1
    BLOCK = 2
    LINE = 3
    WORD = 4
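# Bounding box parsed from the ALTO HPOS/VPOS/WIDTH/HEIGHT attributes.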
class Coordinates(NamedTuple):
    ulx: int
    uly: int
    width: int
    height: int

    @classmethod
    def parse(cls, elem: Any) -> Coordinates:
        return Coordinates(
            ulx=int(float(elem.get('HPOS') or '0')),
            uly=int(float(elem.get('VPOS') or '0')),
            width=int(float(elem.get('WIDTH'))),
            height=int(float(elem.get('HEIGHT')))
        )

    def to_bbox(self):
        return f"bbox {self.ulx} {self.uly} {self.ulx + self.width} {self.uly + self.height}"
@dataclass
class OcrBox:
    id: Optional[str]
    coords: Coordinates


@dataclass
class Word(OcrBox):
    content: str
    subs_content: Optional[str]
    hyphen_start: Optional[bool]
    confidence: Optional[float]
    alternatives: List[str]
    suffix: Optional[str]


@dataclass
class Line(OcrBox):
    words: Iterable[Word]


@dataclass
class TextBlock(OcrBox):
    lines: Iterable[Line]


@dataclass
class Page(OcrBox):
    blocks: Iterable[TextBlock]
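# Parse an ALTO file into Page objects, merging hyphenated words via SUBS_TYPE/SUBS_CONTENT.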
def parse_alto(path: Path) -> Iterable[Page]:
    tree: Any = etree.parse(str(path))
    nsmap = tree.getroot().nsmap
    if 'alto' not in nsmap:
        # XPath cannot use an empty prefix, so expose the default namespace as 'alto'.
        nsmap['alto'] = nsmap[None]
        del nsmap[None]
    for page_elem in tree.xpath('.//alto:Page', namespaces=nsmap):  # noqa
        blocks: List[TextBlock] = []
        for block_elem in page_elem.xpath('./alto:PrintSpace/alto:TextBlock', namespaces=nsmap):
            lines: List[Line] = []
            in_hyphen = False
            hyphen_rest = None
            for line_elem in block_elem.xpath('./alto:TextLine', namespaces=nsmap):
                words: List[Word] = []
                for word_elem in line_elem.xpath('./alto:String', namespaces=nsmap):
                    subs_type = word_elem.get('SUBS_TYPE')
                    if subs_type == 'HypPart1':
                        in_hyphen = True
                    elif subs_type == 'HypPart2':
                        in_hyphen = False
                    elif in_hyphen and subs_type is None:
                        continue
                    next_elem = word_elem.getnext()
                    subs_content = word_elem.get('SUBS_CONTENT')
                    content = word_elem.get('CONTENT')
                    if subs_content is not None:
                        # SUBS_CONTENT carries the full dehyphenated word on both parts:
                        # the first part keeps its original length, the second part gets the rest.
                        if subs_type == 'HypPart1':
                            content = subs_content[:len(content)]
                            hyphen_rest = subs_content[len(content):]
                        elif subs_type == 'HypPart2':
                            content = hyphen_rest
                            hyphen_rest = None
                    words.append(Word(
                        id=word_elem.get('ID'),
                        coords=Coordinates.parse(word_elem),
                        content=content,
                        subs_content=subs_content,
                        hyphen_start=(None if subs_type is None else subs_type == 'HypPart1'),
                        confidence=float(word_elem.get('WC')) if 'WC' in word_elem.attrib else None,
                        alternatives=[elem.text for elem in word_elem.xpath('.//alto:ALTERNATIVE', namespaces=nsmap)],
                        suffix=(' ' if next_elem is not None and next_elem.tag[-2:] == 'SP' else None)
                    ))
                lines.append(Line(
                    id=line_elem.get('ID'),
                    coords=Coordinates.parse(line_elem),
                    words=words))
            blocks.append(TextBlock(
                id=block_elem.get('ID'),
                coords=Coordinates.parse(block_elem),
                lines=lines))
        yield Page(
            id=page_elem.get('ID'),
            coords=Coordinates.parse(page_elem),
            blocks=blocks)
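# Write pages as hOCR, including word confidences and recognition alternatives.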
def to_hocr(pages: Iterable[Page], out_path: Path):
    with out_path.open('wt') as fp:
        fp.write(HOCR_HEADER)
        for idx, page in enumerate(pages):  # noqa
            fp.write(f'  <div class="ocr_page" id="{page.id}" title="{page.coords.to_bbox()}; ppageno {idx}">\n')
            for block in page.blocks:
                fp.write(f'    <div class="ocrx_block" title="{block.coords.to_bbox()}">\n')
                for line in block.lines:
                    fp.write(f'      <div class="ocr_line" title="{line.coords.to_bbox()}">')
                    for word in line.words:
                        if word.alternatives and word.hyphen_start is None:
                            # Encode alternatives with the hOCR ins/del convention.
                            out = [f'<span class="alternatives"><ins class="alt">{html.escape(word.content)}</ins>']
                            for alternative in word.alternatives:
                                out.append(f'<del class="alt">{html.escape(alternative)}</del>')
                            out.append('</span>')
                            word_txt = ''.join(out)
                        else:
                            word_txt = html.escape(word.content)
                        if word.hyphen_start:
                            word_txt += '\xad'  # soft hyphen
                        title_parts = [word.coords.to_bbox()]
                        if word.confidence is not None:
                            title_parts.append(f'x_conf {word.confidence * 100:.2f}')
                        fp.write(f'<span class="ocrx_word" title="{";".join(title_parts)}">{word_txt}</span>{word.suffix or ""}')
                    fp.write('</div>\n')
                fp.write('    </div>\n')
            fp.write('  </div>\n')
        fp.write('</body>\n</html>')
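# Write pages in the compact MiniOCR format (as used by the solr-ocrhighlighting plugin).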
def to_miniocr(pages: Iterable[Page], out_path: Path):
    def format_coords(coords: Coordinates) -> str:
        return f"{coords.ulx} {coords.uly} {coords.width} {coords.height}"

    with out_path.open('wt') as fp:
        fp.write('<ocr>\n')
        for idx, page in enumerate(pages):  # noqa
            fp.write(f'<p xml:id="{page.id}" wh="{page.coords.width} {page.coords.height}">\n')
            for block in page.blocks:
                fp.write('<b>\n')
                for line in block.lines:
                    fp.write('<l>')
                    for word in line.words:
                        if word.alternatives and word.hyphen_start is None:
                            word_txt = html.escape(MINIOCR_ALTMARKER.join([word.content, *word.alternatives]))
                        else:
                            word_txt = html.escape(word.content)
                        if word.hyphen_start:
                            word_txt += '\xad'
                        fp.write(f'<w x="{format_coords(word.coords)}">{word_txt}</w>{word.suffix or ""}')
                    fp.write('</l>\n')
                fp.write('</b>\n')
            fp.write('</p>\n')
        fp.write('</ocr>')
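# Write pages as plain text, one output line per OCR line.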
def to_txt(pages: Iterable[Page], out_path: Path):
    with out_path.open('wt') as fp:
        lines = chain.from_iterable(chain.from_iterable(b.lines for b in p.blocks) for p in pages)
        for line in lines:
            in_hyphenation = False
            for word in line.words:
                fp.write(word.content)
                if word.suffix:
                    fp.write(word.suffix)
                if word.hyphen_start:
                    # Suppress the separating space after the first half of a hyphenated word.
                    in_hyphenation = True
                if not in_hyphenation:
                    fp.write(' ')
            fp.write('\n')
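# Convert a single ALTO file into hOCR (.html), MiniOCR (_mini.xml) and plain text (.txt)
# files next to the input.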
def convert(alto_path: Path):
    pages = list(parse_alto(alto_path))
    hocr_path = alto_path.parent / f'{alto_path.stem}.html'
    to_hocr(pages, hocr_path)
    miniocr_path = alto_path.parent / f'{alto_path.stem}_mini.xml'
    to_miniocr(pages, miniocr_path)
    txt_path = alto_path.parent / f'{alto_path.stem}.txt'
    to_txt(pages, txt_path)
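# Convert every ocr.xml below base_dir in parallel, one worker process per CPU core.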
def main(base_dir: Path):
    with ProcessPoolExecutor(max_workers=cpu_count()) as pool:
        futs = [pool.submit(convert, p) for p in base_dir.glob('**/ocr.xml')]
        with progressbar(as_completed(futs), length=len(futs)) as prog:
            for fut in prog:
                # Propagate any exception raised in a worker process.
                fut.result()


if __name__ == '__main__':
    main(Path(sys.argv[1]))