Created
July 21, 2025 20:49
-
-
Save samehkamaleldin/28af8ab8393b7d70d4bd5eb29b6cefc0 to your computer and use it in GitHub Desktop.
Group case data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import re | |
import shutil | |
from pathlib import Path | |
from typing import Dict, List, Set | |
# Default data directory, used when no path is passed on the command line.
# Replace the placeholder with your actual data directory path.
DATA_DIRPATH = r"PATH_TO_YOUR_DATA_DIRECTORY"
# Case ID followed by whitespace and an underscore,
# e.g. "888-EAP-029 _ CT_Baseline_CT 1.xml".
# The character class already admits hyphens, so no nested "-segment" group is
# needed; the nested form was ambiguous and backtracked exponentially on long
# hyphenated names that did not match.
_SPACED_CASE_ID_RE = re.compile(r'^([^_\s]+)\s+_')

# Everything before the first underscore, e.g. "888-EAP-029_LVOT.csv".
_UNDERSCORE_CASE_ID_RE = re.compile(r'^([^_]+)_')


def extract_case_id(filename: str) -> str:
    """Extract the case ID from a filename.

    The "ID, whitespace, underscore" layout is tried first, then the plain
    "ID, underscore" layout. If neither matches, the filename without its
    extension is returned.

    Examples:
        - 888-EAP-029_LVOT.csv -> 888-EAP-029
        - 888-EAP-029 _ CT_Baseline_CT 1.xml -> 888-EAP-029
        - 880-EAP-1 _ CT_Baseline_CT 1.xml -> 880-EAP-1

    Args:
        filename: Bare file name (no directory component).

    Returns:
        The case ID string.
    """
    # Order matters: the spaced pattern must win so the ID does not keep the
    # whitespace that precedes the underscore.
    for pattern in (_SPACED_CASE_ID_RE, _UNDERSCORE_CASE_ID_RE):
        match = pattern.match(filename)
        if match:
            return match.group(1)

    # Fallback: no recognizable separator, use the name without extension.
    return Path(filename).stem
def scan_files(data_dir: Path) -> Dict[str, List[str]]:
    """Group the CSV and XML files directly inside *data_dir* by case ID.

    Only regular files whose suffix is ``.csv`` or ``.xml`` (compared
    case-insensitively) are considered; subdirectories are not descended into.

    Args:
        data_dir: Directory to scan.

    Returns:
        Mapping of case ID to the list of file names belonging to that case.
    """
    accepted_suffixes = {'.csv', '.xml'}
    grouped: Dict[str, List[str]] = {}

    for entry in data_dir.iterdir():
        if not entry.is_file():
            continue
        if entry.suffix.lower() not in accepted_suffixes:
            continue
        # Start a new bucket the first time a case ID is seen.
        grouped.setdefault(extract_case_id(entry.name), []).append(entry.name)

    return grouped
def create_case_directories(data_dir: Path, case_ids: Set[str], dry_run: bool = False) -> None:
    """Ensure a sub-directory of *data_dir* exists for every case ID.

    Paths that already exist are left alone and produce no output. In dry-run
    mode nothing is created; the intended action is only printed.

    Args:
        data_dir: Parent directory that will hold the case directories.
        case_ids: Case IDs to create directories for.
        dry_run: When True, report what would be created without creating it.
    """
    for case_id in case_ids:
        case_dir = data_dir / case_id

        if case_dir.exists():
            continue

        if dry_run:
            print(f"Would create directory: {case_dir}")
            continue

        case_dir.mkdir(parents=True, exist_ok=True)
        print(f"Created directory: {case_dir}")
def move_files(data_dir: Path, case_files: Dict[str, List[str]], dry_run: bool = False) -> None:
    """Move files from *data_dir* into their respective case directories.

    For each ``case_id -> [filename, ...]`` entry, every file is moved from
    ``data_dir/filename`` to ``data_dir/case_id/filename``. The case directory
    is created on demand, so this function does not rely on the directories
    having been prepared beforehand. A file is skipped (with a message) when
    the target already exists or the source is missing. Filesystem errors are
    reported and counted, and processing continues with the next file.

    Args:
        data_dir: Directory that currently holds the files.
        case_files: Mapping of case ID to the file names belonging to it.
        dry_run: When True, only print what would be moved; nothing on disk
            is touched.
    """
    moved_count = 0
    error_count = 0

    for case_id, files in case_files.items():
        case_dir = data_dir / case_id
        for filename in files:
            source_path = data_dir / filename
            target_path = case_dir / filename
            try:
                # Never overwrite a file that is already in place.
                if target_path.exists():
                    print(f"Skipped (already exists): {filename}")
                    continue
                if not source_path.exists():
                    print(f"Skipped (source not found): {filename}")
                    continue

                if dry_run:
                    print(f"Would move: {filename} -> {case_id}/")
                else:
                    # Without the directory every move into it would fail.
                    case_dir.mkdir(parents=True, exist_ok=True)
                    shutil.move(str(source_path), str(target_path))
                moved_count += 1
            except OSError as e:
                # shutil.Error is an OSError subclass, so this covers both
                # plain filesystem failures and shutil-specific ones.
                print(f"Error moving {filename}: {e}")
                error_count += 1

    print(f"Summary: {moved_count} files {'would be ' if dry_run else ''}moved, {error_count} errors")
def display_summary(case_files: Dict[str, List[str]]) -> None:
    """Print one line per case, followed by overall totals.

    Cases are listed in sorted order of case ID. Each line shows the number
    of files and the distinct lower-cased file extensions for that case.

    Args:
        case_files: Mapping of case ID to the file names belonging to it.
    """
    # Keys are unique, so sorting the keys gives the same order as sorting
    # the (key, value) pairs.
    for case_id in sorted(case_files):
        files = case_files[case_id]
        file_types = {Path(name).suffix.lower() for name in files}
        print(f"{case_id}: {len(files)} files ({', '.join(sorted(file_types))})")

    print(f"\nTotal cases: {len(case_files)}")
    print(f"Total files: {sum(len(files) for files in case_files.values())}")
def main():
    """Group case files into directories by case ID.

    This script scans the data directory for CSV and XML files, extracts case
    IDs from filenames, creates directories for each case, and moves files
    into their respective case directories.

    The data directory is taken from the optional positional argument and
    falls back to DATA_DIRPATH. If it is not an existing directory, the
    script exits through argparse's error handling instead of crashing with
    a traceback while scanning.

    Examples:
        - 888-EAP-029_LVOT.csv -> 888-EAP-029/888-EAP-029_LVOT.csv
        - 888-EAP-029 _ CT_Baseline_CT 1.xml -> 888-EAP-029/888-EAP-029 _ CT_Baseline_CT 1.xml
    """
    parser = argparse.ArgumentParser(description="Group case files into directories by case ID")
    parser.add_argument('data_dir', nargs='?', default=DATA_DIRPATH, help='Data directory path')
    parser.add_argument('--dry-run', '-n', action='store_true', help='Show what would be done without actually doing it')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    args = parser.parse_args()

    data_dir = Path(args.data_dir)
    # The default is only a placeholder, so a missing directory is the common
    # first-run mistake; report it clearly and stop.
    if not data_dir.is_dir():
        parser.error(f"Data directory does not exist or is not a directory: {data_dir}")

    print(f"Scanning directory: {data_dir}")
    if args.dry_run:
        print("DRY RUN MODE - No files will be moved")

    # Scan files and group by case ID
    case_files = scan_files(data_dir)
    if not case_files:
        print("No CSV or XML files found in the directory")
        return

    # Display summary
    if args.verbose or args.dry_run:
        display_summary(case_files)

    # Create directories
    print(f"\nCreating directories for {len(case_files)} cases...")
    create_case_directories(data_dir, set(case_files.keys()), args.dry_run)

    # Move files
    move_files(data_dir, case_files, args.dry_run)

    if not args.dry_run:
        print("Grouping completed successfully!")
    else:
        print("Dry run completed. Use without --dry-run to perform actual operations.")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment