find_offset.py
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "p-tqdm",
# ]
# ///
#
# Find the temporal offset between two videos.
# This is done by computing the PSNR of the first video against the second for all
# candidate offsets within the specified search range, then picking the offset with
# the highest PSNR. The offset is reported both in frames and in seconds (derived
# from the FPS).
#
# If the two videos have different framerates, both are converted to the reference's
# framerate via ffmpeg's fps filter (frame duplication/dropping, not interpolation).
#
# For a basic example, just call the script with two videos:
#
# python3 find_offset.py ref.mp4 dist.mp4
#
# If you have some clues about how much the distorted video is delayed against the reference,
# you can use that to speed up the computation significantly.
# For example, if you know that the distorted video is delayed by somewhere between 10 and 12 seconds
# against the reference, you can start the search at 10 seconds, and limit the analysis duration within
# the reference to 2 seconds:
#
# python3 find_offset.py ref.mp4 dist.mp4 --start-offset 10 --max-duration 2
#
# This will truncate each PSNR computation at 2 seconds. Alternatively, you can bound
# the search itself with the --max-search-offset parameter, which sets the point (in
# seconds into the distorted clip) at which the search stops; --start-offset 10
# --max-search-offset 12 gives the same 2-second search window, but each PSNR
# computation still runs over the full overlapping duration of the two videos, so it
# might be slower.
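#
# The script prints its result as JSON on stdout. The output looks roughly like the
# following (values are illustrative):
#
#   {
#     "date": "2025-04-02T15:13:00",
#     "reference": "ref.mp4",
#     "distorted": "dist.mp4",
#     "offset_frames": 262,
#     "offset_seconds": 10.48,
#     "fps": 25.0,
#     "max_psnr": 48.2,
#     "settings": { ... }
#   }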
#
# Requirements:
# - ffmpeg
# - ffprobe
# - pip3 install p-tqdm
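#
# Alternatively, since the script declares its dependencies via inline script
# metadata (PEP 723), you can run it directly with a tool that supports this, e.g.:
#
#   uv run find_offset.py ref.mp4 dist.mp4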
#
# Author: Werner Robitza
# License: MIT
import argparse
import json
import logging
import os
import shlex
import subprocess
import textwrap
from datetime import datetime
from typing import Dict, Optional
from p_tqdm import p_imap
from tqdm import tqdm
def convert_json_strings_as_numbers(obj: Dict) -> Dict:
"""
    Recursively convert any numbers stored as strings into floats.
Args:
obj (Dict): The object to convert
Returns:
Dict: The converted object
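    Example:
        {"a": "1.5", "b": ["2", {"c": "x"}]} -> {"a": 1.5, "b": [2.0, {"c": "x"}]}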
"""
for key, value in obj.items():
if isinstance(value, str):
try:
obj[key] = float(value)
except ValueError:
pass
elif isinstance(value, list):
            for i, item in enumerate(value):
                if isinstance(item, dict):
                    value[i] = convert_json_strings_as_numbers(item)
                else:
                    try:
                        value[i] = float(item)
                    except ValueError:
                        pass
elif isinstance(value, dict):
obj[key] = convert_json_strings_as_numbers(value)
return obj
def get_ffmpeg_psnr(
ref: str,
dist: str,
fps: float,
max_duration: Optional[float] = None,
ref_offset: float = 0,
dist_offset: float = 0,
) -> float:
"""
Compute the PSNR of the distorted video compared to the reference.
    The inputs can be shifted by the specified offsets, which allows searching
    for the best alignment; the computation stops when the shorter of the two
    videos has ended (eof_action=endall).
Args:
ref: The reference video
dist: The distorted video
fps: Force the framerate of the videos to this value
max_duration: Maximum duration to consider (in seconds). Defaults to None (unlimited).
ref_offset: The offset of the reference video (in seconds). Defaults to 0.
dist_offset: The offset of the distorted video (in seconds). Defaults to 0.
Returns:
float: The average PSNR
"""
filter_chains = [
"[1][0]scale2ref[dist][ref]",
f"[dist]settb=AVTB,setpts=PTS-STARTPTS,fps=fps={fps}[distpts]",
f"[ref]settb=AVTB,setpts=PTS-STARTPTS,fps=fps={fps}[refpts]",
"[distpts][refpts]psnr=eof_action=endall,metadata=mode=print[out]",
]
cmd = [
"ffmpeg",
"-ss",
str(ref_offset),
"-i",
ref,
"-ss",
str(dist_offset),
"-i",
dist,
"-filter_complex",
";".join(filter_chains),
]
if max_duration is not None and max_duration > 0:
cmd.extend(["-t", str(max_duration)])
    cmd.extend(["-map", "[out]", "-f", "null", os.devnull])
try:
logging.debug(f"Running command: {shlex.join(cmd)}")
ffmpeg_output = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT,
).decode("utf-8")
except subprocess.CalledProcessError as e:
logging.error(f"ffmpeg exited with code {e.returncode}")
logging.error(e.output.decode("utf-8"))
raise e
# logging.debug(ffmpeg_output)
# find the line that matches this, and extract the average:
# [Parsed_psnr_0 @ 0x13b604440] PSNR y:5.771137 u:11.169655 v:11.014660 average:6.936902 min:6.936902 max:6.936902
try:
psnr_lines = [
line
for line in ffmpeg_output.split("\n")
if line.startswith("[Parsed_psnr")
]
last_line = psnr_lines[-1]
return float(last_line.split("average:")[1].split(" ")[0])
except IndexError as e:
logging.error(f"ffmpeg output did not contain PSNR lines: {ffmpeg_output}")
raise e
def get_video_stream_info_ffprobe(file: str) -> Dict:
"""
Get basic stream info via ffprobe in JSON format.
Example:
{'programs': [], 'streams': [{'codec_name': 'h264', 'width': 1280, 'height': 720, 'r_frame_rate': '50/1',
'duration': 42.84, 'bit_rate': 1052958.0, 'nb_frames': 2142.0}, {'codec_name': 'aac', 'sample_rate': 48000.0,
'channels': 2, 'r_frame_rate': '0/0', 'duration': 42.83, 'bit_rate': 125339.0, 'nb_frames': 2073.0}]}
Properties for each stream:
- Video codec
- Video bitrate
- Video resolution
- Video framerate
- Video duration
- Audio codec
- Audio bitrate
- Audio channels
- Audio sample rate
- Audio duration
Args:
file (str): The video file
"""
logging.info(f"Getting stream info for {file}")
return convert_json_strings_as_numbers(
json.loads(
subprocess.check_output(
[
"ffprobe",
"-v",
"error",
"-show_entries",
"stream=codec_name,bit_rate,width,height,r_frame_rate,nb_frames,duration,channels,sample_rate",
"-of",
"json",
file,
]
)
)
)
def get_best_offset(
ref: str,
dist: str,
fps: float,
ref_frames: int,
dist_frames: int,
max_search_offset: Optional[float] = None,
max_duration: Optional[float] = None,
start_offset: float = 0,
step_size: int = 1,
    cpu_count: Optional[int] = os.cpu_count(),
) -> tuple[int, float]:
"""
Find the most likely offset of two videos (ref, dist) by computing the PSNR of the distorted video compared to the reference.
Args:
ref (str): Reference video
dist (str): Distorted video
fps (float): Force the framerate of the videos to this value
ref_frames (int): The number of frames in the reference video (unused)
dist_frames (int): The number of frames in the distorted video
        max_search_offset (Optional[float], optional): End of the search range for the delay (in seconds into the distorted video). Defaults to None (end of the distorted video).
        max_duration (Optional[float], optional): Maximum duration to consider (in seconds). Defaults to None (unlimited).
        start_offset (float, optional): Begin offset of the search (in seconds). Defaults to 0.
        step_size (int, optional): Frame step size. Defaults to 1.
        cpu_count (Optional[int], optional): Number of CPUs to use. Defaults to os.cpu_count().
Returns:
        tuple[int, float]: A tuple of (frame offset, avg_psnr)
"""
logging.info(
f"Computing PSNR for {dist} compared to {ref}, starting offset: {start_offset}, step size: {step_size}"
)
    # list of (frame offset, avg_psnr) tuples
    results: list[tuple[int, float]] = []
frame_duration = 1 / fps
    # by default, search up to the end of the distorted video (assumed to be the
    # shorter one), unless the user specified a maximum search offset
max_search_frames: int = dist_frames
if max_search_offset is not None:
        if (new_max_search_frames := max_search_offset * fps) > dist_frames:
            logging.warning(
                f"Maximum search offset {max_search_offset} s exceeds the duration of the distorted video ({dist_frames / fps:.2f} s), using the latter."
            )
        else:
            max_search_frames = int(new_max_search_frames)
# determine search range --> we shift the distorted video over the reference video
start_offset_frames = int(start_offset * fps)
logging.info(f"Searching in [{start_offset_frames}, {max_search_frames}]")
frame_search_range = range(start_offset_frames, max_search_frames, step_size)
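    # e.g. with fps=25, start_offset=2.0 and max_search_offset=4.0, this evaluates
    # frames 50, 51, ..., 99 of the distorted video, i.e. candidate delays from
    # 2.00 s up to (but not including) 4.00 s in steps of 1/25 s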
# single- or multi-threaded
if cpu_count == 1:
for frame in tqdm(frame_search_range):
avg_psnr = get_ffmpeg_psnr(
ref,
dist,
fps,
max_duration=max_duration,
dist_offset=frame * frame_duration,
)
results.append((frame, avg_psnr))
else:
ffmpeg_iterator = p_imap(
lambda frame: get_ffmpeg_psnr(
ref,
dist,
fps,
max_duration=max_duration,
dist_offset=frame * frame_duration,
),
frame_search_range,
num_cpus=cpu_count,
)
        # p_imap yields results in input order, so pair them back with their
        # actual frame numbers (enumerate would yield 0-based indices instead)
        for frame, avg_psnr in zip(frame_search_range, ffmpeg_iterator):
results.append((frame, avg_psnr))
logging.debug(f"Results: {json.dumps(results, indent=2)}")
return max(results, key=lambda x: x[1])
def main():
parser = argparse.ArgumentParser(
description=textwrap.dedent(
"""
Find the temporal offset between two videos, where the first one is the reference, and the second one is
being shifted to match the reference. It is assumed that the second video is a distorted, possibly
downscaled version of the first one, and that it is delayed against the reference.
If your reference is delayed, invert the arguments for ref and dist.
"""
),
add_help=True,
)
parser.add_argument(
"ref",
type=str,
help="The reference video",
)
parser.add_argument(
"dist",
type=str,
help="The distorted, delayed video",
)
parser.add_argument(
"-s",
"--max-search-offset",
type=float,
help="Maximum search range for the delay (in seconds), default: unlimited",
)
parser.add_argument(
"-m",
"--max-duration",
type=float,
help="Maximum duration of the reference to consider (in seconds), default: unlimited",
)
parser.add_argument(
"-o",
"--start-offset",
type=float,
default=0,
help="Begin offset of the search for the distorted video (in seconds), default: 0",
)
parser.add_argument(
"--cpu-count",
type=int,
default=os.cpu_count(),
help="Number of ffmpeg processes to use for PSNR computation, default: number of CPU cores",
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Whether to print debug messages",
)
args = parser.parse_args()
# Setup logging
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
# ============================================================================================
# READ THE VIDEO INFO
# read the info of the reference video
ref_info = get_video_stream_info_ffprobe(args.ref)["streams"][0]
logging.info(f"Reference video info: {ref_info}")
ref_fps = ref_info["r_frame_rate"]
ref_frames = int(ref_info["nb_frames"])
ref_width = int(ref_info["width"])
ref_height = int(ref_info["height"])
# read the info of the distorted video
dist_info = get_video_stream_info_ffprobe(args.dist)["streams"][0]
logging.info(f"Distorted video info: {dist_info}")
dist_fps = dist_info["r_frame_rate"]
dist_frames = int(dist_info["nb_frames"])
dist_width = int(dist_info["width"])
dist_height = int(dist_info["height"])
# check if the videos have the same resolution
if ref_width != dist_width or ref_height != dist_height:
logging.warning(
f"The videos have different resolutions: {ref_width}x{ref_height} vs {dist_width}x{dist_height}. The distorted video will be resampled to match the reference. This may produce incorrect results."
)
# check if the videos have the same framerate
if ref_fps != dist_fps:
logging.warning(
f"The videos have different framerates: {ref_fps} vs {dist_fps}. The distorted video will be resampled to match the reference. This may produce incorrect results."
)
    # determine the numeric framerate of the reference video
    # (ffprobe reports r_frame_rate as a fraction such as "30000/1001")
    def fraction_to_float(fraction: str) -> float:
        """
        Convert a fraction string to a float.
        Example: "30000/1001" -> 29.97002997002997
        Args:
            fraction (str): The fraction string
        Returns:
            float: The float value
        """
        numerator, denominator = map(float, fraction.split("/"))
        return numerator / denominator

    if isinstance(ref_fps, str) and "/" in ref_fps:
        ref_fps = fraction_to_float(ref_fps)
if ref_frames < dist_frames:
logging.warning(
f"The reference video has fewer frames ({ref_frames}) than the distorted video ({dist_frames}). Will not consider the extra frames for the alignment."
)
# ============================================================================================
# RUN THE ALGORITHM
begin_time = datetime.now()
found_offset, max_psnr = get_best_offset(
args.ref,
args.dist,
ref_fps,
ref_frames,
dist_frames,
max_search_offset=args.max_search_offset,
max_duration=args.max_duration,
start_offset=args.start_offset,
cpu_count=args.cpu_count,
)
end_time = datetime.now()
logging.info(f"Computation finished in {end_time - begin_time} seconds")
    logging.info(f"Found offset at frame {found_offset} with PSNR {max_psnr}")
# ============================================================================================
# PRINT THE RESULTS AS JSON
print(
json.dumps(
{
"date": datetime.now().isoformat(),
"reference": args.ref,
"distorted": args.dist,
"offset_frames": int(args.start_offset * ref_fps + found_offset),
"offset_seconds": (args.start_offset * ref_fps + found_offset)
* 1
/ ref_fps,
"fps": ref_fps,
"max_psnr": max_psnr,
"settings": {
"max_search_offset": args.max_search_offset,
"start_offset": args.start_offset,
"max_duration": args.max_duration,
"cpu_count": args.cpu_count,
"compute_time": (end_time - begin_time).total_seconds(),
},
},
indent=2,
)
)
if __name__ == "__main__":
main()