Skip to content

Instantly share code, notes, and snippets.

@BenMcLean
Created June 9, 2026 23:27
Show Gist options
  • Select an option

  • Save BenMcLean/bcd843eb9ca96c056157b0fe095c752b to your computer and use it in GitHub Desktop.

Select an option

Save BenMcLean/bcd843eb9ca96c056157b0fe095c752b to your computer and use it in GitHub Desktop.
Extract 5.1 DTS streams from Audio DVDs (not DVD-Audio, not Blu-Ray, just Audio DVDs) with ffmpeg and mkvtoolnix
@ECHO OFF
REM Launcher for the Python script that shares this batch file's base name.
REM Switch to the drive and folder that contain this batch file.
cd /d "%~dp0"
REM Run the sibling .py file, forwarding every command-line argument.
python.exe "%~dpn0.py" %*
REM Wait for a key press before the console window closes.
@PAUSE
import os
import sys
import re
import subprocess
import json
import argparse
# Temporary single-track Matroska file: written by mkvmerge, used as the
# ffmpeg input, and deleted on command failure and after each title set.
MASTER_MKV = "master_linear.mkv"
# Programs shorter than this are disc-navigation dummies, not real audio tracks
MIN_TRACK_SECONDS = 5.0
def run_command(cmd, error_msg):
    """
    Run an external command and return its captured stdout as text.

    cmd is the argument list handed to subprocess.run (no shell is involved)
    and error_msg is the summary line printed if the command fails.

    On failure the temporary MASTER_MKV file is removed if it exists, the
    summary plus the failure details are printed to stderr, and the process
    exits: with the command's own return code when it ran and failed, or with
    127 when the executable could not be started at all (for example because
    it is not installed or not on PATH).
    """
    exit_code = None
    details = ""
    try:
        result = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )
    except OSError as exc:
        # Launch failure (typically FileNotFoundError): report it like any
        # other command failure instead of dumping a traceback.
        program = cmd[0] if cmd else cmd
        exit_code = 127
        details = f"Could not launch '{program}': {exc}"
    else:
        if result.returncode != 0:
            exit_code = result.returncode
            details = result.stderr
    if exit_code is not None:
        # Never leave a partial intermediate file behind.
        if os.path.exists(MASTER_MKV):
            os.remove(MASTER_MKV)
        print(f"\n[ERROR] {error_msg}", file=sys.stderr)
        print(f"CLI Error Output:\n{details}", file=sys.stderr)
        sys.exit(exit_code)
    return result.stdout
def bcd_byte_to_int(bcd_byte):
    """Decode one packed-BCD byte: high nibble counts tens, low nibble ones."""
    tens, ones = divmod(bcd_byte, 16)
    return tens * 10 + ones
def read_ifo_time_span(time_bytes):
    """
    Convert a 4-byte BCD playback time (H, M, S, rate+frames) to seconds.

    The top two bits of the fourth byte select the frame rate: 0x01 is 25fps
    PAL and 0x03 is 29.97fps NTSC. The remaining six bits hold the BCD frame
    count. Returns (total_seconds, is_ntsc); when the low rate bit is clear
    (codes 0x00 and 0x02) the value is treated as invalid and (0.0, True) is
    returned.

    NTSC times count 30 nominal frames per stored "second", so the stored
    H:M:S:F value is first flattened to a nominal frame count and then divided
    by the real rate of 30000/1001. For PAL the nominal and real rates are both
    25, so the division simply undoes the multiplication.
    """
    rate_and_frames = time_bytes[3]
    rate_code = rate_and_frames >> 6
    if not rate_code & 0x01:
        return 0.0, True

    ntsc = rate_code == 0x03
    if ntsc:
        nominal_fps, real_fps = 30, 30000 / 1001
    else:
        nominal_fps, real_fps = 25, 25.0

    stored_seconds = (
        bcd_byte_to_int(time_bytes[0]) * 3600
        + bcd_byte_to_int(time_bytes[1]) * 60
        + bcd_byte_to_int(time_bytes[2])
    )
    nominal_frames = stored_seconds * nominal_fps + bcd_byte_to_int(
        rate_and_frames & 0x3F
    )
    return nominal_frames / real_fps, ntsc
def seconds_to_timestamp(total_seconds):
    """
    Format a duration in seconds as HH:MM:SS.mmm.

    The value is rounded to whole milliseconds before it is split into fields,
    so a fractional part that rounds up carries into the seconds, minutes and
    hours (59.9996 becomes 00:01:00.000 rather than the invalid 00:00:60.000).
    """
    total_ms = int(round(total_seconds * 1000))
    hrs, remainder_ms = divmod(total_ms, 3600 * 1000)
    mins, remainder_ms = divmod(remainder_ms, 60 * 1000)
    secs, millis = divmod(remainder_ms, 1000)
    return f"{hrs:02d}:{mins:02d}:{secs:02d}.{millis:03d}"
def _read_pgc_programs(ifo_data, program_chain):
"""
Parse programs in one PGC. Returns (real_programs, total_frames, is_ntsc).
real_programs excludes programs shorter than MIN_TRACK_FRAMES (disc-nav dummies).
Returns (0, 0, True) if the chain index is out of range.
"""
pcgit_pos = int.from_bytes(ifo_data[0xCC:0xD0], "big") * 0x800
num_pgcs = int.from_bytes(ifo_data[pcgit_pos : pcgit_pos + 2], "big")
if program_chain < 1 or program_chain > num_pgcs:
return 0, 0, True
entry_base = pcgit_pos + 8 * program_chain
chain_offset = int.from_bytes(ifo_data[entry_base + 4 : entry_base + 8], "big")
pgc_base = pcgit_pos + chain_offset
num_programs = ifo_data[pgc_base + 2]
if num_programs == 0:
return 0, 0, True
program_map_offset = int.from_bytes(
ifo_data[pgc_base + 0xE6 : pgc_base + 0xE8], "big"
)
cell_table_offset = int.from_bytes(
ifo_data[pgc_base + 0xE8 : pgc_base + 0xEA], "big"
)
real_programs = 0
total_seconds = 0.0
is_ntsc = True
for current_program in range(num_programs):
entry_cell = ifo_data[pgc_base + program_map_offset + current_program]
if current_program < num_programs - 1:
exit_cell = (
ifo_data[pgc_base + program_map_offset + current_program + 1] - 1
)
else:
exit_cell = entry_cell
program_seconds = 0.0
for current_cell in range(entry_cell, exit_cell + 1):
cell_start = cell_table_offset + (current_cell - 1) * 0x18
cell_type = ifo_data[pgc_base + cell_start] >> 6
if cell_type == 0x00 or cell_type == 0x01:
tb = ifo_data[pgc_base + cell_start + 4 : pgc_base + cell_start + 8]
secs, is_ntsc = read_ifo_time_span(tb)
program_seconds += secs
if program_seconds >= MIN_TRACK_SECONDS:
real_programs += 1
total_seconds += program_seconds
return real_programs, total_seconds, is_ntsc
def _find_best_pgc(ifo_data):
"""
Scan all PGCs in an IFO and return the one with the most real programs.
Returns (pgc_num, real_programs, total_seconds).
"""
pcgit_pos = int.from_bytes(ifo_data[0xCC:0xD0], "big") * 0x800
num_pgcs = int.from_bytes(ifo_data[pcgit_pos : pcgit_pos + 2], "big")
best_pgc, best_real, best_secs = 1, 0, 0.0
for pgc in range(1, num_pgcs + 1):
real, secs, _ = _read_pgc_programs(ifo_data, pgc)
if (real, secs) > (best_real, best_secs):
best_pgc, best_real, best_secs = pgc, real, secs
return best_pgc, best_real, best_secs
def find_disc_layout():
    """
    Locate every usable title set in the current directory and in VIDEO_TS/.

    A title set is usable when its VTS_nn_0.IFO has at least one matching
    VTS_nn_k.VOB (k >= 1) beside it, starts with the DVDVIDEO-VTS signature,
    and its best PGC (see _find_best_pgc) has at least one real program. If
    the same VTS number shows up in both directories, the copy with the higher
    (program count, total seconds) score is kept. IFO files that cannot be
    read are skipped; IFO files whose tables run past the end of the file are
    skipped with a warning.

    Prints a one-line summary and returns
    [(ifo_path, best_pgc, vob_files), ...] sorted by VTS number.
    Exits with status 1 if no usable title set is found.
    """
    search_dirs = ["."]
    if os.path.isdir("VIDEO_TS"):
        search_dirs.append("VIDEO_TS")
    ifo_pattern = re.compile(r"^(VTS_(\d+))_0\.IFO$", re.IGNORECASE)
    # vts_entries[vts_num] = (real_progs, total_secs, best_pgc, ifo_path, vob_files)
    vts_entries = {}
    for d in search_dirs:
        # List each directory once and reuse the listing for the VOB lookup.
        names = sorted(os.listdir(d))
        for f in names:
            m = ifo_pattern.match(f)
            if not m:
                continue
            vts_num = int(m.group(2))
            vob_pattern = re.compile(
                r"^" + re.escape(m.group(1)) + r"_[1-9]\d*\.VOB$", re.IGNORECASE
            )
            vob_files = sorted(
                os.path.join(d, v) for v in names if vob_pattern.match(v)
            )
            if not vob_files:
                continue
            ifo_path = os.path.join(d, f)
            try:
                # Context manager closes the handle even if the read fails.
                with open(ifo_path, "rb") as ifo_file:
                    ifo_data = ifo_file.read()
            except OSError:
                continue
            if ifo_data[0:12] != b"DVDVIDEO-VTS":
                continue
            try:
                best_pgc, real_progs, total_secs = _find_best_pgc(ifo_data)
            except IndexError:
                # Table offsets point past the end of the data.
                print(
                    f"[WARN] Skipping {ifo_path}: IFO tables are truncated or malformed.",
                    file=sys.stderr,
                )
                continue
            if real_progs < 1:
                continue
            existing = vts_entries.get(vts_num)
            if existing is None or (real_progs, total_secs) > (
                existing[0],
                existing[1],
            ):
                vts_entries[vts_num] = (
                    real_progs,
                    total_secs,
                    best_pgc,
                    ifo_path,
                    vob_files,
                )
    if not vts_entries:
        print(
            "[ERROR] No valid IFO/VOB pairs found in current directory or VIDEO_TS/.",
            file=sys.stderr,
        )
        sys.exit(1)
    ordered = [vts_entries[n] for n in sorted(vts_entries)]
    if len(ordered) == 1:
        e = ordered[0]
        print(
            f"-> Found 1 title set: {e[0]} track(s) in {os.path.basename(e[3])} (PGC {e[2]})"
        )
    else:
        total_tracks = sum(e[0] for e in ordered)
        desc = ", ".join(
            f"{e[0]} in {os.path.basename(e[3])} (PGC {e[2]})" for e in ordered
        )
        print(
            f"-> Found {len(ordered)} title sets, {total_tracks} total tracks: {desc}"
        )
    return [(e[3], e[2], e[4]) for e in ordered]
