Created
June 5, 2025 16:01
-
-
Save noel-friedrich/b78631292a769dd9a7418fc55653c636 to your computer and use it in GitHub Desktop.
An export of the ipython notebook used to create more MrBeast progress bar warpings! See https://youtu.be/QuR_3Mar-nI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# %% | |
import cv2, math, os | |
from pathlib import Path | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import scipy.signal | |
import shutil, subprocess | |
from scipy.io import wavfile | |
import skimage as ski | |
# %% | |
class VideoWrapper:
    """Convenience wrapper around ``cv2.VideoCapture`` for frame access.

    Frames are converted with ``src_color_mode`` (BGR -> RGB by default)
    before being handed to the caller.
    """

    def __init__(self, video_file_path: Path, src_color_mode=cv2.COLOR_BGR2RGB):
        # Pass a plain string: not every OpenCV build accepts path-like objects.
        self.video_capture = cv2.VideoCapture(str(video_file_path))
        self.fps = self.video_capture.get(cv2.CAP_PROP_FPS)
        self.src_color_mode = src_color_mode

    def reset(self):
        """Rewind the capture to the first frame."""
        self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)

    def videotime_to_frame(self, video_time):
        """Convert a timestamp into a frame index.

        ``video_time`` is either a number of seconds or a string of the form
        ``"MM:SS"`` / ``"MM:SS:mmm"`` (minutes, seconds, milliseconds).

        Raises:
            ValueError: if a string does not have one or two ``:`` separators.
        """
        if not isinstance(video_time, str):
            return self.seconds_to_frame(video_time)

        fields = video_time.split(":")
        if len(fields) == 2:
            minutes, seconds = (int(field) for field in fields)
            return self.seconds_to_frame(minutes * 60 + seconds)
        if len(fields) == 3:
            minutes, seconds, milliseconds = (int(field) for field in fields)
            return self.seconds_to_frame(minutes * 60 + seconds + milliseconds / 1000)
        raise ValueError("Invalid videotime format!")

    def seconds_to_frame(self, seconds: float):
        """Return the index of the frame shown at ``seconds`` (rounded down)."""
        return math.floor(seconds * self.fps)

    def iter_images(self, start_seconds=None, end_seconds=None):
        """Yield ``(frame_index, image)`` pairs from start to end (inclusive).

        Both bounds accept anything ``videotime_to_frame`` understands; a
        missing start means frame 0 and a missing end means "until the video
        runs out".
        """
        start_index = 0
        end_index = math.inf
        if start_seconds is not None:
            start_index = self.videotime_to_frame(start_seconds)
        if end_seconds is not None:
            end_index = self.videotime_to_frame(end_seconds)

        # The capture is positioned one frame before the reported index
        # (same convention as get_image_at); never seek to a negative position.
        self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, max(start_index - 1, 0))
        frame_index = start_index
        success, image = self.video_capture.read()
        while success and frame_index <= end_index:
            yield frame_index, cv2.cvtColor(image, self.src_color_mode)
            success, image = self.video_capture.read()
            frame_index += 1

    def get_image_at(self, seconds, frame_mode=False):
        """Return the colour-converted frame at a timestamp.

        With ``frame_mode=True`` the ``seconds`` argument is interpreted as a
        frame index instead of a time.

        Raises:
            ValueError: if no frame could be read at the requested position.
        """
        frame_index = seconds if frame_mode else self.videotime_to_frame(seconds)
        self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, max(frame_index - 1, 0))
        success, raw_image = self.video_capture.read()
        if not success:
            # Without this check cv2.cvtColor would fail on ``None`` with an
            # unhelpful assertion error.
            raise ValueError(f"Could not read frame {frame_index} from the video")
        return cv2.cvtColor(raw_image, self.src_color_mode)
class ProgressbarFinder:
    """Measure how far a solid-colour progress bar has advanced in a frame.

    ``y`` is the relative vertical position (0..1) of the pixel row that is
    sampled; ``thresh`` is the Euclidean colour distance above which a pixel
    is considered to no longer belong to the bar.
    """

    def __init__(self, y=0.98, thresh=20):
        self.progressbar_y = y
        self.thresh = thresh

    def measure(self, image, debug=False):
        """Return the bar's progress in the range 0..1.

        The bar is assumed to have a constant colour equal to the left-most
        pixel of the sampled row. The result is the relative x position of
        the first pixel whose colour differs by more than ``thresh``. If no
        pixel differs (the bar spans the whole row) 1.0 is returned.

        With ``debug=True`` the per-pixel colour distances are scatter-plotted.
        """
        # Clamp so that y == 1.0 samples the last row instead of overflowing.
        y_index = min(math.floor(image.shape[0] * self.progressbar_y), image.shape[0] - 1)
        pixel_row = image[y_index].astype(np.float32)

        # One distance per pixel; the reshape also covers single-channel images.
        differences = (pixel_row - pixel_row[0]).reshape(len(pixel_row), -1)
        distances = np.linalg.norm(differences, axis=1)

        if debug:
            plt.scatter(np.arange(len(distances)), distances, c=pixel_row / 255)

        beyond_bar = np.flatnonzero(distances > self.thresh)
        if beyond_bar.size == 0:
            return 1.0
        return int(beyond_bar[0]) / (image.shape[1] - 1)
# Source video and the time window in which the progress bar is visible.
videos_directory = Path("videos")
video_file = videos_directory.joinpath("ages1-to-100.fullhd.mp4")
progress_bar_bounds = ("18:05:500", "18:48")

video_wrapper = VideoWrapper(video_file)

# Sanity check on the first frame of the window: plot the colour distances
# along the sampled row.
sample_image = video_wrapper.get_image_at(progress_bar_bounds[0])
progressbar_finder = ProgressbarFinder(thresh=50, y=0.97)
progressbar_finder.measure(sample_image, debug=True)
# %% | |
# Measure the bar on every frame of the window; each row of `progress` is
# (frame index, measured progress).
progressbar_finder = ProgressbarFinder(thresh=30, y=0.99)
progress = np.array(
    [
        [index, progressbar_finder.measure(frame)]
        for index, frame in video_wrapper.iter_images(*progress_bar_bounds)
    ]
)

# Raw measurement as a line ...
plt.xlabel("Frame Index")
plt.ylabel("Progress Bar Progress")
plt.plot(progress[:, 0], progress[:, 1])

# ... and the Savitzky-Golay smoothed version (stored back in place) as bars.
progress[:, 1] = scipy.signal.savgol_filter(progress[:, 1], 30, 3)
plt.bar(progress[:, 0], progress[:, 1], color="yellow")
# %% | |
# Dump one frame to disk and show where it lies (linearly) inside the window.
sample_frame = 33000
plt.imsave("temp.png", video_wrapper.get_image_at(sample_frame, frame_mode=True))

min_frame = video_wrapper.videotime_to_frame(progress_bar_bounds[0])
max_frame = video_wrapper.videotime_to_frame(progress_bar_bounds[1])
(sample_frame - min_frame) / (max_frame - min_frame)
# %% | |
# Linear ("honest") progress per frame: row position divided by the last position.
real_progress = np.column_stack(
    (progress[:, 0], np.arange(len(progress)) / (len(progress) - 1))
)

# Positive values mean the bar is ahead of the true progress.
progress_wrongness = progress[:, 1] - real_progress[:, 1]

plt.xlabel("frame index")
plt.ylabel("progressbar-wrongness")
plt.bar(progress[:, 0], progress_wrongness, color="tomato")
plt.plot(progress[:, 0], progress_wrongness, color="red")
# %% | |
# Speed of the bar: derivative of the measured progress w.r.t. the frame index.
progress_velocity = np.gradient(progress[:, 1], progress[:, 0])
plt.xlabel("frame index")
plt.ylabel("Progress Bar Speed")
# Plot against the actual frame indices so the x axis matches its label
# (and the other plots) instead of showing positions 0..N-1.
plt.plot(progress[:, 0], progress_velocity, color="red")
# %% | |
# horizontal stretch
# Output directory for the rendered frames.
temp_directory_path = Path("_temp_imagedir")
if not temp_directory_path.exists():
    temp_directory_path.mkdir()

# Lookup: frame index -> smoothed progress-bar value.
progress_from_index_map = dict(zip(progress[:, 0], progress[:, 1]))
def horizontally_stretch_image(image, progressbar_value, real_progress_value, debug=False):
    """Warp ``image`` so the progress bar's tip lands at the real progress.

    The output column at ``real_progress_value`` (relative, 0..1) samples the
    input column at ``progressbar_value``; the left and right image edges stay
    fixed and everything in between is stretched piecewise-linearly.

    Args:
        image: frame to warp.
        progressbar_value: relative x position of the bar's tip in ``image``.
        real_progress_value: relative x position the tip should be moved to.
        debug: if true, plot the output-x -> source-x mapping. This used to
            happen unconditionally on every frame, which piled thousands of
            lines onto the active matplotlib figure during rendering.

    Returns:
        The warped image as returned by ``ski.transform.warp``.
    """
    if debug:
        def get_source_x(new_x):
            if new_x <= real_progress_value:
                return (new_x / real_progress_value) * progressbar_value
            return (
                (new_x - real_progress_value) / (1 - real_progress_value)
                * (1 - progressbar_value) + progressbar_value
            )

        plt.plot([get_source_x(x) for x in np.linspace(0, 1, 100)])

    image_height, image_width = image.shape[:2]

    # Control points on the top and bottom edge: both corners plus the bar tip.
    # warp() uses the transform as an output -> input mapping, hence "src" is
    # expressed in output coordinates and "dst" in input coordinates.
    src_points = []
    dst_points = []
    for y in (0, image_height - 1):
        src_points += [[0, y], [image_width - 1, y], [real_progress_value * image_width, y]]
        dst_points += [[0, y], [image_width - 1, y], [progressbar_value * image_width, y]]

    transform = ski.transform.PiecewiseAffineTransform()
    # Hand over real arrays; estimate() indexes the coordinates like ndarrays.
    transform.estimate(np.array(src_points, dtype=float), np.array(dst_points, dtype=float))
    return ski.transform.warp(image, transform)
# # test transform
# sample_frame_index = 32650
# sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
# output_image = horizontally_stretch_image(sample_image, progress_from_index_map[sample_frame_index], 0.1)
# plt.imshow(output_image)

# Render every frame of the window with the stretch applied.
num_total_images = len(progress)
image_count = 0
for image_count, (frame_index, image) in enumerate(
    video_wrapper.iter_images(*progress_bar_bounds), start=1
):
    # NOTE: the count is 1-based here, so the last frame ends up slightly above 1.
    real_progress = image_count / (num_total_images - 1)
    progress_value = progress_from_index_map[frame_index]
    transformed_image = horizontally_stretch_image(image, progress_value, real_progress)
    output_file = temp_directory_path / f"{frame_index:06}.jpg"
    plt.imsave(output_file, transformed_image)
    print(f"{image_count}/{num_total_images} ", end="\r")

print(f"finished making {image_count} images")
# %% | |
# sharpness madness filter
# Output directory for the rendered frames.
temp_directory_path = Path("_temp_imagedir")
if not os.path.exists(temp_directory_path):
    os.mkdir(temp_directory_path)

# Lookup: frame index -> smoothed progress-bar value.
progress_from_index_map = {
    index: progress_value
    for index, progress_value in progress
}

# Normalisation constant for sharpen_image_filter, which divides an *absolute*
# difference by it. Use the largest magnitude so that the bar lagging behind
# (negative wrongness) is covered too and the constant can never be negative.
max_wrongness = np.max(np.abs(progress_wrongness))
def sharpen_image_filter(image, progressbar_value, real_progress_value):
    """Sharpen ``image`` in proportion to how wrong the progress bar is.

    The absolute gap between the bar and the real progress is divided by the
    module-level ``max_wrongness`` and scaled by 10 to obtain the
    unsharp-mask amount.
    """
    relative_error = abs(progressbar_value - real_progress_value) / max_wrongness
    sharpen_amount = relative_error * 10.0
    return ski.filters.unsharp_mask(image, radius=30, amount=sharpen_amount)
# # test transform
# sample_frame_index = 32650
# sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
# output_image = sharpen_image_filter(sample_image, progress_from_index_map[sample_frame_index], 0.5)
# plt.imshow(output_image)

# Render every frame of the window with the sharpening applied.
# The former "skip the first 49 frames" shortcut (marked "TODO: Remove") is
# gone; it left those frames out of the exported video.
num_total_images = len(progress)
image_count = 0
for frame_index, image in video_wrapper.iter_images(*progress_bar_bounds):
    image_count += 1
    real_progress = image_count / (num_total_images - 1)
    progress_value = progress_from_index_map[frame_index]
    transformed_image = sharpen_image_filter(image, progress_value, real_progress)
    plt.imsave(temp_directory_path / f"{frame_index:06}.jpg", transformed_image)
    print(f"{image_count}/{num_total_images} ", end="\r")
print(f"finished making {image_count} images")
# %% | |
# constant stretch
from scipy.ndimage import uniform_filter1d

# Output directory for the rendered frames.
temp_directory_path = Path("_temp_imagedir")
if not temp_directory_path.exists():
    temp_directory_path.mkdir()

# Lookup: frame index -> smoothed progress-bar value.
progress_from_index_map = dict(zip(progress[:, 0], progress[:, 1]))

# One reference frame, used only to learn the frame width.
sample_frame_index = 32650
sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
image_width = sample_image.shape[1]
def inverse_frame_map(progress_level, progress_table=None, width=None):
    """Map a progress level to the image column of the best-matching frame.

    Finds the row of the progress table whose progress value is closest to
    ``progress_level`` (first match wins on ties), expresses its position as
    a fraction of the table length and scales that to a column index.

    Args:
        progress_level: progress value to look up.
        progress_table: rows of ``(frame index, progress)``; defaults to the
            module-level ``progress``.
        width: image width in pixels; defaults to the module-level
            ``image_width``.

    Returns:
        A column index in ``0 .. width - 1``.

    Raises:
        ValueError: if the progress table is empty.
    """
    if progress_table is None:
        progress_table = progress
    if width is None:
        width = image_width
    if len(progress_table) == 0:
        raise ValueError("progress table is empty")

    progress_values = np.asarray(progress_table, dtype=float)[:, 1]
    best_row = int(np.argmin(np.abs(progress_values - progress_level)))

    # A single-row table has no extent; treat it as position 0.
    row_count = len(progress_values)
    fraction = best_row / (row_count - 1) if row_count > 1 else 0.0

    # The last row has fraction 1.0, which would yield `width` itself -- one
    # past the final valid column -- so clamp to the last column.
    return min(math.floor(fraction * width), width - 1)
# NOTE: despite their names these two hold the smallest / largest *progress
# value*, not frame indices (names kept for compatibility).
min_frame_index = progress[:, 1].min()
max_frame_index = progress[:, 1].max()

# Min-max normalisation to [0, 1]. Dividing by the value range (not by the
# maximum alone) keeps the result within bounds even when the smoothed minimum
# is not exactly 0, so the column indices below can never run past the image.
normalised_progress = (progress[:, 1] - min_frame_index) / (max_frame_index - min_frame_index)

# For every output column: the source column to copy from. Column x is mapped
# to the frame at the same relative position in time and then to that frame's
# (normalised) progress along the image width.
inverse_progress_map = [
    # inverse_frame_map(x_index / (image_width - 1))
    math.floor(normalised_progress[math.floor(x_index / image_width * len(normalised_progress))] * (image_width - 1))
    for x_index in range(image_width)
]
def constant_stretch_filter(image):
    """Rearrange the columns of ``image`` according to ``inverse_progress_map``.

    Output column ``x`` is a copy of input column ``inverse_progress_map[x]``.
    """
    return np.take(image, inverse_progress_map, axis=1)
# test transform
# output_image = constant_stretch_filter(sample_image)
# plt.imshow(output_image)

# Render every frame of the window with the fixed column remapping.
num_total_images = len(progress)
image_count = 0
for image_count, (frame_index, image) in enumerate(
    video_wrapper.iter_images(*progress_bar_bounds), start=1
):
    transformed_image = constant_stretch_filter(image)
    output_file = temp_directory_path / f"{frame_index:06}.jpg"
    plt.imsave(output_file, transformed_image)
    print(f"{image_count}/{num_total_images} ", end="\r")

print(f"finished making {image_count} images")
# %% | |
# hydraulic press filter
# Output directory for the rendered frames.
temp_directory_path = Path("_temp_imagedir")
if not temp_directory_path.exists():
    temp_directory_path.mkdir()

# Lookup: frame index -> smoothed progress-bar value.
progress_from_index_map = dict(zip(progress[:, 0], progress[:, 1]))
def horizontally_stretch_image(image, progressbar_value, real_progress_value):
    """Uniformly scale ``image`` horizontally by real / displayed progress.

    The left edge stays fixed; the output's right edge samples the input at
    ``(width - 1) * real_progress_value / progressbar_value``.

    NOTE: this redefines the function of the same name from the
    "horizontal stretch" cell.

    Args:
        image: frame to warp.
        progressbar_value: progress shown by the bar (must not be zero).
        real_progress_value: true linear progress.

    Returns:
        The warped image as returned by ``ski.transform.warp``.

    Raises:
        ValueError: if ``progressbar_value`` is zero.
    """
    if progressbar_value == 0:
        # With NumPy scalars the division would silently yield inf/nan and
        # only fail later inside the triangulation.
        raise ValueError("progressbar_value must be non-zero to compute the stretch factor")
    stretch_factor = real_progress_value / progressbar_value

    image_height, image_width = image.shape[:2]
    right_edge = image_width - 1

    # Four corner control points: the left side is pinned, the right side is scaled.
    src_points = []
    dst_points = []
    for y in (0, image_height - 1):
        src_points += [[0, y], [right_edge * stretch_factor, y]]
        dst_points += [[0, y], [right_edge, y]]

    transform = ski.transform.PiecewiseAffineTransform()
    # Hand over real arrays; estimate() indexes the coordinates like ndarrays.
    transform.estimate(np.array(src_points, dtype=float), np.array(dst_points, dtype=float))
    return ski.transform.warp(image, transform)
# # test transform
# sample_frame_index = 32800
# sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
# output_image = horizontally_stretch_image(sample_image, progress_from_index_map[sample_frame_index], 0.1)
# plt.imshow(output_image)

# Render every frame of the window with the uniform horizontal scaling.
num_total_images = len(progress)
image_count = 0
for image_count, (frame_index, image) in enumerate(
    video_wrapper.iter_images(*progress_bar_bounds), start=1
):
    # NOTE: the count is 1-based here, so the last frame ends up slightly above 1.
    real_progress = image_count / (num_total_images - 1)
    progress_value = progress_from_index_map[frame_index]
    transformed_image = horizontally_stretch_image(image, progress_value, real_progress)
    output_file = temp_directory_path / f"{frame_index:06}.jpg"
    plt.imsave(output_file, transformed_image)
    print(f"{image_count}/{num_total_images} ", end="\r")

print(f"finished making {image_count} images")
# %% | |
# photocopier filter
# Output directory for the rendered frames.
temp_directory_path = Path("_temp_imagedir")
if not temp_directory_path.exists():
    temp_directory_path.mkdir()

# Lookup: frame index -> smoothed progress-bar value.
progress_from_index_map = dict(zip(progress[:, 0], progress[:, 1]))

# Persistent state of the "photocopier": an initially black canvas with the
# shape and dtype of a video frame, and the current x position of the scan.
sample_image = video_wrapper.get_image_at(0)
photocopier_image = np.zeros_like(sample_image)
photocopier_x_index = 0

# One pixel column of pure yellow (one RGB triple per image row).
yellow_line = np.tile([255, 255, 0], (photocopier_image.shape[0], 1))
yellow_line_width = 5
def photocopier_filter(image, progressbar_value):
    """Freeze everything left of the progress bar's tip, like a photocopier.

    Columns the scan line passes for the first time are copied from the
    current frame into the persistent ``photocopier_image``; everything left
    of the scan line is then replaced by that frozen copy and a yellow line
    of ``yellow_line_width`` pixels is drawn at the scan position.

    Uses and updates the module-level ``photocopier_x_index`` and
    ``photocopier_image``. ``image`` is modified in place and returned.
    """
    global photocopier_x_index

    canvas_width = photocopier_image.shape[1]
    prev_x = photocopier_x_index
    # Clamp to the image: a progress value >= 1 (e.g. smoothing overshoot)
    # used to push the scan line past the right edge and raise IndexError.
    scan_x = math.floor(progressbar_value * canvas_width) - 4
    photocopier_x_index = min(max(scan_x, 0), canvas_width)

    # Newly scanned columns (an empty slice when the bar did not advance).
    photocopier_image[:, prev_x:photocopier_x_index] = image[:, prev_x:photocopier_x_index]

    # Everything left of the scan line shows the frozen copy.
    image[:, :photocopier_x_index] = photocopier_image[:, :photocopier_x_index]

    # Draw the scan line, cut off at the right image edge.
    line_end = min(photocopier_x_index + yellow_line_width, canvas_width)
    image[:, photocopier_x_index:line_end] = yellow_line[:, np.newaxis, :]
    return image
# # test transform
# sample_frame_index = 32800
# sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
# output_image = horizontally_stretch_image(sample_image, progress_from_index_map[sample_frame_index], 0.1)
# plt.imshow(output_image)

# Render every frame of the window through the photocopier filter.
num_total_images = len(progress)
image_count = 0
for image_count, (frame_index, image) in enumerate(
    video_wrapper.iter_images(*progress_bar_bounds), start=1
):
    progress_value = progress_from_index_map[frame_index]
    transformed_image = photocopier_filter(image, progress_value)
    output_file = temp_directory_path / f"{frame_index:06}.jpg"
    plt.imsave(output_file, transformed_image)
    print(f"{image_count}/{num_total_images} ", end="\r")

print(f"finished making {image_count} images")
# %% | |
# Assemble the rendered frames (sorted by their zero-padded file name) into a
# silent video, then delete the frame directory.
video_output_path = Path("temp_video.mp4")
images = sorted(os.listdir(temp_directory_path))
if not images:
    raise FileNotFoundError(f"No rendered frames found in {temp_directory_path}")

# The first frame determines the output resolution.
frame = cv2.imread(os.path.join(temp_directory_path, images[0]))
height, width, layers = frame.shape

# OpenCV expects a plain string file name here, not a Path.
video = cv2.VideoWriter(str(video_output_path), 0, video_wrapper.fps, (width, height))
try:
    for i, image in enumerate(images):
        video.write(cv2.imread(os.path.join(temp_directory_path, image)))
        print(f"{i}/{len(images) - 1}", end="\r")
finally:
    # Always finalise the file, even if reading or writing a frame fails.
    video.release()
cv2.destroyAllWindows()

shutil.rmtree(temp_directory_path)
print(f"Exported {video_output_path} successfully.")
# %% | |
# Cut the matching audio segment out of the source video with ffmpeg.
audio_output_path = Path("temp_audio.wav")

# Snap both bounds to whole frames so the audio lines up with the video frames.
start_frame_index = video_wrapper.videotime_to_frame(progress_bar_bounds[0])
end_frame_index = video_wrapper.videotime_to_frame(progress_bar_bounds[1])
start_seconds = start_frame_index / video_wrapper.fps
end_seconds = end_frame_index / video_wrapper.fps

ffmpeg_cmd = ["ffmpeg", "-y"]  # -y: overwrite if exists
ffmpeg_cmd += ["-i", str(video_file)]
ffmpeg_cmd += ["-ss", str(start_seconds), "-to", str(end_seconds)]
ffmpeg_cmd += ["-vn"]  # no video
ffmpeg_cmd += ["-acodec", "pcm_s16le"]
ffmpeg_cmd += ["-ar", "44100"]  # sample rate; a different one can be chosen
ffmpeg_cmd += ["-ac", "2"]  # 2-channel stereo
ffmpeg_cmd += [str(audio_output_path)]

subprocess.run(ffmpeg_cmd, check=True)
print(f"audio written to: {audio_output_path.resolve()}")
# %% | |
# Mux the silent video and the extracted audio into the final file.
final_output_path = Path("outputs") / Path("agevideo_tempname.mp4")
# ffmpeg does not create missing directories; make sure the target exists.
final_output_path.parent.mkdir(parents=True, exist_ok=True)
cmd = [
    "ffmpeg",
    "-y",  # overwrite the output file if it already exists
    "-i", str(video_output_path),
    "-i", str(audio_output_path),
    "-c:v", "copy",
    "-c:a", "aac",
    "-map", "0:v:0",
    "-map", "1:a:0",
    "-shortest",
    str(final_output_path),
]
subprocess.run(cmd, check=True)
print(f"Combined video+audio written to: {final_output_path.resolve()}")
# %% | |
# Remove the intermediate audio and video files.
audio_output_path.unlink()
video_output_path.unlink()
print("also deleted temp audio files")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment