Created
June 9, 2026 23:27
-
-
Save BenMcLean/bcd843eb9ca96c056157b0fe095c752b to your computer and use it in GitHub Desktop.
Extract 5.1 DTS streams from Audio DVDs (not DVD-Audio, not Blu-ray, just Audio DVDs) with ffmpeg and MKVToolNix
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@echo off
rem Launcher: runs the Python script that shares this batch file's base name.
rem Switch to this file's own drive and folder first.
cd /d "%~dp0"
rem %~dpn0 is this file's full path without extension; %* forwards all arguments.
python.exe "%~dpn0.py" %*
rem Wait for a key press before the window closes.
@pause
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import sys | |
| import re | |
| import subprocess | |
| import json | |
| import argparse | |
# Name of the intermediate Matroska file that mkvmerge writes and that is
# deleted again once slicing is finished (or a command fails)
MASTER_MKV = "master_linear.mkv"
# Programs shorter than this are disc-navigation dummies, not real audio tracks
MIN_TRACK_SECONDS = 5.0
def run_command(cmd, error_msg):
    """
    Run an external command and return its captured standard output.

    cmd is the argument list handed to subprocess.run (no shell involved);
    error_msg is the human-readable context printed when the command fails.

    On any failure the temporary MASTER_MKV file is removed if it exists, the
    problem is reported on stderr and the process exits: with the command's
    own return code when it ran but failed, or with 127 when it could not be
    started at all (for example the executable is not on PATH).
    """
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Decode as UTF-8 instead of the locale code page; errors="replace"
            # guarantees that an unexpected byte can never abort the run.
            encoding="utf-8",
            errors="replace",
        )
    except OSError as exc:
        # Raised when the program cannot be launched (missing, not executable).
        exit_code = 127
        detail = f"Could not launch {cmd[0]!r}: {exc}"
    else:
        if result.returncode == 0:
            return result.stdout
        exit_code = result.returncode
        detail = f"CLI Error Output:\n{result.stderr}"
    # Never leave a half-written intermediate file behind.
    if os.path.exists(MASTER_MKV):
        os.remove(MASTER_MKV)
    print(f"\n[ERROR] {error_msg}", file=sys.stderr)
    print(detail, file=sys.stderr)
    sys.exit(exit_code)
def bcd_byte_to_int(bcd_byte):
    """Decode one packed-BCD byte: the high nibble is the tens digit, the low nibble the units."""
    tens, units = divmod(bcd_byte, 16)
    return tens * 10 + units
def read_ifo_time_span(time_bytes):
    """
    Convert a 4-byte DVD BCD time value (H:M:S:F) into (total_seconds, is_ntsc).

    The two top bits of the frame byte hold the frame-rate code:
    0x01 = 25fps PAL, 0x03 = 29.97fps NTSC. When the code is invalid
    (0x00 or 0x02) the result is (0.0, True).

    NTSC timecode counts 30 nominal frames per "second" field, so the stored
    seconds are not wall-clock seconds. The value is therefore turned into a
    nominal frame count and divided by the real rate (30000/1001); reading the
    seconds field as wall-clock time would be off by 0.1%, which adds up to
    several seconds over an album-length stream. For PAL the nominal and real
    rates are both 25, so the two readings agree.
    """
    rate_code = time_bytes[3] >> 6
    if not rate_code & 0x01:
        return 0.0, True
    ntsc = rate_code == 0x03
    if ntsc:
        nominal_rate, real_rate = 30, 30000 / 1001
    else:
        nominal_rate, real_rate = 25, 25.0
    # The low six bits of the last byte are the BCD frame count.
    frame_count = bcd_byte_to_int(time_bytes[3] & 0x3F)
    hh = bcd_byte_to_int(time_bytes[0])
    mm = bcd_byte_to_int(time_bytes[1])
    ss = bcd_byte_to_int(time_bytes[2])
    nominal_frames = (hh * 3600 + mm * 60 + ss) * nominal_rate + frame_count
    return nominal_frames / real_rate, ntsc
def seconds_to_timestamp(total_seconds):
    """
    Format a duration in seconds as HH:MM:SS.mmm.

    The value is rounded to whole milliseconds before it is split into
    fields, so the rounding carries into minutes and hours. Rounding only at
    format time would turn 59.9996 into the invalid "00:00:60.000".
    """
    total_ms = int(round(total_seconds * 1000))
    whole_seconds, millis = divmod(total_ms, 1000)
    whole_minutes, secs = divmod(whole_seconds, 60)
    hrs, mins = divmod(whole_minutes, 60)
    return f"{hrs:02d}:{mins:02d}:{secs:02d}.{millis:03d}"
def _read_pgc_programs(ifo_data, program_chain):
    """
    Parse programs in one PGC. Returns (real_programs, total_seconds, is_ntsc).

    real_programs counts only programs lasting at least MIN_TRACK_SECONDS
    (shorter ones are disc-nav dummies); total_seconds is the summed duration
    of the counted programs. is_ntsc is the flag reported for the last cell
    that was read.
    Returns (0, 0.0, True) if the chain index is out of range or the PGC has
    no programs.
    """
    # The sector pointer at 0xCC locates the PGC table; sectors are 0x800 bytes.
    pcgit_pos = int.from_bytes(ifo_data[0xCC:0xD0], "big") * 0x800
    num_pgcs = int.from_bytes(ifo_data[pcgit_pos : pcgit_pos + 2], "big")
    if program_chain < 1 or program_chain > num_pgcs:
        return 0, 0.0, True
    # 8-byte table header followed by one 8-byte entry per PGC (1-based index);
    # the last four bytes of an entry are the PGC's offset within the table.
    entry_base = pcgit_pos + 8 * program_chain
    chain_offset = int.from_bytes(ifo_data[entry_base + 4 : entry_base + 8], "big")
    pgc_base = pcgit_pos + chain_offset
    num_programs = ifo_data[pgc_base + 2]
    if num_programs == 0:
        return 0, 0.0, True
    # The byte after the program count is the PGC's total cell count.
    num_cells = ifo_data[pgc_base + 3]
    program_map_offset = int.from_bytes(
        ifo_data[pgc_base + 0xE6 : pgc_base + 0xE8], "big"
    )
    cell_table_offset = int.from_bytes(
        ifo_data[pgc_base + 0xE8 : pgc_base + 0xEA], "big"
    )
    real_programs = 0
    total_seconds = 0.0
    is_ntsc = True
    for current_program in range(num_programs):
        entry_cell = ifo_data[pgc_base + program_map_offset + current_program]
        if current_program < num_programs - 1:
            # A program ends just before the next program's entry cell.
            exit_cell = (
                ifo_data[pgc_base + program_map_offset + current_program + 1] - 1
            )
        else:
            # The last program owns every remaining cell of the PGC. Stopping
            # at its entry cell would under-count its duration and could make
            # a real multi-cell track look like a dummy.
            exit_cell = max(entry_cell, num_cells)
        program_seconds = 0.0
        for current_cell in range(entry_cell, exit_cell + 1):
            # Cell playback entries are 0x18 bytes each and numbered from 1.
            cell_start = cell_table_offset + (current_cell - 1) * 0x18
            cell_type = ifo_data[pgc_base + cell_start] >> 6
            # Only cell types 0 and 1 contribute to the running time.
            if cell_type == 0x00 or cell_type == 0x01:
                tb = ifo_data[pgc_base + cell_start + 4 : pgc_base + cell_start + 8]
                secs, is_ntsc = read_ifo_time_span(tb)
                program_seconds += secs
        if program_seconds >= MIN_TRACK_SECONDS:
            real_programs += 1
            total_seconds += program_seconds
    return real_programs, total_seconds, is_ntsc
def _find_best_pgc(ifo_data):
    """
    Examine every PGC of an IFO and pick the one with the most real programs,
    using total duration to break ties (the earliest PGC wins a full tie).
    Returns (pgc_num, real_programs, total_seconds); when no PGC has a real
    program the result is (1, 0, 0.0).
    """
    table_start = int.from_bytes(ifo_data[0xCC:0xD0], "big") * 0x800
    pgc_count = int.from_bytes(ifo_data[table_start : table_start + 2], "big")
    # Kept as (real_programs, total_seconds, pgc_num) so the score sits first.
    leader = (0, 0.0, 1)
    for candidate in range(1, pgc_count + 1):
        program_count, seconds, _ntsc = _read_pgc_programs(ifo_data, candidate)
        if (program_count, seconds) > leader[:2]:
            leader = (program_count, seconds, candidate)
    return leader[2], leader[0], leader[1]
def find_disc_layout():
    """
    Scans all VTS IFO+VOB pairs in the current directory (and VIDEO_TS/).

    For every title set with at least one real audio program, selects the best
    PGC (as chosen by _find_best_pgc) and adds it to the output list. When the
    same title set number appears in both directories, the copy with more real
    programs (then longer duration) is kept.

    Returns [(ifo_path, best_pgc, vob_files), ...] sorted by VTS number.
    Prints an error and exits with status 1 when nothing usable is found.

    No attempt is made to guess which title sets belong to the album — the
    caller extracts all of them and skips any that lack the chosen codec.
    """
    search_dirs = ["."]
    if os.path.isdir("VIDEO_TS"):
        search_dirs.append("VIDEO_TS")
    ifo_pattern = re.compile(r"^(VTS_(\d+))_0\.IFO$", re.IGNORECASE)
    # vts_entries[vts_num] = (real_progs, total_secs, best_pgc, ifo_path, vob_files)
    vts_entries = {}
    for d in search_dirs:
        # List the directory once and reuse it for IFO and VOB matching.
        names = sorted(os.listdir(d))
        for f in names:
            m = ifo_pattern.match(f)
            if not m:
                continue
            vts_num = int(m.group(2))
            vts_base = m.group(1)
            ifo_path = os.path.join(d, f)
            # VTS_xx_0.VOB (menu) is excluded on purpose: numbering starts at 1.
            vob_pattern = re.compile(
                r"^" + re.escape(vts_base) + r"_[1-9]\d*\.VOB$", re.IGNORECASE
            )
            vob_files = sorted(
                os.path.join(d, v) for v in names if vob_pattern.match(v)
            )
            if not vob_files:
                continue
            try:
                # Context manager so the handle is closed deterministically.
                with open(ifo_path, "rb") as ifo_file:
                    ifo_data = ifo_file.read()
            except OSError:
                continue
            if ifo_data[0:12] != b"DVDVIDEO-VTS":
                continue
            try:
                best_pgc, real_progs, total_secs = _find_best_pgc(ifo_data)
            except IndexError:
                # Offsets pointing past the end of the data mean a damaged IFO;
                # skip it instead of aborting the whole scan with a traceback.
                print(
                    f"[WARN] Skipping {ifo_path}: IFO is truncated or malformed.",
                    file=sys.stderr,
                )
                continue
            if real_progs < 1:
                continue
            existing = vts_entries.get(vts_num)
            if existing is None or (real_progs, total_secs) > (
                existing[0],
                existing[1],
            ):
                vts_entries[vts_num] = (
                    real_progs,
                    total_secs,
                    best_pgc,
                    ifo_path,
                    vob_files,
                )
    if not vts_entries:
        print(
            "[ERROR] No valid IFO/VOB pairs found in current directory or VIDEO_TS/.",
            file=sys.stderr,
        )
        sys.exit(1)
    total_tracks = sum(e[0] for e in vts_entries.values())
    if len(vts_entries) == 1:
        e = next(iter(vts_entries.values()))
        print(
            f"-> Found 1 title set: {e[0]} track(s) in {os.path.basename(e[3])} (PGC {e[2]})"
        )
    else:
        desc = ", ".join(
            f"{vts_entries[n][0]} in {os.path.basename(vts_entries[n][3])} (PGC {vts_entries[n][2]})"
            for n in sorted(vts_entries)
        )
        print(
            f"-> Found {len(vts_entries)} title sets, {total_tracks} total tracks: {desc}"
        )
    return [
        (vts_entries[n][3], vts_entries[n][2], vts_entries[n][4])
        for n in sorted(vts_entries)
    ]
def scan_vob_tracks(vob_path, verbose=True):
    """
    Ask mkvmerge (JSON identification mode) which audio tracks a VOB holds.

    Returns (dts_id, ac3_id) as mkvmerge track ID strings, or None for a codec
    that is absent. Of several DTS tracks the one with the most channels is
    chosen; for AC-3 the first track found is used.

    Use these mkvmerge IDs (not ffmpeg stream indices) for extraction, because
    ffmpeg may order MPEG-PS streams differently from mkvmerge.
    """
    if verbose:
        print(f"Analyzing {vob_path} codecs via mkvmerge...")
    raw_json = run_command(
        ["mkvmerge", "-J", vob_path],
        "Failed to analyze VOB file codecs using mkvmerge.",
    )
    try:
        info = json.loads(raw_json)
    except Exception as e:
        print(f"[ERROR] Failed to parse mkvmerge JSON: {e}", file=sys.stderr)
        sys.exit(1)
    audio_tracks = [t for t in info.get("tracks", []) if t.get("type") == "audio"]
    dts_id = None
    ac3_id = None
    # Starts below zero so that a DTS track with an unreported (0) channel
    # count is still accepted, yet loses to any track with a known count.
    top_dts_channels = -1
    for entry in audio_tracks:
        codec_label = entry.get("codec", "").upper()
        track_id = str(entry.get("id"))
        channel_count = entry.get("properties", {}).get("audio_channels", 0) or 0
        if "DTS" in codec_label:
            if channel_count > top_dts_channels:
                top_dts_channels = channel_count
                dts_id = track_id
            continue
        if ac3_id is None and ("AC-3" in codec_label or "AC3" in codec_label):
            ac3_id = track_id
    return dts_id, ac3_id
def parse_ifo_chapters_exactly(ifo_path, program_chain):
    """
    Reads VTS_PGCIT, walks programs in the given PGC, accumulates cell durations
    (skipping angle/interleaved cells), returns chapter start timestamps as HH:MM:SS.mmm.

    program_chain is the 1-based PGC number, determined by _find_best_pgc()
    during disc layout scanning. One timestamp is returned per program; the
    first is always 00:00:00.000.

    Prints an error and exits with status 1 if the file is not a VTS IFO, the
    PGC number does not exist, or the PGC contains no programs.
    """
    print(f"Parsing chapter data from {ifo_path} (PGC {program_chain})...")
    with open(ifo_path, "rb") as f:
        ifo_data = f.read()
    if ifo_data[0:12] != b"DVDVIDEO-VTS":
        print("[ERROR] File is not a valid VTS IFO file.", file=sys.stderr)
        sys.exit(1)
    pcgit_pos = int.from_bytes(ifo_data[0xCC:0xD0], "big") * 0x800
    num_pgcs = int.from_bytes(ifo_data[pcgit_pos : pcgit_pos + 2], "big")
    # Same range check as _read_pgc_programs: an out-of-range index would make
    # every later offset point at unrelated bytes.
    if program_chain < 1 or program_chain > num_pgcs:
        print(
            f"[ERROR] PGC {program_chain} does not exist in IFO ({num_pgcs} PGC(s) present).",
            file=sys.stderr,
        )
        sys.exit(1)
    entry_base = pcgit_pos + 8 * program_chain
    chain_offset = int.from_bytes(ifo_data[entry_base + 4 : entry_base + 8], "big")
    pgc_base = pcgit_pos + chain_offset
    num_programs = ifo_data[pgc_base + 2]
    if num_programs == 0:
        print("[ERROR] No programs found in IFO PGC.", file=sys.stderr)
        sys.exit(1)
    program_map_offset = int.from_bytes(
        ifo_data[pgc_base + 0xE6 : pgc_base + 0xE8], "big"
    )
    cell_table_offset = int.from_bytes(
        ifo_data[pgc_base + 0xE8 : pgc_base + 0xEA], "big"
    )
    chapter_times_seconds = [0.0]  # Chapter 1 always starts at 0
    duration_seconds = 0.0
    # A chapter starts where the previous program ends, so only the first
    # num_programs - 1 durations are needed; the last program is not measured.
    for current_program in range(num_programs - 1):
        entry_cell = ifo_data[pgc_base + program_map_offset + current_program]
        # A program ends just before the next program's entry cell.
        exit_cell = ifo_data[pgc_base + program_map_offset + current_program + 1] - 1
        program_seconds = 0.0
        for current_cell in range(entry_cell, exit_cell + 1):
            # Cell playback entries are 0x18 bytes each and numbered from 1.
            cell_start = cell_table_offset + (current_cell - 1) * 0x18
            cell_type = ifo_data[pgc_base + cell_start] >> 6
            if cell_type == 0x00 or cell_type == 0x01:
                tb = ifo_data[pgc_base + cell_start + 4 : pgc_base + cell_start + 8]
                secs, _ = read_ifo_time_span(tb)
                program_seconds += secs
        duration_seconds += program_seconds
        chapter_times_seconds.append(duration_seconds)
    return [seconds_to_timestamp(s) for s in chapter_times_seconds]
def _prepare_audio_source(vob_files, chosen_id):
    """
    Have mkvmerge copy the chosen audio track (an mkvmerge track ID) out of the
    VOB(s) into MASTER_MKV and return (MASTER_MKV, ffmpeg map specifier).

    Going through mkvmerge first means ffmpeg always sees a clean Matroska
    file, whatever order its MPEG-PS demuxer would have given the streams.
    Only the first VOB path is put on the command line.
    NOTE(review): presumably this relies on mkvmerge picking up the remaining
    numbered VOB segments by itself — confirm against the mkvmerge manual.

    The caller must delete MASTER_MKV when it is finished with it.
    """
    segment_count = len(vob_files)
    if segment_count == 1:
        label = "1 VOB"
    else:
        label = f"{segment_count} VOB segments"
    print(f" Extracting track {chosen_id} from {label} via mkvmerge...")
    # NOTE(review): "-d -1" looks intended to deselect all video tracks — confirm.
    video_args = ["-d", "-1"]
    audio_args = ["-a", chosen_id]
    output_args = ["-o", MASTER_MKV]
    run_command(
        ["mkvmerge", *video_args, *audio_args, *output_args, vob_files[0]],
        "mkvmerge failed to extract audio from VOB.",
    )
    return MASTER_MKV, "0:a:0"
def _run_ffmpeg_extract(
    audio_source, audio_map, output_file, output_ext, start_ts=None, end_ts=None
):
    """
    Cut one slice out of audio_source with an ffmpeg stream copy.

    start_ts and end_ts are HH:MM:SS.mmm strings; either may be None (or
    empty) to leave that side of the slice open. output_ext picks the extra
    output options: "ac3" forces the raw AC-3 muxer, "mka" applies the
    dca_core bitstream filter.
    """
    trim_args = []
    if start_ts:
        trim_args.extend(["-ss", start_ts])
    if end_ts:
        trim_args.extend(["-to", end_ts])
    if output_ext == "ac3":
        format_args = ["-f", "ac3"]
    elif output_ext == "mka":
        # Audio DVDs always carry hi-res as a separate stream; any EXSS extension
        # embedded in the DTS 5.1 bitstream is a disc authoring anomaly that breaks
        # hardware decoders. Strip it unconditionally.
        format_args = ["-bsf:a", "dca_core"]
    else:
        format_args = []
    command = [
        "ffmpeg",
        "-i",
        audio_source,
        *trim_args,
        "-map",
        audio_map,
        "-c",
        "copy",
        "-disposition:a:0",
        "default",
        *format_args,
        output_file,
        "-y",
    ]
    run_command(command, f"ffmpeg failed writing {output_file}")
def main():
    """
    Command-line entry point: find the title sets, choose a codec, then split
    each title set's audio into one file per chapter.

    The codec comes from --codec when given; otherwise DTS is preferred over
    AC3. Title sets that lack the chosen codec are skipped. Exits with status 1
    when the requested (or any) surround codec cannot be found.
    """
    parser = argparse.ArgumentParser(description="Audio DVD Surround Sound Extractor")
    parser.add_argument(
        "-c",
        "--codec",
        choices=["dts", "ac3"],
        help="Explicitly choose audio stream. If omitted, prefers DTS then AC3.",
    )
    args = parser.parse_args()
    chapter_groups = find_disc_layout()  # [(ifo_path, best_pgc, vob_files), ...]
    # Survey the groups until the preferred codec turns up. Stopping at the
    # first group with *any* surround track would miss a DTS track that only
    # exists in a later title set, and would wrongly reject "--codec dts".
    want_ac3 = args.codec == "ac3"
    dts_id = ac3_id = None
    for _ifo, _pgc, _vobs in chapter_groups:
        found_dts, found_ac3 = scan_vob_tracks(_vobs[0], verbose=False)
        dts_id = dts_id or found_dts
        ac3_id = ac3_id or found_ac3
        if ac3_id if want_ac3 else dts_id:
            break
    chosen_id, codec_name, output_ext = None, "", ""
    if args.codec:
        if args.codec == "dts" and dts_id:
            chosen_id, codec_name, output_ext = dts_id, "DTS", "mka"
        elif args.codec == "ac3" and ac3_id:
            chosen_id, codec_name, output_ext = ac3_id, "AC3", "ac3"
        else:
            print(
                f"[ERROR] Forced codec '{args.codec}' not found in any VOB.",
                file=sys.stderr,
            )
            sys.exit(1)
    else:
        if dts_id:
            chosen_id, codec_name, output_ext = dts_id, "DTS", "mka"
        elif ac3_id:
            chosen_id, codec_name, output_ext = ac3_id, "AC3", "ac3"
        else:
            print("[ERROR] Neither DTS nor AC3 found in any VOB.", file=sys.stderr)
            sys.exit(1)
    # chosen_id is informational only: track IDs are looked up again per group.
    print(f"-> Selected Audio: {codec_name} (mkvmerge track {chosen_id})")
    multi_group = len(chapter_groups) > 1
    for group_idx, (target_ifo, best_pgc, vob_files) in enumerate(chapter_groups):
        ifo_base = re.match(r"^(VTS_\d+)", os.path.basename(target_ifo), re.IGNORECASE)
        # Prefix output names with the title set only when there are several.
        group_prefix = (ifo_base.group(1) + "_") if (multi_group and ifo_base) else ""
        if multi_group:
            print(f"\n=== Group {group_idx + 1}: {os.path.basename(target_ifo)} ===")
        # Scan per-group: skip title sets that lack the chosen codec (e.g. video-only VTS)
        g_dts_id, g_ac3_id = scan_vob_tracks(vob_files[0], verbose=multi_group)
        if codec_name == "DTS" and not g_dts_id:
            print(f" -> Skipping: no DTS track in {os.path.basename(vob_files[0])}")
            continue
        if codec_name == "AC3" and not g_ac3_id:
            print(f" -> Skipping: no AC-3 track in {os.path.basename(vob_files[0])}")
            continue
        g_id = g_dts_id if codec_name == "DTS" else g_ac3_id
        timestamps = parse_ifo_chapters_exactly(target_ifo, best_pgc)
        if not timestamps:
            print("[ERROR] Chapter parse returned empty table.", file=sys.stderr)
            sys.exit(1)
        print(f"[OK] {len(timestamps)} chapter start times:")
        for idx, ts in enumerate(timestamps):
            print(f" {group_prefix}Track_{idx + 1:02d}: {ts}")
        print("\n=== Preparing audio source ===")
        audio_source, audio_map = _prepare_audio_source(vob_files, g_id)
        try:
            print(f"\n=== Slicing {codec_name} tracks ===")
            for i, start_ts in enumerate(timestamps):
                stem = f"{group_prefix}Track_{i + 1:02d}"
                # Each track ends where the next begins; the last runs to EOF.
                end_ts = timestamps[i + 1] if i + 1 < len(timestamps) else None
                output_file = f"{stem}.{output_ext}"
                print(f" {stem}: {start_ts} -> {end_ts or 'EOF'}")
                _run_ffmpeg_extract(
                    audio_source,
                    audio_map,
                    output_file,
                    output_ext,
                    start_ts=start_ts,
                    end_ts=end_ts,
                )
        finally:
            # Remove the intermediate file even if a slice failed.
            if os.path.exists(MASTER_MKV):
                os.remove(MASTER_MKV)
    print(f"\n[SUCCESS] All {codec_name} tracks extracted.")


if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For chapter data extraction from IFO files: https://github.com/tautcony/ChapterTool