def scan_vob_tracks(vob_path, verbose=True):
    """
    Identify the DTS and AC-3 audio tracks of a VOB via `mkvmerge -J`.

    Returns (dts_id, ac3_id) as mkvmerge track ID strings, or None for a codec
    that is absent. Among DTS tracks the one reporting the most channels is
    chosen (first one wins ties); for AC-3 the first track found is chosen.
    When verbose is true a progress line is printed first. Exits with status 1
    if the mkvmerge output is not parseable JSON.

    The IDs are mkvmerge's own and are meant to be passed back to mkvmerge;
    per the original author's note, ffmpeg may order MPEG-PS streams
    differently.
    """
    if verbose:
        print(f"Analyzing {vob_path} codecs via mkvmerge...")
    raw_json = run_command(
        ["mkvmerge", "-J", vob_path],
        "Failed to analyze VOB file codecs using mkvmerge.",
    )
    try:
        info = json.loads(raw_json)
    except Exception as e:
        print(f"[ERROR] Failed to parse mkvmerge JSON: {e}", file=sys.stderr)
        sys.exit(1)

    dts_track = None
    ac3_track = None
    dts_channels_seen = -1  # lower than any reported count, including 0
    for entry in info.get("tracks", []):
        if entry.get("type") != "audio":
            continue
        codec_name = entry.get("codec", "").upper()
        track_id = str(entry.get("id"))
        # A missing or null channel count is treated as 0 (lowest priority).
        channel_count = entry.get("properties", {}).get("audio_channels", 0) or 0
        if "DTS" in codec_name:
            if channel_count > dts_channels_seen:
                dts_channels_seen, dts_track = channel_count, track_id
            continue
        looks_like_ac3 = "AC-3" in codec_name or "AC3" in codec_name
        if looks_like_ac3 and ac3_track is None:
            ac3_track = track_id
    return dts_track, ac3_track
def parse_ifo_chapters_exactly(ifo_path, program_chain):
    """
    Compute the chapter start times of one PGC in a VTS IFO file.

    ifo_path is the IFO file to read and program_chain is the 1-based PGC
    number inside its PGC table.

    Each program becomes one chapter. Chapter 1 starts at 0 and every later
    chapter starts at the summed duration of the programs before it, where a
    program's duration is the total playback time of its cells of type 0 or 1
    (per the original author's notes, other types are angle/interleaved cells).

    Returns the start times as a list of HH:MM:SS.mmm strings.
    Exits with status 1 if the file lacks the DVDVIDEO-VTS signature, the PGC
    number is out of range, or the PGC declares no programs.
    """
    print(f"Parsing chapter data from {ifo_path} (PGC {program_chain})...")
    with open(ifo_path, "rb") as f:
        ifo_data = f.read()
    if ifo_data[0:12] != b"DVDVIDEO-VTS":
        print("[ERROR] File is not a valid VTS IFO file.", file=sys.stderr)
        sys.exit(1)
    # The table's sector number is stored at 0xCC; sectors are 0x800 bytes.
    pgcit_pos = int.from_bytes(ifo_data[0xCC:0xD0], "big") * 0x800
    num_pgcs = int.from_bytes(ifo_data[pgcit_pos : pgcit_pos + 2], "big")
    if program_chain < 1 or program_chain > num_pgcs:
        # Without this check an invalid number would read unrelated bytes.
        print(
            f"[ERROR] PGC {program_chain} is out of range ({num_pgcs} PGC(s) in IFO).",
            file=sys.stderr,
        )
        sys.exit(1)
    entry_base = pgcit_pos + 8 * program_chain
    chain_offset = int.from_bytes(ifo_data[entry_base + 4 : entry_base + 8], "big")
    pgc_base = pgcit_pos + chain_offset
    num_programs = ifo_data[pgc_base + 2]
    if num_programs == 0:
        print("[ERROR] No programs found in IFO PGC.", file=sys.stderr)
        sys.exit(1)
    program_map_offset = int.from_bytes(
        ifo_data[pgc_base + 0xE6 : pgc_base + 0xE8], "big"
    )
    cell_table_offset = int.from_bytes(
        ifo_data[pgc_base + 0xE8 : pgc_base + 0xEA], "big"
    )
    map_base = pgc_base + program_map_offset
    chapter_times_seconds = [0.0]  # Chapter 1 always starts at 0
    duration_seconds = 0.0
    # The last program's length never produces a chapter start, so only the
    # programs before it need to be measured.
    for current_program in range(num_programs - 1):
        entry_cell = ifo_data[map_base + current_program]
        # A program ends one cell before the next program's entry cell.
        exit_cell = ifo_data[map_base + current_program + 1] - 1
        program_seconds = 0.0
        for current_cell in range(entry_cell, exit_cell + 1):
            # Cell numbers are 1-based; each playback entry is 0x18 bytes.
            cell_pos = pgc_base + cell_table_offset + (current_cell - 1) * 0x18
            cell_type = ifo_data[cell_pos] >> 6
            if cell_type == 0x00 or cell_type == 0x01:
                tb = ifo_data[cell_pos + 4 : cell_pos + 8]
                secs, _ = read_ifo_time_span(tb)
                program_seconds += secs
        duration_seconds += program_seconds
        chapter_times_seconds.append(duration_seconds)
    return [seconds_to_timestamp(s) for s in chapter_times_seconds]
def _prepare_audio_source(vob_files, chosen_id):
    """
    Remux one audio track of a title set into MASTER_MKV using mkvmerge.

    vob_files is the title set's VOB list and chosen_id is the mkvmerge track
    ID to keep. Returns (MASTER_MKV, "0:a:0"): the intermediate file and the
    ffmpeg stream specifier for its audio track. The caller must delete
    MASTER_MKV once it has finished slicing.

    NOTE(review): only vob_files[0] is handed to mkvmerge, while the progress
    line reports the full segment count. This presumably relies on mkvmerge
    picking up the numbered sibling VOBs itself; confirm against the mkvmerge
    documentation.
    """
    segment_count = len(vob_files)
    if segment_count == 1:
        label = "1 VOB"
    else:
        label = f"{segment_count} VOB segments"
    print(f" Extracting track {chosen_id} from {label} via mkvmerge...")

    first_vob = vob_files[0]
    # "-d -1" and "-a <id>" are the video and audio track selections.
    command = ["mkvmerge", "-d", "-1", "-a", chosen_id, "-o", MASTER_MKV, first_vob]
    run_command(command, "mkvmerge failed to extract audio from VOB.")
    return MASTER_MKV, "0:a:0"
def _run_ffmpeg_extract(
    audio_source, audio_map, output_file, output_ext, start_ts=None, end_ts=None
):
    """
    Stream-copy one slice of audio_source into output_file with ffmpeg.

    audio_map is the ffmpeg stream specifier to copy. start_ts and end_ts are
    HH:MM:SS.mmm strings; a falsy value omits the corresponding -ss / -to
    option. output_ext selects extra output options: "ac3" forces the ac3
    muxer and "mka" applies the dca_core bitstream filter. Existing output
    files are overwritten (-y). Failures are handled by run_command.
    """
    seek_args = ["-ss", start_ts] if start_ts else []
    stop_args = ["-to", end_ts] if end_ts else []

    if output_ext == "ac3":
        format_args = ["-f", "ac3"]
    elif output_ext == "mka":
        # Original author's rationale: Audio DVDs carry hi-res audio as a
        # separate stream, so an EXSS extension embedded in the DTS 5.1
        # bitstream is an authoring anomaly that breaks hardware decoders.
        # Keep only the DTS core, unconditionally.
        format_args = ["-bsf:a", "dca_core"]
    else:
        format_args = []

    command = [
        "ffmpeg",
        "-i",
        audio_source,
        *seek_args,
        *stop_args,
        "-map",
        audio_map,
        "-c",
        "copy",
        "-disposition:a:0",
        "default",
        *format_args,
        output_file,
        "-y",
    ]
    run_command(command, f"ffmpeg failed writing {output_file}")
def main():
    """
    Command-line entry point: split each title set's audio into track files.

    Steps:
      1. Parse the optional -c/--codec argument ("dts" or "ac3").
      2. Discover the title sets with find_disc_layout().
      3. Decide the codec by scanning the first VOB of each title set. With a
         forced codec the scan continues until a title set containing that
         codec is found; otherwise it stops at the first title set that has
         DTS or AC-3 and prefers DTS there.
      4. For every title set that has the chosen codec: read the chapter start
         times from its IFO, remux the audio track into MASTER_MKV, and slice
         it into one file per chapter (Track_NN.mka for DTS, Track_NN.ac3 for
         AC-3, prefixed with the VTS name when there are several title sets).

    Exits with status 1 if the requested or any supported codec is missing.
    MASTER_MKV is removed after each title set, even when slicing fails.
    """
    parser = argparse.ArgumentParser(description="Audio DVD Surround Sound Extractor")
    parser.add_argument(
        "-c",
        "--codec",
        choices=["dts", "ac3"],
        help="Explicitly choose audio stream. If omitted, prefers DTS then AC3.",
    )
    args = parser.parse_args()
    chapter_groups = find_disc_layout()  # [(ifo_path, best_pgc, vob_files), ...]
    # Scan title sets to determine which codecs are available. A forced codec
    # may live in a later title set than the first one carrying audio, so in
    # that case keep scanning until that specific codec has been seen.
    dts_id = ac3_id = None
    for _ifo, _pgc, _vobs in chapter_groups:
        scan_dts_id, scan_ac3_id = scan_vob_tracks(_vobs[0], verbose=False)
        dts_id = dts_id or scan_dts_id
        ac3_id = ac3_id or scan_ac3_id
        if args.codec == "dts":
            satisfied = dts_id
        elif args.codec == "ac3":
            satisfied = ac3_id
        else:
            satisfied = dts_id or ac3_id
        if satisfied:
            break
    chosen_id, codec_name, output_ext = None, "", ""
    if args.codec:
        if args.codec == "dts" and dts_id:
            chosen_id, codec_name, output_ext = dts_id, "DTS", "mka"
        elif args.codec == "ac3" and ac3_id:
            chosen_id, codec_name, output_ext = ac3_id, "AC3", "ac3"
        else:
            print(
                f"[ERROR] Forced codec '{args.codec}' not found in any VOB.",
                file=sys.stderr,
            )
            sys.exit(1)
    else:
        if dts_id:
            chosen_id, codec_name, output_ext = dts_id, "DTS", "mka"
        elif ac3_id:
            chosen_id, codec_name, output_ext = ac3_id, "AC3", "ac3"
        else:
            print("[ERROR] Neither DTS nor AC3 found in any VOB.", file=sys.stderr)
            sys.exit(1)
    print(f"-> Selected Audio: {codec_name} (mkvmerge track {chosen_id})")
    multi_group = len(chapter_groups) > 1
    for group_idx, (target_ifo, best_pgc, vob_files) in enumerate(chapter_groups):
        ifo_base = re.match(r"^(VTS_\d+)", os.path.basename(target_ifo), re.IGNORECASE)
        # Only prefix output names when several title sets could collide.
        group_prefix = (ifo_base.group(1) + "_") if (multi_group and ifo_base) else ""
        if multi_group:
            print(f"\n=== Group {group_idx + 1}: {os.path.basename(target_ifo)} ===")
        # Scan per-group: skip title sets that lack the chosen codec (e.g. video-only VTS)
        g_dts_id, g_ac3_id = scan_vob_tracks(vob_files[0], verbose=multi_group)
        if codec_name == "DTS" and not g_dts_id:
            print(f" -> Skipping: no DTS track in {os.path.basename(vob_files[0])}")
            continue
        if codec_name == "AC3" and not g_ac3_id:
            print(f" -> Skipping: no AC-3 track in {os.path.basename(vob_files[0])}")
            continue
        # Track IDs can differ between title sets, so use this group's own ID.
        g_id = g_dts_id if codec_name == "DTS" else g_ac3_id
        timestamps = parse_ifo_chapters_exactly(target_ifo, best_pgc)
        if not timestamps:
            print("[ERROR] Chapter parse returned empty table.", file=sys.stderr)
            sys.exit(1)
        print(f"[OK] {len(timestamps)} chapter start times:")
        for idx, ts in enumerate(timestamps):
            print(f" {group_prefix}Track_{idx + 1:02d}: {ts}")
        print("\n=== Preparing audio source ===")
        audio_source, audio_map = _prepare_audio_source(vob_files, g_id)
        try:
            print(f"\n=== Slicing {codec_name} tracks ===")
            for i, start_ts in enumerate(timestamps):
                stem = f"{group_prefix}Track_{i + 1:02d}"
                # The last track has no end time and runs to end of stream.
                end_ts = timestamps[i + 1] if i + 1 < len(timestamps) else None
                output_file = f"{stem}.{output_ext}"
                print(f" {stem}: {start_ts} -> {end_ts or 'EOF'}")
                _run_ffmpeg_extract(
                    audio_source,
                    audio_map,
                    output_file,
                    output_ext,
                    start_ts=start_ts,
                    end_ts=end_ts,
                )
        finally:
            if os.path.exists(MASTER_MKV):
                os.remove(MASTER_MKV)
    print(f"\n[SUCCESS] All {codec_name} tracks extracted.")
# Run the extractor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
@BenMcLean

BenMcLean commented Jun 9, 2026

Copy link
Copy Markdown
Author

For chapter data extraction from IFO files: https://github.com/tautcony/ChapterTool

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment