Created
April 2, 2025 15:13
-
-
Save slhck/e04fc894c9cc9320233b19e4ec16d559 to your computer and use it in GitHub Desktop.
find_offset.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# /// script | |
# requires-python = ">=3.11" | |
# dependencies = [ | |
# "p-tqdm", | |
# ] | |
# /// | |
# | |
# Find the temporal offset between two videos. | |
# Do this by computing the PSNR of the first against the second video for all frames | |
# within the specified search range, then finding the frame with the highest PSNR. | |
# The offset is the difference between the frame numbers of the two videos, expressed | |
# in frames or seconds (depending on the FPS). | |
# | |
# If both videos have different framerates, both are resampled to the framerate of the
# reference video before the comparison.
# | |
# For a basic example, just call the script with two videos: | |
# | |
# python3 find_offset.py ref.mp4 dist.mp4 | |
# | |
# If you have some clues about how much the distorted video is delayed against the reference, | |
# you can use that to speed up the computation significantly. | |
# For example, if you know that the distorted video is delayed by somewhere between 10 and 12 seconds | |
# against the reference, you can start the search at 10 seconds, and limit the analysis duration within | |
# the reference to 2 seconds: | |
# | |
# python3 find_offset.py ref.mp4 dist.mp4 --start-offset 10 --max-duration 2 | |
# | |
# This will truncate the calculations at 2 seconds into each comparison. You can also specify a | |
# smaller comparison window with the --max-search-offset parameter, which will limit the search | |
# range to 2 seconds within the distorted clip, but will still compute the PSNR for the entire duration | |
# of the reference/distorted video, so it might be slower. | |
# | |
# Requirements: | |
# - ffmpeg | |
# - ffprobe | |
# - pip3 install p_tqdm | |
# | |
# Author: Werner Robitza | |
# License: MIT | |
import argparse | |
import json | |
import logging | |
import os | |
import shlex | |
import subprocess | |
import textwrap | |
from datetime import datetime | |
from typing import Dict, Optional | |
from p_tqdm import p_imap | |
from tqdm import tqdm | |
def convert_json_strings_as_numbers(obj: Dict) -> Dict:
    """
    Convert any actual numbers encapsulated in strings to numbers (floats).

    The conversion happens in place and recurses into nested dictionaries and
    into dictionaries contained in lists. Values that cannot be interpreted as
    a number (e.g. "h264", "50/1", None) are left untouched.

    Args:
        obj (Dict): The object to convert (modified in place)

    Returns:
        Dict: The same object, with numeric strings replaced by floats
    """
    for key, value in obj.items():
        if isinstance(value, str):
            try:
                obj[key] = float(value)
            except ValueError:
                # not a numeric string, keep it as is
                pass
        elif isinstance(value, list):
            for i, item in enumerate(value):
                if isinstance(item, dict):
                    value[i] = convert_json_strings_as_numbers(item)
                else:
                    try:
                        value[i] = float(item)
                    except (TypeError, ValueError):
                        # float() raises TypeError for None or nested lists and
                        # ValueError for non-numeric strings; keep those as is
                        pass
        elif isinstance(value, dict):
            obj[key] = convert_json_strings_as_numbers(value)
    return obj
def get_ffmpeg_psnr(
    ref: str,
    dist: str,
    fps: float,
    max_duration: Optional[float] = None,
    ref_offset: float = 0,
    dist_offset: float = 0,
) -> float:
    """
    Run ffmpeg's psnr filter on a reference/distorted pair and return the average PSNR.

    Each input is seeked to its own offset before the comparison, which makes it
    possible to probe different temporal alignments. The comparison ends as soon
    as the shorter of the two inputs has ended (eof_action=endall).

    Args:
        ref: The reference video
        dist: The distorted video
        fps: Force the framerate of the videos to this value
        max_duration: Maximum duration to consider (in seconds). Defaults to None (unlimited).
        ref_offset: The offset of the reference video (in seconds). Defaults to 0.
        dist_offset: The offset of the distorted video (in seconds). Defaults to 0.

    Returns:
        float: The average PSNR

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero code.
        IndexError: If the ffmpeg output contains no PSNR summary line.
    """
    # both branches get the same timebase/timestamp reset and forced framerate
    normalize = f"settb=AVTB,setpts=PTS-STARTPTS,fps=fps={fps}"
    filter_graph = ";".join(
        (
            "[1][0]scale2ref[dist][ref]",
            f"[dist]{normalize}[distpts]",
            f"[ref]{normalize}[refpts]",
            "[distpts][refpts]psnr=eof_action=endall,metadata=mode=print[out]",
        )
    )

    # -t is only passed for a positive, user-specified duration
    if max_duration is not None and max_duration > 0:
        duration_args = ["-t", str(max_duration)]
    else:
        duration_args = []

    cmd = [
        "ffmpeg",
        "-ss",
        str(ref_offset),
        "-i",
        ref,
        "-ss",
        str(dist_offset),
        "-i",
        dist,
        "-filter_complex",
        filter_graph,
        *duration_args,
        "-map",
        "[out]",
        "-f",
        "null",
        "/dev/null",
    ]

    try:
        logging.debug(f"Running command: {shlex.join(cmd)}")
        raw_output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        ffmpeg_output = raw_output.decode("utf-8")
    except subprocess.CalledProcessError as e:
        logging.error(f"ffmpeg exited with code {e.returncode}")
        logging.error(e.output.decode("utf-8"))
        raise

    # The summary line written by the psnr filter looks like this:
    # [Parsed_psnr_0 @ 0x13b604440] PSNR y:5.771137 u:11.169655 v:11.014660 average:6.936902 min:6.936902 max:6.936902
    try:
        summary_lines = []
        for line in ffmpeg_output.split("\n"):
            if line.startswith("[Parsed_psnr"):
                summary_lines.append(line)
        after_label = summary_lines[-1].split("average:")[1]
        average_text = after_label.split(" ")[0]
        return float(average_text)
    except IndexError:
        logging.error(f"ffmpeg output did not contain PSNR lines: {ffmpeg_output}")
        raise
def get_video_stream_info_ffprobe(file: str) -> Dict:
    """
    Query ffprobe for basic per-stream information and return it as a dictionary.

    The ffprobe JSON output is parsed, and numbers that ffprobe reports as
    strings are converted to floats.

    Example:
        {'programs': [], 'streams': [{'codec_name': 'h264', 'width': 1280, 'height': 720, 'r_frame_rate': '50/1',
        'duration': 42.84, 'bit_rate': 1052958.0, 'nb_frames': 2142.0}, {'codec_name': 'aac', 'sample_rate': 48000.0,
        'channels': 2, 'r_frame_rate': '0/0', 'duration': 42.83, 'bit_rate': 125339.0, 'nb_frames': 2073.0}]}

    Requested entries for each stream:
        codec_name, bit_rate, width, height, r_frame_rate, nb_frames,
        duration, channels, sample_rate

    Args:
        file (str): The video file

    Returns:
        Dict: The parsed ffprobe output
    """
    logging.info(f"Getting stream info for {file}")

    probe_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "stream=codec_name,bit_rate,width,height,r_frame_rate,nb_frames,duration,channels,sample_rate",
        "-of",
        "json",
        file,
    ]
    raw_json = subprocess.check_output(probe_cmd)
    parsed = json.loads(raw_json)
    return convert_json_strings_as_numbers(parsed)
def get_best_offset(
    ref: str,
    dist: str,
    fps: float,
    ref_frames: int,
    dist_frames: int,
    max_search_offset: Optional[float] = None,
    max_duration: Optional[float] = None,
    start_offset: float = 0,
    step_size: int = 1,
    cpu_count: Optional[float] = os.cpu_count(),
) -> tuple[float, float]:
    """
    Find the most likely offset of two videos (ref, dist) by computing the PSNR of the
    distorted video compared to the reference for every candidate offset.

    The distorted video is shifted frame by frame (in steps of step_size) and the
    candidate with the highest average PSNR wins.

    Args:
        ref (str): Reference video
        dist (str): Distorted video
        fps (float): Force the framerate of the videos to this value
        ref_frames (int): The number of frames in the reference video (unused)
        dist_frames (int): The number of frames in the distorted video
        max_search_offset (Optional[float], optional): Maximum search range for the delay (in seconds), default: unlimited
        max_duration (Optional[float], optional): Maximum duration to consider (in seconds). Defaults to None (unlimited).
        start_offset (float, optional): Begin offset of the search (in seconds). Defaults to 0.
        step_size (int, optional): Frame step size. Defaults to 1.
        cpu_count (Optional[float], optional): Number of CPUs to use. Defaults to os.cpu_count().

    Returns:
        tuple[float, float]: A tuple of [offset, avg_psnr], where the offset is given
        in frames relative to the start offset (i.e. 0 means "exactly at start_offset").

    Raises:
        ValueError: If the search range is empty, e.g. because start_offset lies
            beyond the end of the search range.
    """
    logging.info(
        f"Computing PSNR for {dist} compared to {ref}, starting offset: {start_offset}, step size: {step_size}"
    )

    frame_duration = 1 / fps

    # we only consider the maximum duration of the distorted video (as it is shorter),
    # or the one the user specified
    max_search_frames: int = dist_frames
    if max_search_offset is not None:
        if (new_max_search_frames := max_search_offset * fps) > dist_frames:
            logging.warning(
                f"Maximum search offset {max_search_offset} exceeds the duration of the distorted video ({dist_frames/fps} seconds), using the latter."
            )
        else:
            max_search_frames = int(new_max_search_frames)

    # determine search range --> we shift the distorted video over the reference video
    start_offset_frames = int(start_offset * fps)
    logging.info(f"Searching in [{start_offset_frames}, {max_search_frames}]")
    frame_search_range = range(start_offset_frames, max_search_frames, step_size)

    # fail early with a clear message instead of an opaque max() error later on
    if not frame_search_range:
        raise ValueError(
            f"Empty search range [{start_offset_frames}, {max_search_frames}): "
            "the start offset must be smaller than the maximum search offset "
            "and the length of the distorted video"
        )

    def psnr_at(frame: int) -> float:
        # shift only the distorted video; the reference always starts at 0
        return get_ffmpeg_psnr(
            ref,
            dist,
            fps,
            max_duration=max_duration,
            dist_offset=frame * frame_duration,
        )

    # single- or multi-process
    if cpu_count == 1:
        psnr_values = (psnr_at(frame) for frame in tqdm(frame_search_range))
    else:
        psnr_values = p_imap(psnr_at, frame_search_range, num_cpus=cpu_count)

    # Pair every PSNR with the frame it was computed for (p_imap keeps the input
    # order) and report the frame relative to the start offset in both modes, so
    # that the result does not depend on cpu_count and honors step_size.
    results: list[tuple[float, float]] = [
        (frame - start_offset_frames, avg_psnr)
        for frame, avg_psnr in zip(frame_search_range, psnr_values)
    ]

    if logging.getLogger().isEnabledFor(logging.DEBUG):
        logging.debug(f"Results: {json.dumps(results, indent=2)}")

    return max(results, key=lambda x: x[1])
def _parse_frame_rate(value) -> float:
    """Convert an ffprobe frame rate (a fraction string like "30000/1001" or a plain number) to a float."""
    if isinstance(value, str) and "/" in value:
        numerator, denominator = map(float, value.split("/"))
        if denominator == 0:
            raise ValueError(f"Invalid frame rate: {value}")
        return numerator / denominator
    return float(value)


def _first_video_stream(probe_info: Dict, file: str) -> Dict:
    """Return the first stream of an ffprobe result that reports a width and a height."""
    for stream in probe_info.get("streams", []):
        if "width" in stream and "height" in stream:
            return stream
    raise ValueError(f"No video stream found in {file}")


def main():
    """
    Command-line entry point.

    Parses the arguments, reads the stream info of both videos via ffprobe,
    searches for the offset with the highest PSNR and prints the result as JSON
    to stdout.
    """
    parser = argparse.ArgumentParser(
        description=textwrap.dedent(
            """
            Find the temporal offset between two videos, where the first one is the reference, and the second one is
            being shifted to match the reference. It is assumed that the second video is a distorted, possibly
            downscaled version of the first one, and that it is delayed against the reference.
            If your reference is delayed, invert the arguments for ref and dist.
            """
        ),
        add_help=True,
    )
    parser.add_argument(
        "ref",
        type=str,
        help="The reference video",
    )
    parser.add_argument(
        "dist",
        type=str,
        help="The distorted, delayed video",
    )
    parser.add_argument(
        "-s",
        "--max-search-offset",
        type=float,
        help="Maximum search range for the delay (in seconds), default: unlimited",
    )
    parser.add_argument(
        "-m",
        "--max-duration",
        type=float,
        help="Maximum duration of the reference to consider (in seconds), default: unlimited",
    )
    parser.add_argument(
        "-o",
        "--start-offset",
        type=float,
        default=0,
        help="Begin offset of the search for the distorted video (in seconds), default: 0",
    )
    parser.add_argument(
        "--cpu-count",
        type=int,
        default=os.cpu_count(),
        help="Number of ffmpeg processes to use for PSNR computation, default: number of CPU cores",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Whether to print debug messages",
    )
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

    # ============================================================================================
    # READ THE VIDEO INFO

    # read the info of the reference video; pick the first stream that actually
    # is a video stream, since stream 0 may be an audio stream
    ref_info = _first_video_stream(get_video_stream_info_ffprobe(args.ref), args.ref)
    logging.info(f"Reference video info: {ref_info}")
    ref_fps = _parse_frame_rate(ref_info["r_frame_rate"])
    ref_frames = int(ref_info["nb_frames"])
    ref_width = int(ref_info["width"])
    ref_height = int(ref_info["height"])

    # read the info of the distorted video
    dist_info = _first_video_stream(get_video_stream_info_ffprobe(args.dist), args.dist)
    logging.info(f"Distorted video info: {dist_info}")
    dist_fps = _parse_frame_rate(dist_info["r_frame_rate"])
    dist_frames = int(dist_info["nb_frames"])
    dist_width = int(dist_info["width"])
    dist_height = int(dist_info["height"])

    # check if the videos have the same resolution
    if ref_width != dist_width or ref_height != dist_height:
        logging.warning(
            f"The videos have different resolutions: {ref_width}x{ref_height} vs {dist_width}x{dist_height}. The distorted video will be resampled to match the reference. This may produce incorrect results."
        )

    # check if the videos have the same framerate; compare the numeric values so
    # that equivalent fractions (e.g. "50/2" and "25/1") are not reported
    if ref_fps != dist_fps:
        logging.warning(
            f"The videos have different framerates: {ref_info['r_frame_rate']} vs {dist_info['r_frame_rate']}. The distorted video will be resampled to match the reference. This may produce incorrect results."
        )

    if ref_frames < dist_frames:
        logging.warning(
            f"The reference video has fewer frames ({ref_frames}) than the distorted video ({dist_frames}). Will not consider the extra frames for the alignment."
        )

    # ============================================================================================
    # RUN THE ALGORITHM

    begin_time = datetime.now()
    found_offset, max_psnr = get_best_offset(
        args.ref,
        args.dist,
        ref_fps,
        ref_frames,
        dist_frames,
        max_search_offset=args.max_search_offset,
        max_duration=args.max_duration,
        start_offset=args.start_offset,
        cpu_count=args.cpu_count,
    )
    end_time = datetime.now()
    compute_time = (end_time - begin_time).total_seconds()
    logging.info(f"Computation finished in {compute_time} seconds")

    # found_offset is counted in frames from the start offset, which is given in
    # seconds -- convert the latter to frames before adding them up
    total_offset_frames = args.start_offset * ref_fps + found_offset
    logging.info(
        f"Found offset at frame {int(total_offset_frames)} with PSNR {max_psnr}"
    )

    # ============================================================================================
    # PRINT THE RESULTS AS JSON

    print(
        json.dumps(
            {
                "date": datetime.now().isoformat(),
                "reference": args.ref,
                "distorted": args.dist,
                "offset_frames": int(total_offset_frames),
                "offset_seconds": total_offset_frames / ref_fps,
                "fps": ref_fps,
                "max_psnr": max_psnr,
                "settings": {
                    "max_search_offset": args.max_search_offset,
                    "start_offset": args.start_offset,
                    "max_duration": args.max_duration,
                    "cpu_count": args.cpu_count,
                    "compute_time": compute_time,
                },
            },
            indent=2,
        )
    )


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment