Last active
February 23, 2026 00:07
-
-
Save cesarandreu/8d593ac0e6f44c36e162d3a405d9c6a7 to your computer and use it in GitHub Desktop.
Split FLAC/WV/APE+CUE files into separate tracks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| split_music.py — Split FLAC albums stored as single files with cue sheets. | |
| Commands: | |
| scan [music-root] Discover and report FLAC+cue pairs; preview tracks | |
| estimate [music-root] Estimate disk space needed for the split | |
| split [music-root] Split audio into individual tracks | |
| clean [music-root] Delete original unsplit files after verification | |
| Default music root: /Music | |
| """ | |
| import sys | |
| import os | |
| import re | |
| import json | |
| import time | |
| import functools | |
| import subprocess | |
| from pathlib import Path | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
# Music root used by main() when no music-root argument is given.
MUSIC_ROOT = "/Music"
# File name of the JSON split log; log_path() places it directly under the root.
LOG_FILENAME = ".split-log.json"
# Lower-case suffixes that find_claimed_audio() accepts as audio when a cue is
# named after its audio file (e.g. Album.ape.cue -> Album.ape).
AUDIO_EXTENSIONS = {".flac", ".ape", ".wav", ".wv", ".tta", ".aiff", ".aif", ".m4a"}
# Encodings tried in order when reading a cue sheet; the first that decodes wins.
# NOTE(review): latin-1 can decode any byte sequence, so the cp1252 entry after
# it looks unreachable — confirm the intended order.
CUE_ENCODINGS = ["utf-8-sig", "utf-8", "latin-1", "cp1252"]
| # --------------------------------------------------------------------------- | |
| # Data structures | |
| # --------------------------------------------------------------------------- | |
@dataclass
class Track:
    """One track entry read from a cue sheet."""

    # Number taken from the cue's TRACK line.
    number: int
    # Track TITLE; parse_cue substitutes "Track N" when the cue gives none.
    title: str
    # Per-track PERFORMER, or None when the track does not name one.
    artist: str | None
    # INDEX 01 position in seconds (mm:ss:ff converted with 75 frames per second).
    start_sec: float
@dataclass
class CueSheet:
    """Parsed contents of a cue sheet together with the audio file it describes."""

    # TITLE that appears before the first TRACK, or None.
    album_title: str | None
    # PERFORMER that appears before the first TRACK, or None.
    album_artist: str | None
    # Audio file paired with this cue by the caller of parse_cue.
    audio_file: Path
    # Tracks in the order they appear in the cue sheet.
    tracks: list[Track]
| # --------------------------------------------------------------------------- | |
| # Cue parsing | |
| # --------------------------------------------------------------------------- | |
def parse_cue(cue_path: Path, audio_file: Path) -> CueSheet:
    """
    Parse a cue sheet and return a CueSheet.

    The file is decoded with the first entry of CUE_ENCODINGS that succeeds.
    TITLE/PERFORMER lines seen before the first TRACK describe the album;
    those inside an AUDIO track describe that track. Non-AUDIO tracks (for
    example a data track on a mixed-mode disc) are ignored entirely: their
    TITLE/PERFORMER/INDEX lines must not leak into the neighbouring audio
    track.

    ``audio_file`` is stored on the result unchanged; it is not opened here.

    Raises ValueError with a clear message on:
    - Multiple FILE directives (multifile cue — not a split target)
    - FILE directive with path separators (Windows multifile cue)
    - A TRACK line whose number cannot be parsed
    - Missing or malformed INDEX 01 for any audio track
    - Zero tracks parsed
    - Unreadable file, or all encodings failed
    """
    content = None
    for encoding in CUE_ENCODINGS:
        try:
            content = cue_path.read_text(encoding=encoding)
            break
        except UnicodeError:
            # Wrong guess; try the next candidate encoding.
            continue
        except OSError as e:
            raise ValueError(f"cannot read file: {e}") from e
    if content is None:
        raise ValueError(f"cannot decode with any of {CUE_ENCODINGS}")

    album_title: str | None = None
    album_artist: str | None = None
    tracks: list[Track] = []
    file_count = 0

    # State of the track currently being assembled.
    cur_track_num: int | None = None
    cur_track_title: str | None = None
    cur_track_artist: str | None = None
    cur_track_index01: float | None = None
    in_track = False
    # True while inside a non-AUDIO TRACK block whose lines must be ignored.
    skipping_track = False

    def flush_track() -> None:
        """Append the pending track (if any) to ``tracks`` and reset the state."""
        nonlocal cur_track_num, cur_track_title, cur_track_artist, cur_track_index01
        if cur_track_num is not None:
            if cur_track_index01 is None:
                raise ValueError(f"track {cur_track_num} has no INDEX 01")
            tracks.append(Track(
                number=cur_track_num,
                title=cur_track_title or f"Track {cur_track_num}",
                artist=cur_track_artist,
                start_sec=cur_track_index01,
            ))
        cur_track_num = None
        cur_track_title = None
        cur_track_artist = None
        cur_track_index01 = None

    def parse_quoted(rest: str) -> str:
        """Return the quoted value at the start of ``rest``, else its first word."""
        rest = rest.strip()
        if rest.startswith('"'):
            end = rest.find('"', 1)
            return rest[1:end] if end != -1 else rest[1:]
        parts = rest.split()
        return parts[0] if parts else rest

    for line in content.splitlines():
        stripped = line.strip()
        upper = stripped.upper()
        if upper.startswith("FILE "):
            file_count += 1
            if file_count > 1:
                raise ValueError(
                    "multiple FILE directives — multifile cue, not a split target"
                )
            fname = parse_quoted(stripped[5:])
            if "\\" in fname:
                raise ValueError(
                    f"FILE directive contains backslash path ({fname!r}) — multifile cue"
                )
        elif upper.startswith("TRACK "):
            # Any TRACK line closes the previous track, audio or not, so a
            # following non-audio block cannot overwrite its fields.
            flush_track()
            in_track = True
            skipping_track = "AUDIO" not in upper
            if skipping_track:
                continue
            parts = stripped.split()
            if len(parts) >= 2 and parts[1].isdigit():
                cur_track_num = int(parts[1])
            else:
                raise ValueError(f"cannot parse TRACK line: {stripped!r}")
        elif skipping_track:
            # Metadata of a non-audio track: not ours to record.
            continue
        elif upper.startswith("TITLE "):
            title = parse_quoted(stripped[6:])
            if in_track:
                cur_track_title = title
            else:
                album_title = title
        elif upper.startswith("PERFORMER "):
            performer = parse_quoted(stripped[10:])
            if in_track:
                cur_track_artist = performer
            else:
                album_artist = performer
        elif upper.startswith("INDEX 01 "):
            time_str = stripped[9:].strip()
            parts = time_str.split(":")
            if len(parts) != 3:
                raise ValueError(f"malformed INDEX 01: {stripped!r}")
            try:
                mm, ss, ff = int(parts[0]), int(parts[1]), int(parts[2])
            except ValueError as e:
                raise ValueError(f"cannot parse INDEX 01 time: {time_str!r}") from e
            # Cue timestamps are mm:ss:ff with 75 frames per second.
            cur_track_index01 = mm * 60 + ss + ff / 75.0

    flush_track()
    if not tracks:
        raise ValueError("no tracks found in cue sheet")
    return CueSheet(
        album_title=album_title,
        album_artist=album_artist,
        audio_file=audio_file,
        tracks=tracks,
    )
| # --------------------------------------------------------------------------- | |
| # Discovery | |
| # --------------------------------------------------------------------------- | |
def find_claimed_audio(cue_path: Path) -> tuple[Path | None, str]:
    """
    Work out which audio file a cue sheet refers to.

    Two strategies are tried in order:
    1. Double extension: ``Album.ape.cue`` claims ``Album.ape`` in the same
       directory. If the name has an audio suffix but the file is absent,
       the lookup stops there (the FILE directive is not consulted).
    2. The first FILE directive inside the cue, resolved against the cue's
       directory.

    Returns (audio_path_or_None, reason_string); the reason explains either
    the match or why nothing was found.
    """
    folder = cue_path.parent

    # Strategy 1: the cue is named after the audio file itself.
    by_name = folder / cue_path.stem  # .stem drops the trailing ".cue"
    if by_name.suffix.lower() in AUDIO_EXTENSIONS:
        if not by_name.exists():
            return None, f"double-extension expected {by_name.name!r} (not found)"
        return by_name, f"double-extension match ({by_name.name})"

    # Strategy 2: trust the FILE directive.
    referenced = _parse_file_directive(cue_path)
    if not referenced:
        return None, "no audio reference found in cue (no double-extension, no FILE directive)"
    if "\\" in referenced:
        return None, f"FILE directive has backslash path {referenced!r} (multifile cue)"

    by_directive = folder / referenced
    if by_directive.exists():
        return by_directive, f"FILE directive match ({by_directive.name})"
    return None, f"FILE directive expected {referenced!r} (not found)"
def _parse_file_directive(cue_path: Path) -> str | None:
    """
    Return the file name from the first FILE line of a cue sheet.

    The cue is read line by line with each encoding in CUE_ENCODINGS until
    one decodes without error. A quoted name is returned without its quotes;
    an unquoted name is its first whitespace-delimited word. Returns None
    when the file cannot be opened, cannot be decoded, or has no FILE line.
    """
    for codec in CUE_ENCODINGS:
        try:
            with open(cue_path, encoding=codec, errors="strict") as handle:
                for raw in handle:
                    text = raw.strip()
                    if not text.upper().startswith("FILE "):
                        continue
                    value = text[5:].lstrip()
                    if value.startswith('"'):
                        # Everything up to the closing quote, or the rest of
                        # the line when the quote is never closed.
                        name, _, _ = value[1:].partition('"')
                        return name
                    words = value.split()
                    return words[0] if words else None
        except UnicodeError:
            # This encoding does not fit; try the next one.
            continue
        except OSError:
            return None
        else:
            # Decoded cleanly but contained no FILE line.
            return None
    return None
def find_pairs(root: Path) -> tuple[list[dict], list[dict]]:
    """
    Recursively collect every ``.cue`` file under root and pair it with audio.

    Directories and cue files are visited in sorted order so the result is
    stable between runs.

    Returns (matched, unmatched).
    matched items: {cue, audio}
    unmatched items: {cue, reason}
    """
    matched: list[dict] = []
    unmatched: list[dict] = []
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # in place, so os.walk descends in sorted order
        here = Path(dirpath)
        cues = sorted(here / name for name in filenames if name.lower().endswith(".cue"))
        for cue in cues:
            audio, reason = find_claimed_audio(cue)
            if audio is None:
                unmatched.append({"cue": cue, "reason": reason})
            else:
                matched.append({"cue": cue, "audio": audio})
    return matched, unmatched
| # --------------------------------------------------------------------------- | |
| # Format detection | |
| # --------------------------------------------------------------------------- | |
@functools.lru_cache(maxsize=None)
def detect_m4a_codec(audio_file: Path) -> str:
    """
    Return the audio codec name inside an M4A container (e.g. 'aac', 'alac').

    Runs ffprobe on the first audio stream and returns its codec name in
    lower case. Returns 'unknown' on any failure: ffprobe exiting non-zero,
    ffprobe not being installed or not executable, or ffprobe reporting no
    codec at all. Results are cached per path for the life of the process.
    """
    cmd = [
        "ffprobe", "-v", "quiet",
        "-select_streams", "a:0",
        "-show_entries", "stream=codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(audio_file),
    ]
    try:
        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable ffprobe binary, which
        # subprocess reports before the process ever starts.
        return "unknown"
    # Empty output means no audio stream was reported.
    return result.stdout.strip().lower() or "unknown"
def get_output_ext(audio_file: Path) -> str:
    """
    Choose the file extension used for the split tracks of ``audio_file``.

    Only an M4A whose codec is not ALAC keeps the ``.m4a`` extension (lossy
    audio stays in its lossy container). ALAC-in-M4A and every other input
    format produce ``.flac``.
    """
    is_m4a = audio_file.suffix.lower() == ".m4a"
    # The codec probe only runs for M4A inputs thanks to short-circuiting.
    if is_m4a and detect_m4a_codec(audio_file) != "alac":
        return ".m4a"
    return ".flac"
| # --------------------------------------------------------------------------- | |
| # Filename helpers | |
| # --------------------------------------------------------------------------- | |
def sanitize(s: str) -> str:
    """
    Turn arbitrary text into a safe file-name component.

    Removes the characters ``/ \\ : * ? " < > |``, collapses every run of
    whitespace into one space, then trims spaces and dots from both ends.
    Spaces and dots are trimmed together so a title such as "And Then ..."
    does not keep a dangling space once its dots are gone. Returns
    "untitled" when nothing is left.
    """
    s = re.sub(r'[/\\:*?"<>|]', "", s)
    s = re.sub(r"\s+", " ", s)
    s = s.strip(" .")
    return s or "untitled"
def track_output_path(audio_file: Path, track: Track, ext: str = ".flac") -> Path:
    """Return the path of a split track: "NN - <sanitized title><ext>" next to audio_file."""
    filename = f"{track.number:02d} - {sanitize(track.title)}{ext}"
    return audio_file.parent / filename
def all_tracks_exist(cs: CueSheet) -> bool:
    """Return True when every track of the cue sheet already has its output file on disk."""
    ext = get_output_ext(cs.audio_file)
    for track in cs.tracks:
        if not track_output_path(cs.audio_file, track, ext).exists():
            return False
    return True
| # --------------------------------------------------------------------------- | |
| # Splitting | |
| # --------------------------------------------------------------------------- | |
def split_album(cs: CueSheet) -> list[Path]:
    """
    Split audio into individual tracks using ffmpeg.

    One ffmpeg process is run per track. Each track starts at its INDEX 01
    position and lasts until the next track's start; the last track gets no
    duration limit, so it runs to the end of the input.

    Returns list of output paths on success (including tracks that already
    existed and were skipped).

    Raises RuntimeError when ffmpeg exits with a non-zero status; the message
    carries the last 500 characters of ffmpeg's stderr. Tracks written
    earlier in the same call are deleted before the error is raised.

    Interruption safety:
    - Each track is written to a .part file then atomically renamed to its
      final path, so final files are always either complete or absent.
    - Stale .part files from a previous interrupted run are removed before
      each track starts.
    - Tracks whose final file already exists are skipped, so an interrupted
      album can be resumed without re-encoding completed tracks.
    - On CalledProcessError or any other exception (e.g. KeyboardInterrupt),
      the in-progress .part file is cleaned up before the exception propagates.
    """
    ext = get_output_ext(cs.audio_file)
    # For lossy M4A (AAC), stream-copy to avoid re-encoding;
    # for everything else, encode to FLAC (all inputs are lossless).
    audio_codec = "copy" if ext == ".m4a" else "flac"
    output_paths: list[Path] = []
    written_this_run: list[Path] = []  # only tracks encoded in this invocation
    tracks = cs.tracks
    # APE's demuxer has imprecise input-seek support; use output seeking
    # (slower but always exact). All other formats handle input seeking well.
    ape_input = cs.audio_file.suffix.lower() == ".ape"
    for i, track in enumerate(tracks):
        final_path = track_output_path(cs.audio_file, track, ext)
        part_path = final_path.with_name(final_path.name + ".part")
        # Recorded before the skip check so the result covers every track.
        output_paths.append(final_path)
        # Remove any stale .part file left by a previous interrupted run.
        if part_path.exists():
            part_path.unlink()
        # Track already fully written — skip re-encoding.
        if final_path.exists():
            continue
        start = track.start_sec
        is_last = (i == len(tracks) - 1)
        if ape_input:
            # Output seeking: decode from start to exact position (slow but exact).
            cmd = ["ffmpeg", "-y", "-i", str(cs.audio_file), "-ss", str(start)]
        else:
            # Input seeking: seek to nearest keyframe then decode (fast, exact
            # for FLAC/WV/WAV/M4A which have reliable seek tables).
            cmd = ["ffmpeg", "-y", "-ss", str(start), "-i", str(cs.audio_file)]
        if not is_last:
            # The track ends where the next one begins.
            duration = tracks[i + 1].start_sec - start
            cmd += ["-t", str(duration)]
        # Per-track performer wins; otherwise fall back to the album artist.
        artist = track.artist or cs.album_artist or ""
        # Explicitly set the container format so ffmpeg doesn't try to infer
        # it from the .part extension (which it can't recognise).
        fmt = "mp4" if ext == ".m4a" else "flac"
        cmd += ["-f", fmt, "-c:a", audio_codec]
        cmd += ["-metadata", f"title={track.title}"]
        cmd += ["-metadata", f"tracknumber={track.number}"]
        if artist:
            cmd += ["-metadata", f"artist={artist}"]
        if cs.album_title:
            cmd += ["-metadata", f"album={cs.album_title}"]
        if cs.album_artist:
            cmd += ["-metadata", f"album_artist={cs.album_artist}"]
        cmd.append(str(part_path))  # write to staging file
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            if part_path.exists():
                part_path.unlink()
            # Roll back only tracks written in this run — pre-existing tracks
            # from a previous interrupted run are valid and left intact so the
            # next run can resume without re-encoding them.
            for p in written_this_run:
                try:
                    p.unlink()
                except OSError:
                    pass
            # stderr is bytes because the process ran without text mode.
            stderr_tail = e.stderr.decode(errors="replace")[-500:]
            raise RuntimeError(
                f"ffmpeg failed on track {track.number} of {cs.audio_file.name}:\n{stderr_tail}"
            ) from e
        except BaseException:
            # KeyboardInterrupt, SIGTERM, etc. — clean up .part and re-raise.
            # Pre-existing and already-written tracks are intentionally kept so
            # the next run can resume from this point.
            if part_path.exists():
                part_path.unlink()
            raise
        # Atomically promote staging file to final path.
        part_path.rename(final_path)
        written_this_run.append(final_path)
    return output_paths
def split_cue_path(cue_path: Path) -> Path:
    """Return where the new multifile cue is staged: the cue's name with ".cue" replaced by ".split.cue"."""
    staged_name = cue_path.stem + ".split.cue"
    return cue_path.with_name(staged_name)
def write_multifile_cue(cue_path: Path, cs: CueSheet, output_paths: list[Path]) -> Path:
    """
    Write a one-FILE-per-track cue next to the original cue sheet.

    The new sheet is saved under the name given by split_cue_path(), so the
    original cue is left untouched. Each track points at its own output file
    and starts at 00:00:00. A track without its own performer inherits the
    album artist. The file is written as UTF-8 with a BOM.

    Returns the path of the written file.
    """
    sheet: list[str] = []
    if cs.album_artist:
        sheet.append(f'PERFORMER "{cs.album_artist}"')
    if cs.album_title:
        sheet.append(f'TITLE "{cs.album_title}"')

    for track, out_path in zip(cs.tracks, output_paths):
        if out_path.suffix.lower() == ".m4a":
            file_type = "MP4"
        else:
            file_type = "WAVE"
        performer = track.artist or cs.album_artist

        sheet.extend([
            f'FILE "{out_path.name}" {file_type}',
            f' TRACK {track.number:02d} AUDIO',
            f' TITLE "{track.title}"',
        ])
        if performer:
            sheet.append(f' PERFORMER "{performer}"')
        sheet.append(' INDEX 01 00:00:00')

    target = split_cue_path(cue_path)
    target.write_text("\n".join(sheet) + "\n", encoding="utf-8-sig")
    return target
| # --------------------------------------------------------------------------- | |
| # Log | |
| # --------------------------------------------------------------------------- | |
def log_path(root: Path) -> Path:
    """Return the location of the split log, directly under the music root."""
    return root.joinpath(LOG_FILENAME)
def read_log(root: Path) -> list[dict]:
    """
    Load the split log for ``root``.

    Returns the parsed JSON content, or an empty list when the log does not
    exist. A log that cannot be read or parsed produces a warning on stderr
    and is treated as empty.
    """
    location = log_path(root)
    if location.exists():
        try:
            return json.loads(location.read_text())
        except (json.JSONDecodeError, OSError) as e:
            print(f"WARNING: cannot read log {location}: {e}", file=sys.stderr)
    return []
def append_log(root: Path, entry: dict) -> None:
    """
    Append one entry to the split log under ``root``.

    The whole log is re-read, extended and written back. The new content
    goes to a sibling temp file first and is then moved over the log, so a
    kill or a full disk during the write never corrupts or truncates the
    existing log.
    """
    entries = read_log(root)
    entries.append(entry)
    lp = log_path(root)
    tmp = lp.with_suffix(".tmp")
    tmp.write_text(json.dumps(entries, indent=2))
    # Path.replace overwrites an existing destination on every platform;
    # Path.rename raises FileExistsError on Windows once the log exists.
    tmp.replace(lp)
| # --------------------------------------------------------------------------- | |
| # CLI helpers | |
| # --------------------------------------------------------------------------- | |
def format_rel(path: Path, root: Path) -> str:
    """Return ``path`` relative to ``root`` as a string, or the full path when it is not under root."""
    try:
        shown = path.relative_to(root)
    except ValueError:
        shown = path
    return str(shown)
def format_duration(seconds: float) -> str:
    """
    Format a duration in seconds as M:SS.hh (minutes, seconds, hundredths).

    The value is rounded to whole hundredths before it is split into fields,
    so a rounding carry moves into the minutes ("1:00.00") instead of
    producing an impossible seconds field such as "0:60.00".
    """
    total_hundredths = round(seconds * 100)
    minutes, remainder = divmod(total_hundredths, 6000)
    secs, hundredths = divmod(remainder, 100)
    return f"{minutes}:{secs:02d}.{hundredths:02d}"
# Colour is only emitted when stdout is an interactive terminal, so output
# redirected to a file or pipe stays free of escape sequences.
_COLOR = sys.stdout.isatty()


def _c(text: str, *styles: str) -> str:
    """Return ``text`` wrapped in the ANSI codes for ``styles``; unchanged when colour is off."""
    if _COLOR:
        sgr = {"bold": "1", "dim": "2", "red": "31", "green": "32",
               "yellow": "33", "cyan": "36"}
        opening = "".join(f"\033[{sgr[name]}m" for name in styles)
        return f"{opening}{text}\033[0m"
    return text
def fmt_elapsed(seconds: float) -> str:
    """Format elapsed time as H:MM:SS, or M:SS when under an hour (fractions are dropped)."""
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes}:{secs:02d}"
def fmt_eta(seconds: float) -> str:
    """Format a remaining-time estimate using its two largest units, prefixed with "~"."""
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"~{hours}h {minutes:02d}m"
    return f"~{minutes}m {secs:02d}s" if minutes else f"~{secs}s"
| # --------------------------------------------------------------------------- | |
| # Commands | |
| # --------------------------------------------------------------------------- | |
def cmd_scan(root: Path) -> None:
    """
    Report what a split run would do under ``root`` without changing anything.

    Every cue+audio pair is parsed and sorted into: albums still to split
    (listed with a per-track preview), albums whose tracks all exist,
    multifile cues, and parse errors. Cues with no audio file are listed as
    unmatched. A summary of all five counts is printed last.
    """
    matched, unmatched = find_pairs(root)

    to_split: list[CueSheet] = []
    already_done: list[CueSheet] = []
    multifile: list[Path] = []
    parse_errors: list[tuple[Path, str]] = []

    for pair in matched:
        cue = pair["cue"]
        try:
            sheet = parse_cue(cue, pair["audio"])
        except ValueError as exc:
            message = str(exc)
            # parse_cue marks multifile sheets through its error text.
            if "multifile cue" in message:
                multifile.append(cue)
            else:
                parse_errors.append((cue, message))
            continue
        bucket = already_done if all_tracks_exist(sheet) else to_split
        bucket.append(sheet)

    if to_split:
        print(f"TO SPLIT ({len(to_split)} albums):")
        print()
        for sheet in to_split:
            names = [part for part in (sheet.album_artist, sheet.album_title) if part]
            header = " \u2014 ".join(names) or "(unknown)"
            print(f" {format_rel(sheet.audio_file, root)}")
            print(f" {header}")
            following = sheet.tracks[1:]
            for pos, track in enumerate(sheet.tracks):
                if pos < len(following):
                    length = format_duration(following[pos].start_sec - track.start_sec)
                else:
                    # The cue does not say where the last track ends.
                    length = "?:??"
                print(f" {track.number:02d}. {track.title} [{length}]")
            print()

    if parse_errors:
        print(f"PARSE ERRORS ({len(parse_errors)}):")
        for cue, message in parse_errors:
            print(f" ERROR: {format_rel(cue, root)}: {message}")
        print()

    if unmatched:
        print(f"UNMATCHED CUES ({len(unmatched)}):")
        for item in unmatched:
            print(f" {format_rel(item['cue'], root)}: {item['reason']}")
        print()

    print("SUMMARY:")
    print(f" To split: {len(to_split)}")
    print(f" Already done: {len(already_done)}")
    print(f" Multifile cues: {len(multifile)} (per-track files, no action needed)")
    print(f" Parse errors: {len(parse_errors)}")
    print(f" Unmatched cues: {len(unmatched)}")
def cmd_split(root: Path) -> None:
    """
    Split every single-file album found under ``root`` into tracks.

    For each album that still lacks tracks this runs split_album(), writes
    the staged multifile cue, prints a progress line with an ETA, and appends
    an entry to the split log. Albums whose tracks all exist are skipped.
    An ffmpeg failure is reported and counted, and the run moves on to the
    next album. Cue parse failures are printed before the run starts but are
    not part of the final error count.

    On KeyboardInterrupt a resume hint is printed to stderr and the process
    exits with status 130.
    """
    matched, unmatched = find_pairs(root)
    # Parse all albums upfront, keeping (cs, cue_path) tuples
    albums: list[tuple[CueSheet, Path]] = []
    multifile_count = 0
    for pair in matched:
        try:
            cs = parse_cue(pair["cue"], pair["audio"])
            albums.append((cs, pair["cue"]))
        except ValueError as e:
            reason = str(e)
            if "multifile cue" in reason:
                multifile_count += 1  # already individual tracks, skip silently
            else:
                print(f"{_c('ERROR', 'red', 'bold')} (parse): {format_rel(pair['cue'], root)}: {reason}")
    total = len(albums)
    ok_count = skip_count = error_count = 0
    wall_start = time.monotonic()
    # Width of the index field, e.g. 3 for 999 albums → "[ 1/999]"
    w = len(str(total))
    indent = " " * (w * 2 + 4)  # aligns result lines under the album path
    try:
        for idx, (cs, cue_path) in enumerate(albums, 1):
            rel = format_rel(cs.audio_file, root)
            prefix = _c(f"[{idx:{w}}/{total}]", "cyan")
            print(f"{prefix} {rel}")
            if all_tracks_exist(cs):
                print(f"{indent}{_c('SKIP', 'dim')}")
                skip_count += 1
                continue
            album_start = time.monotonic()
            try:
                output_paths = split_album(cs)
            except RuntimeError as e:
                # Indent every line of the (possibly multi-line) error message.
                for line in str(e).splitlines():
                    print(f"{indent}{_c('ERROR', 'red', 'bold')} {line}")
                error_count += 1
                continue
            album_secs = time.monotonic() - album_start
            total_secs = time.monotonic() - wall_start
            split_cue = write_multifile_cue(cue_path, cs, output_paths)
            ok_count += 1
            # ETA: average time per album processed so far × albums remaining.
            # The average covers skipped and failed albums too, because it
            # divides wall time by the loop index.
            avg = total_secs / idx
            eta = _c(f"ETA {fmt_eta(avg * (total - idx))}", "dim") if idx < total else ""
            meta = _c(f"{len(cs.tracks)} tracks {album_secs:.1f}s elapsed {fmt_elapsed(total_secs)}", "dim")
            print(f"{indent}{_c('OK', 'green', 'bold')} {meta} {eta}".rstrip())
            # Record what was produced; cmd_clean reads these entries back.
            entry = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "original": str(cs.audio_file),
                "cue": str(cue_path),
                "split_cue": str(split_cue),
                "tracks": [str(p) for p in output_paths],
            }
            append_log(root, entry)
    except KeyboardInterrupt:
        total_secs = time.monotonic() - wall_start
        print(f"\n{_c('Interrupted', 'yellow')} after {fmt_elapsed(total_secs)} "
              f"({ok_count} complete albums).", file=sys.stderr)
        print("Safe to resume — run split again to continue from where it left off.",
              file=sys.stderr)
        sys.exit(130)
    total_secs = time.monotonic() - wall_start
    # Only non-zero counts are coloured.
    ok_str = _c(str(ok_count), "green", "bold") if ok_count else "0"
    err_str = _c(str(error_count), "red", "bold") if error_count else "0"
    print()
    print(f"Done in {fmt_elapsed(total_secs)}: "
          f"{ok_str} split, {skip_count} skipped, {err_str} errors, "
          f"{multifile_count} multifile cues ignored")
    if unmatched:
        print(f" ({len(unmatched)} unmatched cues not processed)")
def cmd_clean(root: Path) -> None:
    """
    Delete original unsplit audio files recorded in the split log.

    For each log entry the original is deleted only when it still exists,
    the entry lists at least one track, and every listed track is present on
    disk. An entry with no recorded tracks is skipped rather than treated as
    verified, because there is nothing to check the deletion against. After
    a deletion the staged ``.split.cue`` (if present) replaces the original
    cue. A summary of deleted / skipped / already-gone counts is printed at
    the end.
    """
    entries = read_log(root)
    if not entries:
        print("No entries in split log.")
        return
    deleted = skipped = already_gone = 0
    for entry in entries:
        original = Path(entry["original"])
        orig_cue = Path(entry["cue"])
        split_cue = Path(entry["split_cue"]) if "split_cue" in entry else None
        track_paths = [Path(p) for p in entry.get("tracks", [])]
        if not original.exists():
            already_gone += 1
            continue
        if not track_paths:
            # Without recorded tracks the check below would pass vacuously
            # and the original would be deleted unverified.
            print(f"SKIP {format_rel(original, root)}: no tracks recorded in log entry")
            skipped += 1
            continue
        missing = [p for p in track_paths if not p.exists()]
        if missing:
            print(f"SKIP {format_rel(original, root)}: missing tracks:")
            for m in missing:
                print(f" {format_rel(m, root)}")
            skipped += 1
            continue
        original.unlink()
        # Finalise the cue swap: replace the original single-file cue with
        # the new multifile cue in one step, so there is no window where
        # neither cue exists. Path.replace overwrites the destination on
        # every platform (Path.rename refuses to on Windows).
        if split_cue and split_cue.exists():
            split_cue.replace(orig_cue)
        print(f"Deleted: {format_rel(original, root)}")
        deleted += 1
    print()
    print(
        f"Done: {deleted} deleted, {skipped} skipped (missing tracks), "
        f"{already_gone} already gone"
    )
| # --------------------------------------------------------------------------- | |
| # Space estimate | |
| # --------------------------------------------------------------------------- | |
def fmt_size(n: float) -> str:
    """Format a byte count with one decimal, using the largest 1024-based unit from B to PB."""
    size = n
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if size < 1024:
            return f"{size:.1f} {unit}"
        size = size / 1024
    # Anything that survived five divisions is reported in petabytes.
    return f"{size:.1f} PB"
def cmd_estimate(root: Path) -> None:
    """
    Print a disk-space estimate for splitting everything under ``root``.

    The estimate assumes the split output is as large as the input files.
    Inputs in APE/WV/TTA are flagged because their output is likely larger.
    The total is compared with the free space reported by os.statvfs(root).
    Albums that are already split, multifile cues and unparseable cues are
    left out of the totals.
    """
    # Formats that compress more than FLAC — output will likely be larger than input.
    larger_as_flac = {".ape", ".wv", ".tta"}

    matched, _unmatched = find_pairs(root)
    grouped: dict[str, list[CueSheet]] = {}
    done_count = 0
    # These two are tallied but not reported.
    multifile_count = 0
    parse_error_count = 0

    for pair in matched:
        try:
            sheet = parse_cue(pair["cue"], pair["audio"])
        except ValueError as exc:
            if "multifile cue" in str(exc):
                multifile_count += 1
            else:
                parse_error_count += 1
            continue
        if all_tracks_exist(sheet):
            done_count += 1
            continue
        suffix = sheet.audio_file.suffix.lower()
        if suffix not in grouped:
            grouped[suffix] = []
        grouped[suffix].append(sheet)

    pending = [sheet for group in grouped.values() for sheet in group]
    if not pending:
        print("Nothing left to split.")
        return

    input_bytes = sum(sheet.audio_file.stat().st_size for sheet in pending)
    fs = os.statvfs(root)
    free_bytes = fs.f_bavail * fs.f_frsize
    inexact = any(suffix in larger_as_flac for suffix in grouped)

    print("SPACE ESTIMATE")
    print(f" Albums to split: {len(pending)}")
    print(f" Already done (skipped): {done_count}")
    print()
    print(" By format:")
    for suffix in sorted(grouped):
        group = grouped[suffix]
        group_bytes = sum(sheet.audio_file.stat().st_size for sheet in group)
        note = " (* output likely larger)" if suffix in larger_as_flac else ""
        print(f" {suffix:6s} {len(group):4d} albums {fmt_size(group_bytes)}{note}")
    print()

    input_text = fmt_size(input_bytes)
    approx = "~" if inexact else " "
    rule = "─" * 42
    print(f" Archive files (to split): {input_text}")
    print(f" Estimated split output: {approx}{input_text}")
    print(f" {rule}")
    print(f" Peak additional needed: {approx}{input_text} (before clean)")
    print(f" Freed after clean: {approx}{input_text} (archives removed)")
    print()
    print(f" Available on disk: {fmt_size(free_bytes)}")
    if free_bytes >= input_bytes:
        print(" Sufficient space: YES")
    else:
        shortfall = input_bytes - free_bytes
        print(f" Sufficient space: NO — need ~{fmt_size(shortfall)} more")

    if inexact:
        print()
        print(" * APE/WV/TTA compress more aggressively than FLAC, so actual")
        print(" output may be larger than the input size shown above.")
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
# Help text printed by main(): to stdout for -h/--help or when no arguments
# are given, and to stderr after an unknown command.
USAGE = """\
Usage: split_music.py <command> [music-root]
Commands:
scan [music-root] Show what would be split (dry run)
estimate [music-root] Estimate disk space needed for split
split [music-root] Split albums into individual tracks
clean [music-root] Delete originals for successfully split albums
Default music root: /Music
"""
def main() -> None:
    """
    Command-line entry point: parse ``sys.argv`` and run one command.

    Usage is ``<command> [music-root]`` where command is scan, estimate,
    split or clean; the root defaults to MUSIC_ROOT. Exits with status 0
    after -h/--help, and with status 1 when no arguments are given, the
    command is unknown, too many arguments are passed, or the root is not
    a directory.
    """
    args = sys.argv[1:]
    if not args or args[0] in ("-h", "--help"):
        print(USAGE)
        # Asking for help is a success; running with no arguments is not.
        sys.exit(0 if args else 1)
    command = args[0]
    if command not in ("scan", "estimate", "split", "clean"):
        print(f"Unknown command: {command!r}\n", file=sys.stderr)
        print(USAGE, file=sys.stderr)
        sys.exit(1)
    if len(args) > 2:
        print(f"Usage: split_music.py {command} [music-root]", file=sys.stderr)
        sys.exit(1)
    root = Path(args[1] if len(args) == 2 else MUSIC_ROOT).resolve()
    if not root.is_dir():
        # Format the path as a string: repr() of a Path object would print
        # "PosixPath('/x')" rather than the path itself.
        print(f"Error: {str(root)!r} is not a directory", file=sys.stderr)
        sys.exit(1)
    if command == "scan":
        cmd_scan(root)
    elif command == "estimate":
        cmd_estimate(root)
    elif command == "split":
        cmd_split(root)
    elif command == "clean":
        cmd_clean(root)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment