Skip to content

Instantly share code, notes, and snippets.

@noel-friedrich
Created June 5, 2025 16:01
Show Gist options
  • Save noel-friedrich/b78631292a769dd9a7418fc55653c636 to your computer and use it in GitHub Desktop.
Save noel-friedrich/b78631292a769dd9a7418fc55653c636 to your computer and use it in GitHub Desktop.
An export of the ipython notebook used to create more MrBeast progress bar warpings! See https://youtu.be/QuR_3Mar-nI
# %%
import cv2, math, os
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal
import shutil, subprocess
from scipy.io import wavfile
import skimage as ski
# %%
class VideoWrapper:
def __init__(self, video_file_path: Path, src_color_mode=cv2.COLOR_BGR2RGB):
self.video_capture = cv2.VideoCapture(video_file_path)
self.fps = self.video_capture.get(cv2.CAP_PROP_FPS)
self.src_color_mode = src_color_mode
def reset(self):
self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
def videotime_to_frame(self, video_time):
if isinstance(video_time, str):
if video_time.count(":") == 1:
minutes, seconds = [int(s) for s in video_time.split(":")]
return self.seconds_to_frame(minutes * 60 + seconds)
elif video_time.count(":") == 2:
minutes, seconds, milliseconds = [int(s) for s in video_time.split(":")]
return self.seconds_to_frame(minutes * 60 + seconds + milliseconds / 1000)
else:
raise ValueError("Invalid videotime format!")
return self.seconds_to_frame(video_time)
def seconds_to_frame(self, seconds: float):
return math.floor(seconds * self.fps)
def iter_images(self, start_seconds=None, end_seconds=None):
frame_index = 0
start_index = 0
end_index = math.inf
if start_seconds is not None:
start_index = self.videotime_to_frame(start_seconds)
if end_seconds is not None:
end_index = self.videotime_to_frame(end_seconds)
self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, start_index - 1)
success, image = self.video_capture.read()
frame_index = start_index
while success and frame_index <= end_index:
if frame_index >= start_index and frame_index <= end_index:
yield frame_index, cv2.cvtColor(image, self.src_color_mode)
success, image = self.video_capture.read()
frame_index += 1
def get_image_at(self, seconds, frame_mode=False):
frame_index = self.videotime_to_frame(seconds) if not frame_mode else seconds
self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, frame_index - 1)
raw_image = self.video_capture.read()[1]
return cv2.cvtColor(raw_image, self.src_color_mode)
class ProgressbarFinder:
def __init__(self, y=0.98, thresh=20):
self.progressbar_y = y
self.thresh = thresh
def measure(self, image, debug=False):
# assume the progress bar has constant color
y_index = math.floor(image.shape[0] * self.progressbar_y)
pixel_row = image[y_index].astype(np.float32)
first_color = pixel_row[0]
if debug:
distances = [np.linalg.norm(first_color - c) for c in pixel_row]
plt.scatter(np.arange(len(distances)), distances, c=pixel_row / 255)
for i, color in enumerate(pixel_row):
distance = np.linalg.norm(color - first_color)
if distance > self.thresh:
return i / (image.shape[1] - 1)
videos_directory = Path("videos")
video_file = videos_directory / "ages1-to-100.fullhd.mp4"
progress_bar_bounds = ("18:05:500", "18:48")
video_wrapper = VideoWrapper(video_file)
sample_image = video_wrapper.get_image_at(progress_bar_bounds[0])
progressbar_finder = ProgressbarFinder(y=0.97, thresh=50)
progressbar_finder.measure(sample_image, debug=True)
# %%
progressbar_finder = ProgressbarFinder(y=0.99, thresh=30)
progress = np.array([[frame_index, progressbar_finder.measure(img)] for frame_index, img in video_wrapper.iter_images(*progress_bar_bounds)])
plt.xlabel("Frame Index")
plt.ylabel("Progress Bar Progress")
plt.plot(progress[:, 0], progress[:, 1])
progress[:, 1] = scipy.signal.savgol_filter(progress[:, 1], 30, 3)
plt.bar(progress[:, 0], progress[:, 1], color="yellow")
# %%
sample_frame = 33000
plt.imsave("temp.png", video_wrapper.get_image_at(sample_frame, frame_mode=True))
min_frame = video_wrapper.videotime_to_frame(progress_bar_bounds[0])
max_frame = video_wrapper.videotime_to_frame(progress_bar_bounds[1])
(sample_frame - min_frame) / (max_frame - min_frame)
# %%
real_progress = np.array([[progress[i, 0], i / (len(progress) - 1)] for i, _ in enumerate(progress[:, 0])])
progress_wrongness = progress[:, 1] - real_progress[:, 1]
plt.xlabel("frame index")
plt.ylabel("progressbar-wrongness")
plt.bar(progress[:, 0], progress_wrongness, color="tomato")
plt.plot(progress[:, 0], progress_wrongness, color="red")
# %%
progress_velocity = np.gradient(progress[:, 1], progress[:, 0])
plt.xlabel("frame index")
plt.ylabel("Progress Bar Speed")
plt.plot(progress_velocity, color="red")
# %%
# horizontal stretch
temp_directory_path = Path("_temp_imagedir")
if not os.path.exists(temp_directory_path):
os.mkdir(temp_directory_path)
progress_from_index_map = {
index: progress_value
for index, progress_value in progress
}
def horizontally_stretch_image(image, progressbar_value, real_progress_value):
def get_source_x(new_x):
if new_x <= real_progress_value:
return (new_x / real_progress_value) * progressbar_value
else:
return (new_x - real_progress_value) / (1 - real_progress_value) * (1 - progressbar_value) + progressbar_value
plt.plot([get_source_x(x) for x in np.linspace(0, 1, 100)])
image_width = image.shape[1]
image_height = image.shape[0]
src_points = []
dst_points = []
for y in [0, image_height - 1]:
src_points.append([0, y])
dst_points.append([0, y])
src_points.append([image_width - 1, y])
dst_points.append([image_width - 1, y])
dst_points.append([progressbar_value * image_width, y])
src_points.append([real_progress_value * image_width, y])
transform = ski.transform.PiecewiseAffineTransform()
transform.estimate(src_points, dst_points)
output_image = ski.transform.warp(image, transform)
return output_image
# # test transform
# sample_frame_index = 32650
# sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
# output_image = horizontally_stretch_image(sample_image, progress_from_index_map[sample_frame_index], 0.1)
# plt.imshow(output_image)
num_total_images = len(progress)
image_count = 0
for frame_index, image in video_wrapper.iter_images(*progress_bar_bounds):
image_count += 1
real_progress = image_count / (num_total_images - 1)
progress_value = progress_from_index_map[frame_index]
transformed_image = horizontally_stretch_image(image, progress_value, real_progress)
plt.imsave(temp_directory_path / f"{frame_index:06}.jpg", transformed_image)
print(f"{image_count}/{num_total_images} ", end="\r")
print(f"finished making {image_count} images")
# %%
# sharpness madness filter
temp_directory_path = Path("_temp_imagedir")
if not os.path.exists(temp_directory_path):
os.mkdir(temp_directory_path)
progress_from_index_map = {
index: progress_value
for index, progress_value in progress
}
max_wrongness = max(progress_wrongness)
def sharpen_image_filter(image, progressbar_value, real_progress_value):
diff = abs(progressbar_value - real_progress_value) / max_wrongness
return ski.filters.unsharp_mask(image, radius=30, amount=diff * 10.0)
# # test transform
# sample_frame_index = 32650
# sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
# output_image = sharpen_image_filter(sample_image, progress_from_index_map[sample_frame_index], 0.5)
# plt.imshow(output_image)
num_total_images = len(progress)
image_count = 0
for frame_index, image in video_wrapper.iter_images(*progress_bar_bounds):
# oopsie. TODO: Remove
image_count += 1
if image_count < 50:
continue
real_progress = image_count / (num_total_images - 1)
progress_value = progress_from_index_map[frame_index]
transformed_image = sharpen_image_filter(image, progress_value, real_progress)
plt.imsave(temp_directory_path / f"{frame_index:06}.jpg", transformed_image)
print(f"{image_count}/{num_total_images} ", end="\r")
print(f"finished making {image_count} images")
# %%
# constant stretch
from scipy.ndimage import uniform_filter1d
temp_directory_path = Path("_temp_imagedir")
if not os.path.exists(temp_directory_path):
os.mkdir(temp_directory_path)
progress_from_index_map = {
index: progress_value
for index, progress_value in progress
}
sample_frame_index = 32650
sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
image_width = sample_image.shape[1]
def inverse_frame_map(progress_level):
best_frame_index = None
best_distance = math.inf
for i, (_, curr_progress) in enumerate(progress):
frame_index = i / (len(progress) - 1)
distance = abs(curr_progress - progress_level)
if distance < best_distance:
best_frame_index = frame_index
best_distance = distance
return math.floor(best_frame_index * image_width)
min_frame_index = min(progress[:, 1])
max_frame_index = max(progress[:, 1])
normalised_progress = (progress[:, 1] - min_frame_index) / max_frame_index
inverse_progress_map = [
# inverse_frame_map(x_index / (image_width - 1))
math.floor(normalised_progress[math.floor(x_index / image_width * len(normalised_progress))] * (image_width - 1))
for x_index in range(image_width)
]
def constant_stretch_filter(image):
return image[:, inverse_progress_map]
# test transform
# output_image = constant_stretch_filter(sample_image)
# plt.imshow(output_image)
num_total_images = len(progress)
image_count = 0
for frame_index, image in video_wrapper.iter_images(*progress_bar_bounds):
image_count += 1
transformed_image = constant_stretch_filter(image)
plt.imsave(temp_directory_path / f"{frame_index:06}.jpg", transformed_image)
print(f"{image_count}/{num_total_images} ", end="\r")
print(f"finished making {image_count} images")
# %%
# hydraulic press filter
temp_directory_path = Path("_temp_imagedir")
if not os.path.exists(temp_directory_path):
os.mkdir(temp_directory_path)
progress_from_index_map = {
index: progress_value
for index, progress_value in progress
}
def horizontally_stretch_image(image, progressbar_value, real_progress_value):
stretch_factor = real_progress_value / progressbar_value
image_width = image.shape[1]
image_height = image.shape[0]
src_points = []
dst_points = []
for y in [0, image_height - 1]:
src_points.append([0, y])
dst_points.append([0, y])
src_points.append([(image_width - 1) * stretch_factor, y])
dst_points.append([image_width - 1, y])
transform = ski.transform.PiecewiseAffineTransform()
transform.estimate(src_points, dst_points)
output_image = ski.transform.warp(image, transform)
return output_image
# # test transform
# sample_frame_index = 32800
# sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
# output_image = horizontally_stretch_image(sample_image, progress_from_index_map[sample_frame_index], 0.1)
# plt.imshow(output_image)
num_total_images = len(progress)
image_count = 0
for frame_index, image in video_wrapper.iter_images(*progress_bar_bounds):
image_count += 1
real_progress = image_count / (num_total_images - 1)
progress_value = progress_from_index_map[frame_index]
transformed_image = horizontally_stretch_image(image, progress_value, real_progress)
plt.imsave(temp_directory_path / f"{frame_index:06}.jpg", transformed_image)
print(f"{image_count}/{num_total_images} ", end="\r")
print(f"finished making {image_count} images")
# %%
# photocopier filter
temp_directory_path = Path("_temp_imagedir")
if not os.path.exists(temp_directory_path):
os.mkdir(temp_directory_path)
progress_from_index_map = {
index: progress_value
for index, progress_value in progress
}
sample_image = video_wrapper.get_image_at(0)
photocopier_image = np.zeros(sample_image.shape, dtype=sample_image.dtype)
photocopier_x_index = 0
yellow_line = np.array([[255, 255, 0] for i in range(photocopier_image.shape[0])])
yellow_line_width = 5
def photocopier_filter(image, progressbar_value):
global photocopier_x_index
prev_x = photocopier_x_index
photocopier_x_index = max(math.floor(progressbar_value * photocopier_image.shape[1]) - 4, 0)
for x_index in range(prev_x, photocopier_x_index):
photocopier_image[:, x_index] = image[:, x_index]
for x_index in range(photocopier_x_index):
image[:, x_index] = photocopier_image[:, x_index]
for offset in range(yellow_line_width):
image[:, photocopier_x_index + offset] = yellow_line
return image
# # test transform
# sample_frame_index = 32800
# sample_image = video_wrapper.get_image_at(sample_frame_index, frame_mode=True)
# output_image = horizontally_stretch_image(sample_image, progress_from_index_map[sample_frame_index], 0.1)
# plt.imshow(output_image)
num_total_images = len(progress)
image_count = 0
for frame_index, image in video_wrapper.iter_images(*progress_bar_bounds):
image_count += 1
progress_value = progress_from_index_map[frame_index]
transformed_image = photocopier_filter(image, progress_value)
plt.imsave(temp_directory_path / f"{frame_index:06}.jpg", transformed_image)
print(f"{image_count}/{num_total_images} ", end="\r")
print(f"finished making {image_count} images")
# %%
video_output_path = Path("temp_video.mp4")
images = [img for img in os.listdir(temp_directory_path)]
images.sort()
frame = cv2.imread(os.path.join(temp_directory_path, images[0]))
height, width, layers = frame.shape
video = cv2.VideoWriter(video_output_path, 0, video_wrapper.fps, (width, height))
for i, image in enumerate(images):
video.write(cv2.imread(os.path.join(temp_directory_path, image)))
print(f"{i}/{len(images) - 1}", end="\r")
cv2.destroyAllWindows()
video.release()
shutil.rmtree(temp_directory_path)
print(f"Exported {video_output_path} successfully.")
# %%
audio_output_path = Path("temp_audio.wav")
start_frame_index = video_wrapper.videotime_to_frame(progress_bar_bounds[0])
end_frame_index = video_wrapper.videotime_to_frame(progress_bar_bounds[1])
start_seconds = start_frame_index / video_wrapper.fps
end_seconds = end_frame_index / video_wrapper.fps
ffmpeg_cmd = [
"ffmpeg",
"-y", # overwrite if exists
"-i", str(video_file),
"-ss", str(start_seconds),
"-to", str(end_seconds),
"-vn", # no video
"-acodec", "pcm_s16le",
"-ar", "44100", # You can choose a different sample rate if desired
"-ac", "2", # 2-channel stereo
str(audio_output_path)
]
subprocess.run(ffmpeg_cmd, check=True)
print(f"audio written to: {audio_output_path.resolve()}")
# %%
final_output_path = Path("outputs") / Path("agevideo_tempname.mp4")
cmd = [
"ffmpeg",
"-y", # overwrite if final_output.mp4 already exists
"-i", str(video_output_path),
"-i", str(audio_output_path),
"-c:v", "copy",
"-c:a", "aac",
"-map", "0:v:0",
"-map", "1:a:0",
"-shortest",
str(final_output_path)
]
subprocess.run(cmd, check=True)
print(f"Combined video+audio written to: {final_output_path.resolve()}")
# %%
os.remove(audio_output_path)
os.remove(video_output_path)
print("also deleted temp audio files")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment