Skip to content

Instantly share code, notes, and snippets.

@samehkamaleldin
Last active July 28, 2025 19:37
Show Gist options
  • Save samehkamaleldin/ae9d5ea73c7cbac249756d1d45937c2d to your computer and use it in GitHub Desktop.
Save samehkamaleldin/ae9d5ea73c7cbac249756d1d45937c2d to your computer and use it in GitHub Desktop.
Parse 3mensio XMLs
import xml.etree.ElementTree as ET
import re
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
import logging
import pandas as pd
from datetime import datetime
# Configure root logging at import time so the INFO-level progress messages
# emitted while parsing and exporting are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Worksheet names used when the two result tables are written to Excel.
FILES_DATA_SHEET = "FILES_DATA"
CASES_DATA_SHEET = "CASES_DATA"
@dataclass
class FilenameInfo:
    """Components of a report filename.

    The parser splits a name shaped like ``<case>_<state>.<visit>.xml`` into
    these parts; when the name does not fit, it stores ``"UNKNOWN"`` in the
    first three fields and still records the original filename.
    """

    # Text before the underscore.
    case_id: str
    # Text between the underscore and the first dot (e.g. "NoPostDil").
    state: str
    # Text between the first and second dot (e.g. "imp" or "2nd").
    visit_type: str
    # Bare filename (no directory) exactly as found on disk.
    original_filename: str
@dataclass
class ReportableValue:
    """Attributes of a ``ReportableValue`` XML element.

    Every field holds the raw attribute string; the parser substitutes an
    empty string for attributes that are missing, and nothing is converted
    to a number here.
    """

    type: str         # "Type" attribute
    id: str           # "Id" attribute
    label: str        # "Label" attribute
    prefix: str       # "Prefix" attribute
    value: str        # "Value" attribute (converted to float later, on export)
    unit: str         # "Unit" attribute
    denominator: str  # "Denominator" attribute
    precision: str    # "Precision" attribute
@dataclass
class Measurement:
    """One measurement entry read from the ``Custom/Measurements`` element.

    The string fields are the raw XML attribute values ("" when the
    attribute is missing).
    """

    id: str          # "Id" attribute; used to look measurements up on export
    report_id: str   # "ReportId" attribute
    workflow: str    # "Workflow" attribute
    label: str       # "Label" attribute
    is_custom: str   # "IsCustom" attribute, kept as the raw string
    # Parsed <ReportableValue> child, or None when the element has none.
    reportable_value: Optional[ReportableValue] = None
@dataclass
class ReportOption:
    """One report option read from the ``Custom/ReportOptions`` element.

    All three fields are raw XML attribute strings ("" when missing).
    """

    label: str     # "Label" attribute; the key used to look an option up
    value: str     # "Value" attribute
    workflow: str  # "Workflow" attribute
@dataclass
class StudyInfo:
    """Study and patient details taken from the ``StudyInfo`` XML element.

    Every field is a plain string and defaults to "" so that a report
    without a ``StudyInfo`` element still carries a usable object.
    """

    # --- study-level values ---
    study_description: str = ""
    study_id: str = ""
    study_date: str = ""           # raw text; parsed into a datetime on export
    accession_number: str = ""
    referring_physician: str = ""

    # --- patient-level values ---
    patient_name: str = ""
    patient_id: str = ""           # used to group reports into cases
    patient_sex: str = ""
    patient_weight: str = ""
    patient_height: str = ""
    patient_birth_date: str = ""
@dataclass
class PatientInformation:
    """Free-form patient fields from the ``PatientInformation`` XML element."""

    # Maps each field's XML tag to {'value': <text or "">, 'type': <Type attr>}.
    # default_factory gives every instance its own dictionary.
    fields: Dict[str, Any] = field(default_factory=dict)
@dataclass
class CardioVascularReport:
    """Everything extracted from one cardiovascular XML report file.

    Only ``filename_info`` is required; every other member starts out empty
    and is filled in by the parser when the matching XML element exists.
    """

    # Parts of the filename the report was loaded from.
    filename_info: FilenameInfo
    # Text and attributes of the <Version> element.
    version: Dict[str, str] = field(default_factory=dict)
    # Contents of Models/StudyInfo.
    study_info: StudyInfo = field(default_factory=StudyInfo)
    # Contents of Models/PatientInformation.
    patient_information: PatientInformation = field(default_factory=PatientInformation)
    # Entries of Custom/Measurements, in document order.
    measurements: List[Measurement] = field(default_factory=list)
    # Entries of Custom/ReportOptions, in document order.
    report_options: List[ReportOption] = field(default_factory=list)
    # Contents of the <Sections> element, keyed by section tag.
    sections: Dict[str, Any] = field(default_factory=dict)
    # Remaining <Models> children plus <CustomFields> children, keyed by tag.
    custom_fields: Dict[str, Any] = field(default_factory=dict)
class CardioVascularXMLParser:
    """Parser for cardiovascular XML files.

    Converts one XML document, together with the information encoded in its
    filename, into a ``CardioVascularReport``.  Only the root children
    ``Version``, ``Models``, ``Sections``, ``Custom`` and ``CustomFields``
    are read; any other root child is ignored.
    """

    # Child tag of <StudyInfo> -> attribute name on StudyInfo.  An explicit
    # table avoids deriving attribute names from the tag text, which is how
    # the patient fields used to get lost.
    _STUDY_INFO_FIELDS = {
        'StudyDescription': 'study_description',
        'StudyId': 'study_id',
        'StudyDate': 'study_date',
        'AccessionNumber': 'accession_number',
        'ReferringPhysician': 'referring_physician',
        'PatientName': 'patient_name',
        'PatientId': 'patient_id',
        'PatientSex': 'patient_sex',
        'PatientWeight': 'patient_weight',
        'PatientHeight': 'patient_height',
        'PatientBirthDate': 'patient_birth_date',
    }

    def __init__(self):
        # Groups: case id, state, visit type.  The case id is two or more
        # hyphen-separated runs of upper-case letters/digits, so ids with
        # several hyphens ("000-EAP-005") match as well as "BAV-GUH0005".
        # Examples of filename pattern:
        # - "000-EAP-005_NoPostDil.imp.xml"
        # - "000-EAP-005_BeforePostDil.2nd.xml"
        # - "000-EAP-005_AfterPostDil.imp.xml"
        # - "BAV-GUH0005_BeforePostDil.imp.xml"
        # - "BAV-GUH0005_BeforePostDil.2nd.xml"
        self.filename_pattern = re.compile(
            r'([A-Z0-9]+(?:-[A-Z0-9]+)+)_([^.]+)\.([^.]+)\.xml'
        )

    def parse_filename(self, filepath: str) -> "FilenameInfo":
        """Extract case ID, state and visit type from a file path.

        Args:
            filepath: Path to the XML file; only the final component is used.

        Returns:
            A ``FilenameInfo``.  When the name does not match the expected
            pattern a warning is logged and the three parsed fields are set
            to ``"UNKNOWN"``.
        """
        filename = Path(filepath).name
        match = self.filename_pattern.match(filename)
        if not match:
            logger.warning(f"Filename {filename} doesn't match expected pattern")
            return FilenameInfo(
                case_id="UNKNOWN",
                state="UNKNOWN",
                visit_type="UNKNOWN",
                original_filename=filename,
            )
        case_id, state, visit_type = match.groups()
        return FilenameInfo(
            case_id=case_id,
            state=state,
            visit_type=visit_type,
            original_filename=filename,
        )

    def parse_version(self, version_element: ET.Element) -> Dict[str, str]:
        """Return the text and App* attributes of a ``Version`` element.

        Missing text or attributes are reported as empty strings.
        """
        return {
            'version_text': version_element.text or "",
            'app_name': version_element.get('AppName', ''),
            'app_major': version_element.get('AppMajor', ''),
            'app_minor': version_element.get('AppMinor', ''),
        }

    def parse_study_info(self, study_info_element: ET.Element) -> "StudyInfo":
        """Build a ``StudyInfo`` from the children of a ``StudyInfo`` element.

        Children whose tag is not listed in ``_STUDY_INFO_FIELDS`` are
        ignored; a child without text yields an empty string.
        """
        study_info = StudyInfo()
        for child in study_info_element:
            attr_name = self._STUDY_INFO_FIELDS.get(child.tag)
            if attr_name is not None:
                setattr(study_info, attr_name, child.text or "")
        return study_info

    @staticmethod
    def _parse_fields(parent: ET.Element) -> Dict[str, Any]:
        """Return ``{tag: {'value', 'type'}}`` for the children of ``parent/Fields``.

        Returns an empty dict when ``parent`` has no ``Fields`` child.  If a
        tag occurs more than once, the last occurrence wins.
        """
        fields_element = parent.find('Fields')
        if fields_element is None:
            return {}
        return {
            entry.tag: {
                'value': entry.text or "",
                'type': entry.get('Type', ''),
            }
            for entry in fields_element
        }

    def parse_patient_information(self, patient_info_element: ET.Element) -> "PatientInformation":
        """Parse the ``Fields`` of a ``PatientInformation`` element."""
        return PatientInformation(fields=self._parse_fields(patient_info_element))

    def parse_reportable_value(self, reportable_value_element: ET.Element) -> "ReportableValue":
        """Parse a ``ReportableValue`` element; missing attributes become ""."""
        attr = reportable_value_element.get
        return ReportableValue(
            type=attr('Type', ''),
            id=attr('Id', ''),
            label=attr('Label', ''),
            prefix=attr('Prefix', ''),
            value=attr('Value', ''),
            unit=attr('Unit', ''),
            denominator=attr('Denominator', ''),
            precision=attr('Precision', ''),
        )

    def parse_measurement(self, measurement_element: ET.Element) -> "Measurement":
        """Parse a measurement element and its optional ``ReportableValue`` child."""
        measurement = Measurement(
            id=measurement_element.get('Id', ''),
            report_id=measurement_element.get('ReportId', ''),
            workflow=measurement_element.get('Workflow', ''),
            label=measurement_element.get('Label', ''),
            is_custom=measurement_element.get('IsCustom', ''),
        )
        reportable_value_element = measurement_element.find('ReportableValue')
        if reportable_value_element is not None:
            measurement.reportable_value = self.parse_reportable_value(reportable_value_element)
        return measurement

    def parse_report_option(self, report_option_element: ET.Element) -> "ReportOption":
        """Parse a report option element; missing attributes become ""."""
        return ReportOption(
            label=report_option_element.get('Label', ''),
            value=report_option_element.get('Value', ''),
            workflow=report_option_element.get('Workflow', ''),
        )

    def parse_sections(self, sections_element: ET.Element) -> Dict[str, Any]:
        """Parse a ``Sections`` element.

        Returns:
            ``{section_tag: {'is_visible': <IsVisible attr>, 'fields': {...}}}``
            with one entry per child element.
        """
        return {
            section.tag: {
                'is_visible': section.get('IsVisible', ''),
                'fields': self._parse_fields(section),
            }
            for section in sections_element
        }

    def parse_models(self, models_element: ET.Element) -> Dict[str, Any]:
        """Parse the children of ``Models`` other than the two handled separately.

        ``StudyInfo`` and ``PatientInformation`` are skipped because
        ``parse_xml_file`` parses them on their own.

        Returns:
            ``{model_tag: {'fields': {...}, 'report_options': {...}}}``.
        """
        models = {}
        for model in models_element:
            if model.tag in ('StudyInfo', 'PatientInformation'):
                continue  # Already parsed separately
            report_options = {}
            report_options_element = model.find('ReportOptions')
            if report_options_element is not None:
                for option in report_options_element:
                    report_options[option.tag] = {
                        'value': option.text or "",
                        # Copy so the result does not alias the element tree.
                        'attributes': dict(option.attrib),
                    }
            models[model.tag] = {
                'fields': self._parse_fields(model),
                'report_options': report_options,
            }
        return models

    def parse_xml_file(self, filepath: str) -> "CardioVascularReport":
        """Parse a single XML file and return structured data.

        Args:
            filepath: Path of the XML file to read.

        Returns:
            The populated ``CardioVascularReport``.

        Raises:
            xml.etree.ElementTree.ParseError: If the file is not well-formed
                XML (logged before being re-raised).
        """
        logger.info(f"Parsing XML file: {filepath}")
        filename_info = self.parse_filename(filepath)
        try:
            root = ET.parse(filepath).getroot()
        except ET.ParseError as e:
            logger.error(f"Error parsing XML file {filepath}: {e}")
            raise

        report = CardioVascularReport(filename_info=filename_info)
        for element in root:
            if element.tag == 'Version':
                report.version = self.parse_version(element)
            elif element.tag == 'Models':
                study_info_element = element.find('StudyInfo')
                if study_info_element is not None:
                    report.study_info = self.parse_study_info(study_info_element)
                patient_info_element = element.find('PatientInformation')
                if patient_info_element is not None:
                    report.patient_information = self.parse_patient_information(patient_info_element)
                # Every other model is stored alongside the custom fields.
                report.custom_fields.update(self.parse_models(element))
            elif element.tag == 'Sections':
                report.sections = self.parse_sections(element)
            elif element.tag == 'Custom':
                measurements_element = element.find('Measurements')
                if measurements_element is not None:
                    for measurement_element in measurements_element:
                        report.measurements.append(self.parse_measurement(measurement_element))
                report_options_element = element.find('ReportOptions')
                if report_options_element is not None:
                    for option_element in report_options_element:
                        report.report_options.append(self.parse_report_option(option_element))
            elif element.tag == 'CustomFields':
                for custom in element:
                    report.custom_fields[custom.tag] = {
                        'value': custom.text or "",
                        'attributes': dict(custom.attrib),
                    }
        logger.info(f"Successfully parsed {filepath}")
        return report

    def parse_directory(self, directory_path: str) -> List["CardioVascularReport"]:
        """Parse all ``*.xml`` files directly inside a directory.

        Files are processed in sorted path order so repeated runs yield the
        reports in the same sequence.  A file that fails to parse is logged
        and skipped.

        Returns:
            The successfully parsed reports; an empty list (with a warning)
            when the directory contains no XML files.
        """
        xml_files = sorted(Path(directory_path).glob('*.xml'))
        if not xml_files:
            logger.warning(f"No XML files found in {directory_path}")
            return []
        reports = []
        for xml_file in xml_files:
            try:
                reports.append(self.parse_xml_file(str(xml_file)))
            except Exception as e:
                # Best effort: one bad file must not abort the whole batch.
                logger.error(f"Failed to parse {xml_file}: {e}")
        return reports

    def to_dict(self, report: "CardioVascularReport") -> Dict[str, Any]:
        """Convert a report into nested plain dictionaries.

        Objects exposing ``__dict__`` are converted recursively, lists are
        converted element by element, and every other value (including
        plain dicts) is returned unchanged.
        """
        def convert(obj):
            if not hasattr(obj, '__dict__'):
                return obj
            converted = {}
            for key, value in vars(obj).items():
                if isinstance(value, list):
                    converted[key] = [convert(item) for item in value]
                else:
                    converted[key] = convert(value)
            return converted

        return convert(report)
class CardioVascularBatchProcessor:
    """
    Batch processor for cardiovascular XML files that exports to XLSX format
    matching the template structure.

    Two tables are produced: one row per parsed file ("files") and one row
    per patient ID ("cases"), where each case row carries one block of
    measurement columns per known file type.
    """

    # Leading columns of the per-file sheet.
    _FILE_INFO_COLUMNS = (
        'File Name',
        'Patient Name',
        'Patient ID',
        'Visit Type',
        'Study Date',
        'Received Date',
        'Review Date',
        'Study Description',
        'Patient Size',
        'Patient Weight',
    )
    # Leading columns of the per-case sheet.
    _CASE_INFO_COLUMNS = (
        'Patient ID',
        'Study Date',
        'Received Date',
        'Review Date',
        'Study Description',
        'Patient Size',
        'Patient Weight',
    )
    # Measurement columns: appended once on the files sheet and repeated for
    # every file type on the cases sheet.
    _MEASUREMENT_COLUMNS = (
        'Comments',
        'Host Name',
        'UpperDistanceA',
        'LowerDistanceB',
        'UpperDistanceC',
        'LowerDistanceD',
        'Prosthesis Waist Width',
        'Inflow Width',
        'NCS',
        'LCS',
        'THV size',
        'Balloon PostDil Timing',
        'Analysability(Stent Geometry)',
        'Non-Analysability Reasons(Stent Geometry)',
        'Analysability(Depth Implantation)',
        'Frame of analysis',
        'LAO/RAO',
        'CAU/CRA',
        'LAO/RAO Angle',
        'CAU/CRA Angle',
        'Crossover',
        'Post Orientation',
        'Middle Post Orientation',
    )
    # "<state>.<visit type>" identifiers that get a block on the cases sheet.
    _FILE_TYPES = (
        'NoPostDil.imp',
        'NoPostDil.2nd',
        'BeforePostDil.imp',
        'BeforePostDil.2nd',
        'AfterPostDil.imp',
        'AfterPostDil.2nd',
    )
    # Tried in this order; the first format that parses wins.  Month-first
    # variants precede day-first ones, so an ambiguous "03/04/2024" is read
    # as March 4th and day-first only applies when month-first is invalid.
    _DATE_FORMATS = (
        '%m/%d/%Y %I:%M %p',  # 1/1/2024 12:01 PM
        '%m/%d/%Y %H:%M',     # 1/1/2024 12:01 (24-hour)
        '%d/%m/%Y %I:%M %p',  # 1/1/2024 12:01 PM (day/month)
        '%d/%m/%Y %H:%M',     # 1/1/2024 12:01 (day/month, 24-hour)
        '%Y-%m-%d %H:%M:%S',  # 2024-01-01 12:01:00
        '%Y-%m-%d',           # 2024-01-01
        '%m/%d/%Y',           # 1/1/2024
        '%d/%m/%Y',           # 1/1/2024 (day/month)
    )

    def __init__(self):
        self.parser = CardioVascularXMLParser()
        # Instance-level lists are kept so existing callers can read them.
        self.file_types = list(self._FILE_TYPES)
        self.measurement_columns = list(self._MEASUREMENT_COLUMNS)
        # Exact column order of the files sheet.
        self.excel_columns = list(self._FILE_INFO_COLUMNS) + self.measurement_columns
        # Cases sheet: common columns, then per file type one identifier
        # column followed by the (deliberately duplicated) measurement names.
        self.cases_columns = list(self._CASE_INFO_COLUMNS)
        for file_type in self.file_types:
            self.cases_columns.append(file_type)
            self.cases_columns.extend(self.measurement_columns)

    @staticmethod
    def _safe_float(value: Any) -> Optional[float]:
        """Return ``float(value)``, or None when value is None or not numeric."""
        if value is None:
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    def _patient_field_number(self, report: "CardioVascularReport", field_name: str) -> Optional[float]:
        """Return a PatientInformation field as float, or None if absent/empty/non-numeric."""
        raw = report.patient_information.fields.get(field_name, {}).get('value', '')
        return self._safe_float(raw) if raw else None

    def _extract_measurement_block(self, report: "CardioVascularReport") -> Dict[str, Any]:
        """Return the measurement-column values of one report, in column order.

        Keys follow the measurement column order, so ``.values()`` can be
        appended directly to a positional row.
        """
        def measured(measurement_id):
            return self.extract_measurement_value(report.measurements, measurement_id)

        def option(label):
            return self.extract_report_option_value(report.report_options, label)

        return {
            'Comments': None,  # Not available in XML
            'Host Name': None,  # Not available in XML
            'UpperDistanceA': None,  # Not available in XML
            'LowerDistanceB': None,  # Not available in XML
            'UpperDistanceC': None,  # Not available in XML
            'LowerDistanceD': None,  # Not available in XML
            'Prosthesis Waist Width': measured('ProsthesisWaistWidth'),
            'Inflow Width': measured('InflowWidth'),
            'NCS': measured('NCS'),
            'LCS': measured('LCS'),
            'THV size': self._safe_float(option('THV size')),
            'Balloon PostDil Timing': option('Balloon PostDil Timing'),
            'Analysability(Stent Geometry)': option('Analysability(Stent Geometry)'),
            'Non-Analysability Reasons(Stent Geometry)': None,  # Not directly available
            'Analysability(Depth Implantation)': option('Analysability(Depth Implantation)'),
            'Frame of analysis': self._safe_float(option('Frame of analysis')),
            'LAO/RAO': option('LAO/RAO'),
            'CAU/CRA': option('CAU/CRA'),
            'LAO/RAO Angle': self._safe_float(option('LAO/RAO Angle')),
            'CAU/CRA Angle': self._safe_float(option('CAU/CRA Angle')),
            'Crossover': option('Crossover'),
            'Post Orientation': option('Post Orientation'),
            'Middle Post Orientation': option('Middle Post Orientation'),
        }

    def extract_patient_id(self, report: "CardioVascularReport") -> str:
        """Extract and normalize the patient ID from a report.

        Surrounding whitespace is removed and, if the ID contains a space,
        only the text before the first space is kept.  An empty result is
        replaced by ``"UNKNOWN"``.
        """
        pid = report.study_info.patient_id.strip()
        if " " in pid:
            pid = pid.split(" ")[0].strip()
        return pid or "UNKNOWN"

    def extract_measurement_value(self, measurements: List["Measurement"], measurement_id: str) -> Optional[float]:
        """Return the value of the first measurement with the given ID as float.

        Only measurements that carry a reportable value are considered.
        Returns None when no such measurement exists or its value is not
        numeric.
        """
        for measurement in measurements:
            if measurement.id == measurement_id and measurement.reportable_value:
                return self._safe_float(measurement.reportable_value.value)
        return None

    def extract_report_option_value(self, report_options: List["ReportOption"], option_label: str) -> Optional[str]:
        """Return the value of the first report option with the given label, else None."""
        for option in report_options:
            if option.label == option_label:
                return option.value
        return None

    def parse_study_date(self, date_string: str) -> Optional[datetime]:
        """Parse a study date string into a datetime.

        Leading/trailing whitespace is ignored and the formats in
        ``_DATE_FORMATS`` are tried in order.

        Returns:
            The parsed datetime, or None for an empty string or (after
            logging a warning) a string matching none of the formats.
        """
        if not date_string:
            return None
        text = str(date_string).strip()
        for fmt in self._DATE_FORMATS:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        logger.warning(f"Could not parse date: {date_string}")
        return None

    def convert_report_to_excel_row(self, report: "CardioVascularReport") -> Dict[str, Any]:
        """Convert a report into a files-sheet row keyed by column name.

        Keys appear in the same order as ``excel_columns``.
        """
        patient_name = report.patient_information.fields.get('PatientName', {}).get('value', '').strip()
        row = {
            'File Name': report.filename_info.original_filename,
            'Patient Name': patient_name,
            'Patient ID': self.extract_patient_id(report),
            'Visit Type': report.filename_info.visit_type,
            'Study Date': self.parse_study_date(report.study_info.study_date),
            'Received Date': None,  # Not available in XML
            'Review Date': None,  # Not available in XML
            'Study Description': report.study_info.study_description,
            'Patient Size': self._patient_field_number(report, 'PatientHeight'),
            'Patient Weight': self._patient_field_number(report, 'PatientWeight'),
        }
        row.update(self._extract_measurement_block(report))
        return row

    def get_file_type_identifier(self, filename_info: "FilenameInfo") -> str:
        """Return ``"<state>.<visit_type>"`` for the given filename info."""
        return f"{filename_info.state}.{filename_info.visit_type}"

    def group_reports_by_case(self, reports: List["CardioVascularReport"]) -> Dict[str, List["CardioVascularReport"]]:
        """Group reports by normalized patient ID, preserving input order.

        The grouping key is the patient ID read from the XML (see
        ``extract_patient_id``), not the case id parsed from the filename.
        """
        cases: Dict[str, List[Any]] = {}
        for report in reports:
            cases.setdefault(self.extract_patient_id(report), []).append(report)
        return cases

    def convert_case_to_excel_row(self, case_reports: List["CardioVascularReport"]) -> List[Any]:
        """Convert a case (group of reports) into a positional cases-sheet row.

        The common columns come from the first report.  Each known file type
        then contributes the filename plus its measurement values, or Nones
        when the case has no report of that type.  If several reports share
        a file type the last one is used, and reports whose file type is not
        in ``file_types`` are left out; both situations are logged.

        Raises:
            IndexError: If ``case_reports`` is empty.
        """
        primary_report = case_reports[0]
        row_values = [
            self.extract_patient_id(primary_report),  # Patient ID
            self.parse_study_date(primary_report.study_info.study_date),  # Study Date
            None,  # Received Date
            None,  # Review Date
            primary_report.study_info.study_description,  # Study Description
            self._patient_field_number(primary_report, 'PatientHeight'),  # Patient Size
            self._patient_field_number(primary_report, 'PatientWeight'),  # Patient Weight
        ]

        file_type_reports = {}
        for report in case_reports:
            file_type = self.get_file_type_identifier(report.filename_info)
            filename = report.filename_info.original_filename
            if file_type not in self.file_types:
                logger.warning(
                    f"File {filename} has unrecognised file type {file_type}; "
                    "it is omitted from the cases sheet"
                )
            elif file_type in file_type_reports:
                logger.warning(
                    f"Multiple files of type {file_type} in one case; "
                    f"using {filename}"
                )
            file_type_reports[file_type] = report

        for file_type in self.file_types:
            report = file_type_reports.get(file_type)
            if report is None:
                # Identifier column plus an empty measurement block.
                row_values.append(None)
                row_values.extend([None] * len(self.measurement_columns))
            else:
                row_values.append(report.filename_info.original_filename)
                row_values.extend(self._extract_measurement_block(report).values())
        return row_values

    def create_cases_dataframe(self, reports: List["CardioVascularReport"]) -> pd.DataFrame:
        """Create the cases DataFrame, one row per patient ID.

        Rows are built positionally because ``cases_columns`` repeats the
        measurement column names once per file type.  A case that fails to
        convert is logged and skipped.  The result is sorted by Patient ID.
        """
        cases = self.group_reports_by_case(reports)
        if not cases:
            logger.warning("No cases found")
            return pd.DataFrame(columns=self.cases_columns)

        rows_data = []
        for patient_id, case_reports in cases.items():
            try:
                rows_data.append(self.convert_case_to_excel_row(case_reports))
            except Exception as e:
                logger.error(f"Failed to convert case {patient_id}: {e}")
        if not rows_data:
            return pd.DataFrame(columns=self.cases_columns)

        df = pd.DataFrame(rows_data, columns=self.cases_columns)
        df = df.sort_values('Patient ID').reset_index(drop=True)
        logger.info(f"Successfully created cases DataFrame with {len(df)} cases")
        return df

    def process_directory(self, input_directory: str, output_file: Optional[str] = None) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Process all XML files in a directory and return/export as Excel format

        Args:
            input_directory: Path to directory containing XML files
            output_file: Optional path to save Excel file

        Returns:
            tuple of (files_dataframe, cases_dataframe).  When nothing could
            be parsed both frames are empty but still carry their column
            headers, and no Excel file is written.
        """
        logger.info(f"Processing XML files in directory: {input_directory}")
        reports = self.parser.parse_directory(input_directory)
        if not reports:
            logger.warning("No reports were successfully parsed")
            return (
                pd.DataFrame(columns=self.excel_columns),
                pd.DataFrame(columns=self.cases_columns),
            )

        files_rows = []
        for report in reports:
            try:
                files_rows.append(self.convert_report_to_excel_row(report))
            except Exception as e:
                logger.error(f"Failed to convert report {report.filename_info.original_filename}: {e}")
        # Passing columns= fixes the column order of the sheet.
        files_df = pd.DataFrame(files_rows, columns=self.excel_columns)
        files_df = files_df.sort_values('File Name').reset_index(drop=True)

        cases_df = self.create_cases_dataframe(reports)
        logger.info(f"Successfully processed {len(files_df)} file records and {len(cases_df)} case records")

        if output_file:
            self.export_to_excel(files_df, cases_df, output_file)
        return files_df, cases_df

    def export_to_excel(self, files_df: pd.DataFrame, cases_df: pd.DataFrame, output_file: str):
        """Write both DataFrames to one workbook and size the columns.

        Any failure is logged and re-raised.
        """
        logger.info(f"Exporting to Excel: {output_file}")
        try:
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                files_df.to_excel(writer, sheet_name=FILES_DATA_SHEET, index=False)
                cases_df.to_excel(writer, sheet_name=CASES_DATA_SHEET, index=False)
                for sheet_name in (FILES_DATA_SHEET, CASES_DATA_SHEET):
                    if sheet_name in writer.sheets:
                        self._format_worksheet(writer.sheets[sheet_name])
            logger.info(f"Successfully exported {len(files_df)} file records and {len(cases_df)} case records to {output_file}")
        except Exception as e:
            logger.error(f"Failed to export to Excel: {e}")
            raise

    def _format_worksheet(self, worksheet):
        """Set each column's width to its longest cell text plus padding.

        Empty cells are ignored when measuring, and the width is capped at
        50 characters.
        """
        for column in worksheet.columns:
            column_letter = column[0].column_letter
            max_length = max(
                (len(str(cell.value)) for cell in column if cell.value is not None),
                default=0,
            )
            # Two characters of padding, capped at 50.
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)
def parse_cardiovascular_xml_files(directory_path: str = "data/templates") -> List[Dict[str, Any]]:
    """
    Parse every XML file in ``directory_path`` and return plain dictionaries.

    Args:
        directory_path: Directory to scan for ``*.xml`` files.

    Returns:
        One dictionary per successfully parsed report, in the order the
        parser produced them.
    """
    xml_parser = CardioVascularXMLParser()
    parsed_reports = xml_parser.parse_directory(directory_path)
    return list(map(xml_parser.to_dict, parsed_reports))
def process_directory_to_excel(input_directory: str, output_file: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Run a fresh batch processor over a directory and export the result.

    Args:
        input_directory: Path to directory containing XML files
        output_file: Path to save Excel file

    Returns:
        tuple of (files_dataframe, cases_dataframe)
    """
    batch = CardioVascularBatchProcessor()
    frames = batch.process_directory(input_directory, output_file)
    return frames
if __name__ == "__main__":
    import sys

    ## ====================================================================================
    ## CHANGE THIS TO YOUR INPUT DIRECTORY
    ## (or pass the directory as the first command-line argument)
    ## ====================================================================================
    DEFAULT_INPUT_DIRECTORY = r"/Users/sameh/workspace/tibsystems/cardio_eggs/data/templates"
    INPUT_DIRECTORY = Path(sys.argv[1] if len(sys.argv) > 1 else DEFAULT_INPUT_DIRECTORY)
    if not INPUT_DIRECTORY.is_dir():
        # Fail loudly instead of silently producing nothing for a bad path.
        sys.exit(f"Input directory not found: {INPUT_DIRECTORY}")

    # The timestamp keeps successive runs from overwriting each other; the
    # workbook is written to the current working directory.
    time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    OUTPUT_FILENAME = f"{INPUT_DIRECTORY.name}_t{time_str}.xlsx"
    files_df, cases_df = process_directory_to_excel(str(INPUT_DIRECTORY), OUTPUT_FILENAME)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment