Parse 3mensio XMLs
import xml.etree.ElementTree as ET
import re
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
import logging
import pandas as pd
from datetime import datetime

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

FILES_DATA_SHEET = "FILES_DATA"
CASES_DATA_SHEET = "CASES_DATA"

@dataclass
class FilenameInfo:
    """Information extracted from the filename"""
    case_id: str
    state: str
    visit_type: str
    original_filename: str


@dataclass
class ReportableValue:
    """Structure for reportable values in measurements"""
    type: str
    id: str
    label: str
    prefix: str
    value: str
    unit: str
    denominator: str
    precision: str


@dataclass
class Measurement:
    """Structure for measurement data"""
    id: str
    report_id: str
    workflow: str
    label: str
    is_custom: str
    reportable_value: Optional[ReportableValue] = None


@dataclass
class ReportOption:
    """Structure for report options"""
    label: str
    value: str
    workflow: str


@dataclass
class StudyInfo:
    """Study information from the XML"""
    study_description: str = ""
    study_id: str = ""
    study_date: str = ""
    accession_number: str = ""
    referring_physician: str = ""
    patient_name: str = ""
    patient_id: str = ""
    patient_sex: str = ""
    patient_weight: str = ""
    patient_height: str = ""
    patient_birth_date: str = ""


@dataclass
class PatientInformation:
    """Patient information fields"""
    fields: Dict[str, Any] = field(default_factory=dict)


@dataclass
class CardioVascularReport:
    """Complete cardiovascular report data"""
    filename_info: FilenameInfo
    version: Dict[str, str] = field(default_factory=dict)
    study_info: StudyInfo = field(default_factory=StudyInfo)
    patient_information: PatientInformation = field(default_factory=PatientInformation)
    measurements: List[Measurement] = field(default_factory=list)
    report_options: List[ReportOption] = field(default_factory=list)
    sections: Dict[str, Any] = field(default_factory=dict)
    custom_fields: Dict[str, Any] = field(default_factory=dict)

class CardioVascularXMLParser:
    """Parser for cardiovascular XML files"""

    def __init__(self):
        # Case IDs may contain one or more hyphen-separated segments
        # (e.g. "000-EAP-005" or "BAV-GUH0005")
        self.filename_pattern = re.compile(r'([A-Z0-9]+(?:-[A-Z0-9]+)+)_([^.]+)\.([^.]+)\.xml')
        # Examples of filenames matching the pattern:
        # - "000-EAP-005_NoPostDil.imp.xml"
        # - "000-EAP-005_BeforePostDil.2nd.xml"
        # - "000-EAP-005_AfterPostDil.imp.xml"
        # - "BAV-GUH0005_BeforePostDil.imp.xml"
        # - "BAV-GUH0005_BeforePostDil.2nd.xml"

    def parse_filename(self, filepath: str) -> FilenameInfo:
        """Extract case ID and state from filename"""
        filename = Path(filepath).name
        match = self.filename_pattern.match(filename)
        if not match:
            logger.warning(f"Filename {filename} doesn't match expected pattern")
            return FilenameInfo(
                case_id="UNKNOWN",
                state="UNKNOWN",
                visit_type="UNKNOWN",
                original_filename=filename
            )
        case_id, state, visit_type = match.groups()
        return FilenameInfo(
            case_id=case_id,
            state=state,
            visit_type=visit_type,
            original_filename=filename
        )
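
    # Illustrative example based on the sample filenames above: for
    # "BAV-GUH0005_BeforePostDil.imp.xml" the regex groups yield
    #   case_id="BAV-GUH0005", state="BeforePostDil", visit_type="imp",
    # which get_file_type_identifier() later recombines into "BeforePostDil.imp".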

    def parse_version(self, version_element: ET.Element) -> Dict[str, str]:
        """Parse version information"""
        version_info = {
            'version_text': version_element.text or "",
            'app_name': version_element.get('AppName', ''),
            'app_major': version_element.get('AppMajor', ''),
            'app_minor': version_element.get('AppMinor', '')
        }
        return version_info

    def parse_study_info(self, study_info_element: ET.Element) -> StudyInfo:
        """Parse study information"""
        study_info = StudyInfo()
        # Map XML tags directly to StudyInfo attributes
        field_mapping = {
            'StudyDescription': 'study_description',
            'StudyId': 'study_id',
            'StudyDate': 'study_date',
            'AccessionNumber': 'accession_number',
            'ReferringPhysician': 'referring_physician',
            'PatientName': 'patient_name',
            'PatientId': 'patient_id',
            'PatientSex': 'patient_sex',
            'PatientWeight': 'patient_weight',
            'PatientHeight': 'patient_height',
            'PatientBirthDate': 'patient_birth_date'
        }
        for child in study_info_element:
            attr_name = field_mapping.get(child.tag)
            if attr_name is not None:
                setattr(study_info, attr_name, child.text or "")
        return study_info
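
    # Illustrative sketch of the <StudyInfo> layout this method assumes
    # (tag names come from the mapping above; the values are invented):
    #   <StudyInfo>
    #     <StudyDescription>TAVI planning</StudyDescription>
    #     <StudyDate>1/1/2024 12:01 PM</StudyDate>
    #     <PatientId>BAV-GUH0005</PatientId>
    #     <PatientWeight>80</PatientWeight>
    #   </StudyInfo>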

    def parse_patient_information(self, patient_info_element: ET.Element) -> PatientInformation:
        """Parse patient information fields"""
        patient_info = PatientInformation()
        fields_element = patient_info_element.find('Fields')
        if fields_element is not None:
            for field in fields_element:
                field_type = field.get('Type', '')
                patient_info.fields[field.tag] = {
                    'value': field.text or "",
                    'type': field_type
                }
        return patient_info

    def parse_reportable_value(self, reportable_value_element: ET.Element) -> ReportableValue:
        """Parse a reportable value element"""
        return ReportableValue(
            type=reportable_value_element.get('Type', ''),
            id=reportable_value_element.get('Id', ''),
            label=reportable_value_element.get('Label', ''),
            prefix=reportable_value_element.get('Prefix', ''),
            value=reportable_value_element.get('Value', ''),
            unit=reportable_value_element.get('Unit', ''),
            denominator=reportable_value_element.get('Denominator', ''),
            precision=reportable_value_element.get('Precision', '')
        )

    def parse_measurement(self, measurement_element: ET.Element) -> Measurement:
        """Parse a measurement element"""
        measurement = Measurement(
            id=measurement_element.get('Id', ''),
            report_id=measurement_element.get('ReportId', ''),
            workflow=measurement_element.get('Workflow', ''),
            label=measurement_element.get('Label', ''),
            is_custom=measurement_element.get('IsCustom', '')
        )
        # Parse reportable value if present
        reportable_value_element = measurement_element.find('ReportableValue')
        if reportable_value_element is not None:
            measurement.reportable_value = self.parse_reportable_value(reportable_value_element)
        return measurement

    def parse_report_option(self, report_option_element: ET.Element) -> ReportOption:
        """Parse a report option element"""
        return ReportOption(
            label=report_option_element.get('Label', ''),
            value=report_option_element.get('Value', ''),
            workflow=report_option_element.get('Workflow', '')
        )
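
    # Illustrative <Measurement> fragment these two methods assume (attribute
    # names come from the code above; the values themselves are invented):
    #   <Measurement Id="ProsthesisWaistWidth" ReportId="..." Workflow="..."
    #                Label="Prosthesis Waist Width" IsCustom="false">
    #     <ReportableValue Type="..." Id="..." Label="..." Prefix=""
    #                      Value="23.4" Unit="mm" Denominator="" Precision="1"/>
    #   </Measurement>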

    def parse_sections(self, sections_element: ET.Element) -> Dict[str, Any]:
        """Parse sections element"""
        sections = {}
        for section in sections_element:
            section_name = section.tag
            section_data = {
                'is_visible': section.get('IsVisible', ''),
                'fields': {}
            }
            fields_element = section.find('Fields')
            if fields_element is not None:
                for field in fields_element:
                    field_type = field.get('Type', '')
                    section_data['fields'][field.tag] = {
                        'value': field.text or "",
                        'type': field_type
                    }
            sections[section_name] = section_data
        return sections

    def parse_models(self, models_element: ET.Element) -> Dict[str, Any]:
        """Parse models section which contains various medical device information"""
        models = {}
        for model in models_element:
            if model.tag == 'StudyInfo':
                continue  # Already parsed separately
            elif model.tag == 'PatientInformation':
                continue  # Already parsed separately
            else:
                # Parse other model types (AorticValve, Femoral, Subclavian, Carotids, etc.)
                model_data = {'fields': {}, 'report_options': {}}
                fields_element = model.find('Fields')
                if fields_element is not None:
                    for field in fields_element:
                        field_type = field.get('Type', '')
                        model_data['fields'][field.tag] = {
                            'value': field.text or "",
                            'type': field_type
                        }
                report_options_element = model.find('ReportOptions')
                if report_options_element is not None:
                    for option in report_options_element:
                        model_data['report_options'][option.tag] = {
                            'value': option.text or "",
                            'attributes': option.attrib
                        }
                models[model.tag] = model_data
        return models

    def parse_xml_file(self, filepath: str) -> CardioVascularReport:
        """Parse a single XML file and return structured data"""
        logger.info(f"Parsing XML file: {filepath}")
        # Parse filename
        filename_info = self.parse_filename(filepath)
        # Parse XML content
        try:
            tree = ET.parse(filepath)
            root = tree.getroot()
        except ET.ParseError as e:
            logger.error(f"Error parsing XML file {filepath}: {e}")
            raise
        # Initialize report
        report = CardioVascularReport(filename_info=filename_info)
        # Parse each section
        for element in root:
            if element.tag == 'Version':
                report.version = self.parse_version(element)
            elif element.tag == 'Models':
                # Parse StudyInfo
                study_info_element = element.find('StudyInfo')
                if study_info_element is not None:
                    report.study_info = self.parse_study_info(study_info_element)
                # Parse PatientInformation
                patient_info_element = element.find('PatientInformation')
                if patient_info_element is not None:
                    report.patient_information = self.parse_patient_information(patient_info_element)
                # Parse other models
                models = self.parse_models(element)
                report.custom_fields.update(models)
            elif element.tag == 'Sections':
                report.sections = self.parse_sections(element)
            elif element.tag == 'Custom':
                # Parse custom measurements
                measurements_element = element.find('Measurements')
                if measurements_element is not None:
                    for measurement_element in measurements_element:
                        measurement = self.parse_measurement(measurement_element)
                        report.measurements.append(measurement)
                # Parse report options
                report_options_element = element.find('ReportOptions')
                if report_options_element is not None:
                    for option_element in report_options_element:
                        option = self.parse_report_option(option_element)
                        report.report_options.append(option)
            elif element.tag == 'CustomFields':
                # Parse any custom fields
                for field in element:
                    report.custom_fields[field.tag] = {
                        'value': field.text or "",
                        'attributes': field.attrib
                    }
        logger.info(f"Successfully parsed {filepath}")
        return report
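
    # Minimal single-file usage sketch (filename taken from the examples above;
    # the file must of course exist on disk):
    #   parser = CardioVascularXMLParser()
    #   report = parser.parse_xml_file("BAV-GUH0005_BeforePostDil.imp.xml")
    #   print(report.study_info.patient_id, len(report.measurements))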

    def parse_directory(self, directory_path: str) -> List[CardioVascularReport]:
        """Parse all XML files in a directory"""
        directory = Path(directory_path)
        xml_files = list(directory.glob('*.xml'))
        if not xml_files:
            logger.warning(f"No XML files found in {directory_path}")
            return []
        reports = []
        for xml_file in xml_files:
            try:
                report = self.parse_xml_file(str(xml_file))
                reports.append(report)
            except Exception as e:
                logger.error(f"Failed to parse {xml_file}: {e}")
        return reports

    def to_dict(self, report: CardioVascularReport) -> Dict[str, Any]:
        """Convert report to dictionary format"""
        def convert_dataclass_to_dict(obj):
            if hasattr(obj, '__dict__'):
                result = {}
                for key, value in obj.__dict__.items():
                    if isinstance(value, list):
                        result[key] = [convert_dataclass_to_dict(item) for item in value]
                    elif hasattr(value, '__dict__'):
                        result[key] = convert_dataclass_to_dict(value)
                    else:
                        result[key] = value
                return result
            else:
                return obj
        return convert_dataclass_to_dict(report)

class CardioVascularBatchProcessor:
    """
    Batch processor for cardiovascular XML files that exports to XLSX format
    matching the template structure
    """

    def __init__(self):
        self.parser = CardioVascularXMLParser()
        # Define the exact column mapping from the template
        self.excel_columns = [
            'File Name',
            'Patient Name',
            'Patient ID',
            'Visit Type',
            'Study Date',
            'Received Date',
            'Review Date',
            'Study Description',
            'Patient Size',
            'Patient Weight',
            'Comments',
            'Host Name',
            'UpperDistanceA',
            'LowerDistanceB',
            'UpperDistanceC',
            'LowerDistanceD',
            'Prosthesis Waist Width',
            'Inflow Width',
            'NCS',
            'LCS',
            'THV size',
            'Balloon PostDil Timing',
            'Analysability(Stent Geometry)',
            'Non-Analysability Reasons(Stent Geometry)',
            'Analysability(Depth Implantation)',
            'Frame of analysis',
            'LAO/RAO',
            'CAU/CRA',
            'LAO/RAO Angle',
            'CAU/CRA Angle',
            'Crossover',
            'Post Orientation',
            'Middle Post Orientation'
        ]
        # Define file types for the cases sheet
        self.file_types = [
            'NoPostDil.imp',
            'NoPostDil.2nd',
            'BeforePostDil.imp',
            'BeforePostDil.2nd',
            'AfterPostDil.imp',
            'AfterPostDil.2nd'
        ]
        # Define measurement columns that repeat for each file type
        self.measurement_columns = [
            'Comments',
            'Host Name',
            'UpperDistanceA',
            'LowerDistanceB',
            'UpperDistanceC',
            'LowerDistanceD',
            'Prosthesis Waist Width',
            'Inflow Width',
            'NCS',
            'LCS',
            'THV size',
            'Balloon PostDil Timing',
            'Analysability(Stent Geometry)',
            'Non-Analysability Reasons(Stent Geometry)',
            'Analysability(Depth Implantation)',
            'Frame of analysis',
            'LAO/RAO',
            'CAU/CRA',
            'LAO/RAO Angle',
            'CAU/CRA Angle',
            'Crossover',
            'Post Orientation',
            'Middle Post Orientation'
        ]
        # Build cases sheet columns
        self.cases_columns = [
            'Patient ID',
            'Study Date',
            'Received Date',
            'Review Date',
            'Study Description',
            'Patient Size',
            'Patient Weight'
        ]
        # Add file type blocks with duplicated column names
        for file_type in self.file_types:
            self.cases_columns.append(file_type)  # File type identifier column
            self.cases_columns.extend(self.measurement_columns)  # Duplicated measurement columns
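        # Resulting CASES_DATA layout: 7 case-level columns, then one block per
        # file type consisting of an identifier column plus the 23 measurement
        # columns, i.e. 7 + 6 * (1 + 23) = 151 columns in total.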

    def extract_patient_id(self, report: CardioVascularReport) -> str:
        """Extract and normalize patient ID from report"""
        pid = report.study_info.patient_id.strip()
        if " " in pid:
            pid = pid.split(" ")[0].strip()  # If the ID contains spaces, use the first part as the patient ID
        if not pid:
            pid = "UNKNOWN"
        return pid

    def extract_measurement_value(self, measurements: List[Measurement], measurement_id: str) -> Optional[float]:
        """Extract measurement value by ID"""
        for measurement in measurements:
            if measurement.id == measurement_id and measurement.reportable_value:
                try:
                    return float(measurement.reportable_value.value)
                except (ValueError, TypeError):
                    return None
        return None

    def extract_report_option_value(self, report_options: List[ReportOption], option_label: str) -> Optional[str]:
        """Extract report option value by label"""
        for option in report_options:
            if option.label == option_label:
                return option.value
        return None

    def parse_study_date(self, date_string: str) -> Optional[datetime]:
        """Parse study date string to datetime"""
        if not date_string:
            return None
        try:
            # Try common date formats
            date_formats = [
                '%m/%d/%Y %I:%M %p',  # 1/1/2024 12:01 PM
                '%m/%d/%Y %H:%M',     # 1/1/2024 12:01 (24-hour)
                '%d/%m/%Y %I:%M %p',  # 1/1/2024 12:01 PM (day/month)
                '%d/%m/%Y %H:%M',     # 1/1/2024 12:01 (day/month, 24-hour)
                '%Y-%m-%d %H:%M:%S',  # 2024-01-01 12:01:00
                '%Y-%m-%d',           # 2024-01-01
                '%m/%d/%Y',           # 1/1/2024
                '%d/%m/%Y'            # 1/1/2024 (day/month)
            ]
            for fmt in date_formats:
                try:
                    return datetime.strptime(date_string, fmt)
                except ValueError:
                    continue
            logger.warning(f"Could not parse date: {date_string}")
            return None
        except Exception as e:
            logger.warning(f"Error parsing date {date_string}: {e}")
            return None
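
    # For example, "1/1/2024 12:01 PM" matches the first format and parses to
    # datetime(2024, 1, 1, 12, 1); ambiguous day/month strings are resolved by
    # whichever format in the list matches first.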

    def convert_report_to_excel_row(self, report: CardioVascularReport) -> Dict[str, Any]:
        """Convert a CardioVascularReport to an Excel row dictionary"""
        # Extract patient information
        patient_name = ""
        patient_weight = None
        patient_height = None
        if report.patient_information.fields:
            patient_name = report.patient_information.fields.get('PatientName', {}).get('value', '').strip()
            try:
                weight_str = report.patient_information.fields.get('PatientWeight', {}).get('value', '')
                if weight_str:
                    patient_weight = float(weight_str)
            except (ValueError, TypeError):
                pass
            try:
                height_str = report.patient_information.fields.get('PatientHeight', {}).get('value', '')
                if height_str:
                    patient_height = float(height_str)
            except (ValueError, TypeError):
                pass
        # Extract measurements
        prosthesis_waist_width = self.extract_measurement_value(report.measurements, 'ProsthesisWaistWidth')
        inflow_width = self.extract_measurement_value(report.measurements, 'InflowWidth')
        ncs = self.extract_measurement_value(report.measurements, 'NCS')
        lcs = self.extract_measurement_value(report.measurements, 'LCS')
        # Extract report options
        thv_size = self.extract_report_option_value(report.report_options, 'THV size')
        balloon_postdil_timing = self.extract_report_option_value(report.report_options, 'Balloon PostDil Timing')
        analysability_stent = self.extract_report_option_value(report.report_options, 'Analysability(Stent Geometry)')
        analysability_depth = self.extract_report_option_value(report.report_options, 'Analysability(Depth Implantation)')
        frame_analysis = self.extract_report_option_value(report.report_options, 'Frame of analysis')
        lao_rao = self.extract_report_option_value(report.report_options, 'LAO/RAO')
        cau_cra = self.extract_report_option_value(report.report_options, 'CAU/CRA')
        lao_rao_angle = self.extract_report_option_value(report.report_options, 'LAO/RAO Angle')
        cau_cra_angle = self.extract_report_option_value(report.report_options, 'CAU/CRA Angle')
        crossover = self.extract_report_option_value(report.report_options, 'Crossover')
        post_orientation = self.extract_report_option_value(report.report_options, 'Post Orientation')
        middle_post_orientation = self.extract_report_option_value(report.report_options, 'Middle Post Orientation')

        # Convert numeric strings to float where applicable
        def safe_float(value):
            if value is None:
                return None
            try:
                return float(value)
            except (ValueError, TypeError):
                return None

        thv_size_float = safe_float(thv_size)
        frame_analysis_float = safe_float(frame_analysis)
        lao_rao_angle_float = safe_float(lao_rao_angle)
        cau_cra_angle_float = safe_float(cau_cra_angle)
        # Parse study date
        study_date = self.parse_study_date(report.study_info.study_date)
        # Create the row dictionary matching Excel template columns
        pid = self.extract_patient_id(report)
        row = {
            'File Name': report.filename_info.original_filename,
            'Patient Name': patient_name,
            'Patient ID': pid,
            'Visit Type': report.filename_info.visit_type,
            'Study Date': study_date,
            'Received Date': None,  # Not available in XML
            'Review Date': None,  # Not available in XML
            'Study Description': report.study_info.study_description,
            'Patient Size': patient_height,
            'Patient Weight': patient_weight,
            'Comments': None,  # Not available in XML
            'Host Name': None,  # Not available in XML
            'UpperDistanceA': None,  # Not available in XML
            'LowerDistanceB': None,  # Not available in XML
            'UpperDistanceC': None,  # Not available in XML
            'LowerDistanceD': None,  # Not available in XML
            'Prosthesis Waist Width': prosthesis_waist_width,
            'Inflow Width': inflow_width,
            'NCS': ncs,
            'LCS': lcs,
            'THV size': thv_size_float,
            'Balloon PostDil Timing': balloon_postdil_timing,
            'Analysability(Stent Geometry)': analysability_stent,
            'Non-Analysability Reasons(Stent Geometry)': None,  # Not directly available
            'Analysability(Depth Implantation)': analysability_depth,
            'Frame of analysis': frame_analysis_float,
            'LAO/RAO': lao_rao,
            'CAU/CRA': cau_cra,
            'LAO/RAO Angle': lao_rao_angle_float,
            'CAU/CRA Angle': cau_cra_angle_float,
            'Crossover': crossover,
            'Post Orientation': post_orientation,
            'Middle Post Orientation': middle_post_orientation
        }
        return row

    def get_file_type_identifier(self, filename_info: FilenameInfo) -> str:
        """Get the file type identifier from filename info"""
        return f"{filename_info.state}.{filename_info.visit_type}"

    def group_reports_by_case(self, reports: List[CardioVascularReport]) -> Dict[str, List[CardioVascularReport]]:
        """Group reports by case ID (Patient ID)"""
        cases = {}
        for report in reports:
            patient_id = self.extract_patient_id(report)
            if patient_id not in cases:
                cases[patient_id] = []
            cases[patient_id].append(report)
        return cases

    def convert_case_to_excel_row(self, case_reports: List[CardioVascularReport]) -> List[Any]:
        """Convert a case (group of reports) to an Excel row list for the cases sheet"""
        # Use the first report for common case information
        primary_report = case_reports[0]
        # Extract common case information
        patient_weight = None
        patient_height = None
        if primary_report.patient_information.fields:
            try:
                weight_str = primary_report.patient_information.fields.get('PatientWeight', {}).get('value', '')
                if weight_str:
                    patient_weight = float(weight_str)
            except (ValueError, TypeError):
                pass
            try:
                height_str = primary_report.patient_information.fields.get('PatientHeight', {}).get('value', '')
                if height_str:
                    patient_height = float(height_str)
            except (ValueError, TypeError):
                pass
        # Parse study date
        study_date = self.parse_study_date(primary_report.study_info.study_date)
        # Start with common case info
        row_values = [
            self.extract_patient_id(primary_report),  # Patient ID
            study_date,  # Study Date
            None,  # Received Date
            None,  # Review Date
            primary_report.study_info.study_description,  # Study Description
            patient_height,  # Patient Size
            patient_weight  # Patient Weight
        ]
        # Create a mapping of file types to reports
        file_type_reports = {}
        for report in case_reports:
            file_type = self.get_file_type_identifier(report.filename_info)
            file_type_reports[file_type] = report
        # Process each file type block
        for file_type in self.file_types:
            # Add file type identifier column
            if file_type in file_type_reports:
                row_values.append(file_type_reports[file_type].filename_info.original_filename)
            else:
                row_values.append(None)
            # Add measurement columns for this file type
            if file_type in file_type_reports:
                report = file_type_reports[file_type]
                # Extract measurements for this file
                prosthesis_waist_width = self.extract_measurement_value(report.measurements, 'ProsthesisWaistWidth')
                inflow_width = self.extract_measurement_value(report.measurements, 'InflowWidth')
                ncs = self.extract_measurement_value(report.measurements, 'NCS')
                lcs = self.extract_measurement_value(report.measurements, 'LCS')
                # Extract report options
                thv_size = self.extract_report_option_value(report.report_options, 'THV size')
                balloon_postdil_timing = self.extract_report_option_value(report.report_options, 'Balloon PostDil Timing')
                analysability_stent = self.extract_report_option_value(report.report_options, 'Analysability(Stent Geometry)')
                analysability_depth = self.extract_report_option_value(report.report_options, 'Analysability(Depth Implantation)')
                frame_analysis = self.extract_report_option_value(report.report_options, 'Frame of analysis')
                lao_rao = self.extract_report_option_value(report.report_options, 'LAO/RAO')
                cau_cra = self.extract_report_option_value(report.report_options, 'CAU/CRA')
                lao_rao_angle = self.extract_report_option_value(report.report_options, 'LAO/RAO Angle')
                cau_cra_angle = self.extract_report_option_value(report.report_options, 'CAU/CRA Angle')
                crossover = self.extract_report_option_value(report.report_options, 'Crossover')
                post_orientation = self.extract_report_option_value(report.report_options, 'Post Orientation')
                middle_post_orientation = self.extract_report_option_value(report.report_options, 'Middle Post Orientation')

                # Convert numeric strings to float where applicable
                def safe_float(value):
                    if value is None:
                        return None
                    try:
                        return float(value)
                    except (ValueError, TypeError):
                        return None

                thv_size_float = safe_float(thv_size)
                frame_analysis_float = safe_float(frame_analysis)
                lao_rao_angle_float = safe_float(lao_rao_angle)
                cau_cra_angle_float = safe_float(cau_cra_angle)
                # Add measurement values in the correct order
                measurement_values = [
                    None,  # Comments - Not available in XML
                    None,  # Host Name - Not available in XML
                    None,  # UpperDistanceA - Not available in XML
                    None,  # LowerDistanceB - Not available in XML
                    None,  # UpperDistanceC - Not available in XML
                    None,  # LowerDistanceD - Not available in XML
                    prosthesis_waist_width,
                    inflow_width,
                    ncs,
                    lcs,
                    thv_size_float,
                    balloon_postdil_timing,
                    analysability_stent,
                    None,  # Non-Analysability Reasons(Stent Geometry) - Not directly available
                    analysability_depth,
                    frame_analysis_float,
                    lao_rao,
                    cau_cra,
                    lao_rao_angle_float,
                    cau_cra_angle_float,
                    crossover,
                    post_orientation,
                    middle_post_orientation
                ]
                row_values.extend(measurement_values)
            else:
                # File type not present - add empty values for all measurement columns
                row_values.extend([None] * len(self.measurement_columns))
        return row_values
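
    # Each returned row lines up positionally with self.cases_columns: 7
    # case-level values first, then for every file type either its source
    # filename plus 23 measurement values, or 24 None placeholders when that
    # file type is missing for the case.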

    def create_cases_dataframe(self, reports: List[CardioVascularReport]) -> pd.DataFrame:
        """Create cases DataFrame with duplicated column names handled carefully"""
        # Group reports by case
        cases = self.group_reports_by_case(reports)
        if not cases:
            logger.warning("No cases found")
            return pd.DataFrame(columns=self.cases_columns)
        # Convert cases to rows
        rows_data = []
        for patient_id, case_reports in cases.items():
            try:
                row_values = self.convert_case_to_excel_row(case_reports)
                rows_data.append(row_values)
            except Exception as e:
                logger.error(f"Failed to convert case {patient_id}: {e}")
        if not rows_data:
            return pd.DataFrame(columns=self.cases_columns)
        # Create DataFrame directly from rows data and column names
        # This handles duplicated column names by using positional indexing
        df = pd.DataFrame(rows_data, columns=self.cases_columns)
        # Sort by Patient ID for consistent output
        df = df.sort_values('Patient ID').reset_index(drop=True)
        logger.info(f"Successfully created cases DataFrame with {len(df)} cases")
        return df

    def process_directory(self, input_directory: str, output_file: Optional[str] = None) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Process all XML files in a directory and return/export as Excel format

        Args:
            input_directory: Path to directory containing XML files
            output_file: Optional path to save Excel file

        Returns:
            tuple of (files_dataframe, cases_dataframe)
        """
        logger.info(f"Processing XML files in directory: {input_directory}")
        # Parse all XML files
        reports = self.parser.parse_directory(input_directory)
        if not reports:
            logger.warning("No reports were successfully parsed")
            empty_files_df = pd.DataFrame(columns=self.excel_columns)
            empty_cases_df = pd.DataFrame()
            return empty_files_df, empty_cases_df
        # Create files DataFrame (existing functionality)
        files_rows = []
        for report in reports:
            try:
                row = self.convert_report_to_excel_row(report)
                files_rows.append(row)
            except Exception as e:
                logger.error(f"Failed to convert report {report.filename_info.original_filename}: {e}")
        # Create files DataFrame with exact column order
        files_df = pd.DataFrame(files_rows, columns=self.excel_columns)
        files_df = files_df.sort_values('File Name').reset_index(drop=True)
        # Create cases DataFrame
        cases_df = self.create_cases_dataframe(reports)
        logger.info(f"Successfully processed {len(files_df)} file records and {len(cases_df)} case records")
        # Export to Excel if output file specified
        if output_file:
            self.export_to_excel(files_df, cases_df, output_file)
        return files_df, cases_df

    def export_to_excel(self, files_df: pd.DataFrame, cases_df: pd.DataFrame, output_file: str):
        """Export both DataFrames to Excel with formatting"""
        logger.info(f"Exporting to Excel: {output_file}")
        try:
            # Create Excel writer with formatting options
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                # Export files data sheet
                files_df.to_excel(writer, sheet_name=FILES_DATA_SHEET, index=False)
                # Export cases data sheet
                cases_df.to_excel(writer, sheet_name=CASES_DATA_SHEET, index=False)
                # Get workbook for formatting
                workbook = writer.book
                # Format FILES_DATA sheet
                if FILES_DATA_SHEET in writer.sheets:
                    worksheet = writer.sheets[FILES_DATA_SHEET]
                    self._format_worksheet(worksheet)
                # Format CASES_DATA sheet
                if CASES_DATA_SHEET in writer.sheets:
                    worksheet = writer.sheets[CASES_DATA_SHEET]
                    self._format_worksheet(worksheet)
            logger.info(f"Successfully exported {len(files_df)} file records and {len(cases_df)} case records to {output_file}")
        except Exception as e:
            logger.error(f"Failed to export to Excel: {e}")
            raise

    def _format_worksheet(self, worksheet):
        """Apply formatting to a worksheet"""
        # Auto-adjust column widths
        for column in worksheet.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                try:
                    if len(str(cell.value)) > max_length:
                        max_length = len(str(cell.value))
                except Exception:
                    pass
            # Set column width with some padding
            adjusted_width = min(max_length + 2, 50)  # Cap at 50 characters
            worksheet.column_dimensions[column_letter].width = adjusted_width

def parse_cardiovascular_xml_files(directory_path: str = "data/templates") -> List[Dict[str, Any]]:
    """
    Convenience function to parse all XML files in the given directory
    and return them as dictionaries
    """
    parser = CardioVascularXMLParser()
    reports = parser.parse_directory(directory_path)
    return [parser.to_dict(report) for report in reports]


def process_directory_to_excel(input_directory: str, output_file: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Convenience function to process all XML files in a directory and export to Excel

    Args:
        input_directory: Path to directory containing XML files
        output_file: Path to save Excel file

    Returns:
        tuple of (files_dataframe, cases_dataframe)
    """
    processor = CardioVascularBatchProcessor()
    return processor.process_directory(input_directory, output_file)

if __name__ == "__main__":
    ## ====================================================================================
    ## CHANGE THIS TO YOUR INPUT DIRECTORY
    ## ====================================================================================
    INPUT_DIRECTORY = Path(r"/Users/sameh/workspace/tibsystems/cardio_eggs/data/templates")
    time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    OUTPUT_FILENAME = f"{INPUT_DIRECTORY.name}_t{time_str}.xlsx"
    files_df, cases_df = process_directory_to_excel(INPUT_DIRECTORY, OUTPUT_FILENAME)
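    # Running this script requires pandas and openpyxl (used as the Excel
    # engine in export_to_excel); the workbook is written next to the script
    # as e.g. "templates_t20250101_120000.xlsx" (timestamp shown is made up).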