Skip to content

Instantly share code, notes, and snippets.

@Joel-hanson
Created July 11, 2021 06:41
Show Gist options
  • Save Joel-hanson/02a3a72f07ad580ea2ed7317d3d8b910 to your computer and use it in GitHub Desktop.
Save Joel-hanson/02a3a72f07ad580ea2ed7317d3d8b910 to your computer and use it in GitHub Desktop.
Video splitting problem
"""
- based no splits 1000 splits
- based on time eg 10s
- based on time interval 2:10 -> 2:50
- 10 people upload 1 gb 2 file daily
- we have design a system to split it into 1000 partitions
Solution:
1. A function to split the file based on the time interval
- even if the split is in numbers or seconds the split would happend based on the time interval
2. Handle 10 people with 2 files uploading the 1gb file
- We would have to use some async method to handle the split.
- DAG kind of approach would be good to it as it will process one after the other and can be async
Architecture:
- Input: The file should be got from each user
- processing: pass those file paths to the main function which does this split.
- output: return the split files as a array or if not in a zip format.
"""
import os
from queue import Empty, Queue
from threading import Lock, Thread

import moviepy.editor
import numpy as np
import video_split_tool # TODO: have to check if this correct package
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
NUMBER_OF_FILES = 2
class Worker(Thread):
    """Thread subclass that remembers which user it is working for.

    Args:
        id: Caller-chosen identifier; stored on the instance as ``user_id``
            so a thread can be traced back to its user.
        target: Callable executed when the thread is started.
        args: Positional arguments passed to ``target``. Defaults to ``()``,
            the same default ``threading.Thread`` uses.
        kwargs: Optional keyword arguments passed to ``target``.
    """

    def __init__(self, id, target, args=(), kwargs=None):
        # ``id`` shadows the builtin, but the parameter name is part of the
        # public signature, so it is kept for callers that pass it by keyword.
        super().__init__(target=target, args=args, kwargs=kwargs)
        self.user_id = id
def convert_to_time_parts(seconds):
    """Break a duration in seconds into hours, minutes and leftover seconds.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        tuple: ``(hours, mins, seconds)`` where ``mins`` and ``seconds`` are
        each below 60 for non-negative input.
    """
    # divmod yields the quotient and the remainder in a single step.
    hours, remainder = divmod(seconds, 3600)
    mins, secs = divmod(remainder, 60)
    return hours, mins, secs
def handle_spliting_of_files(video_file_path, split_duration_time_list):
    """Cut one sub-clip out of a video for every ``(start, end)`` pair.

    Args:
        video_file_path (str): Path of the source video file.
        split_duration_time_list (list[tuple]): ``(start, end)`` pairs, in
            seconds, describing the pieces to extract.

    Returns:
        list[str] | str: The paths of the partition files, one per pair, in
        the order given. If either argument is empty, ``video_file_path`` is
        returned unchanged and nothing is extracted.
    """
    if not (split_duration_time_list and video_file_path):
        return video_file_path

    base_name, extension = os.path.splitext(video_file_path)
    video_partitions = []
    for start_time, end_time in split_duration_time_list:
        # ffmpeg_extract_subclip writes its output to ``targetname`` and does
        # not return the path, so the name is built here and recorded.
        # Millisecond offsets keep the names of neighbouring pieces distinct.
        partition_path = "{}SUB{}_{}{}".format(
            base_name, int(1000 * start_time), int(1000 * end_time), extension
        )
        ffmpeg_extract_subclip(
            video_file_path, start_time, end_time, targetname=partition_path
        )
        video_partitions.append(partition_path)
    # TODO: video partitions can be saved to an S3 bucket or to any other
    # storage provider; keeping them in a buffer or temp folder costs storage.
    return video_partitions
def provide_duration_split_based_on_interval(video_duration, interval_from, interval_to):
    """Build the split list for a single requested interval.

    The interval is clamped to the video: a negative start becomes 0 and an
    end beyond the video becomes ``video_duration``.

    Args:
        video_duration (int): The video duration in seconds.
        interval_from (int): Requested start of the interval in seconds.
        interval_to (int): Requested end of the interval in seconds.

    Returns:
        list: A one-element list holding the ``(start, end)`` tuple.

    Raises:
        ValueError: If the clamped interval is empty or inverted, i.e. its
            start is not before its end.
    """
    start = 0 if interval_from < 0 else interval_from
    end = video_duration if interval_to > video_duration else interval_to
    if start >= end:
        # Nothing can be extracted from an empty or reversed range.
        raise ValueError("Interval start must be before interval end")
    return [(start, end)]
def split_durations(video_duration, number_of_spits):
    """Divide ``[0, video_duration]`` into equal, back-to-back segments.

    Args:
        video_duration (float): Length of the video in seconds.
        number_of_spits (int): How many segments to produce; must be >= 1.

    Returns:
        list[tuple[float, float]]: ``(start, end)`` pairs in ascending order.
        Each segment starts where the previous one ended, the first starts at
        0 and the last ends at ``video_duration``.

    Raises:
        ValueError: If ``number_of_spits`` is smaller than 1.
    """
    segment_count = int(number_of_spits)
    if segment_count < 1:
        raise ValueError("number_of_spits must be at least 1")
    # n segments need n + 1 boundaries. linspace includes both end points, so
    # the final boundary is the video duration itself.
    boundaries = np.linspace(0, video_duration, num=segment_count + 1).tolist()
    # Pair every boundary with its successor so there are no gaps.
    return list(zip(boundaries[:-1], boundaries[1:]))
def provide_duration_split(video_duration, split=1000, split_type="count"):
    """Provide the list of ``(start, end)`` tuples at which to cut a video.

    Args:
        video_duration ([int]): The duration of the video file in seconds.
        split (int): The number of splits (``"count"``) or the length of each
            piece in seconds (``"time_duration"``). Defaults to 1000.
        split_type (str): What type of split to perform. Defaults to "count".
            - count: ``split`` equal pieces, delegated to ``split_durations``
            - time_duration: pieces of ``split`` seconds each; the last piece
              is shorter when the duration is not an exact multiple

    Returns:
        list: ``(start, end)`` tuples in ascending order; an empty list when
        ``split_type`` is not one of the two supported values.

    Raises:
        ValueError: If ``split_type`` is "time_duration" and ``split`` is not
            positive.

    Example:
        ``provide_duration_split(25, 10, "time_duration")`` gives
        ``[(0, 10), (10, 20), (20, 25)]``.
    """
    if split_type == "count":
        return split_durations(video_duration, split)

    if split_type == "time_duration":
        if split <= 0:
            # A non-positive step would never reach the end of the video.
            raise ValueError("split must be a positive number of seconds")
        split_duration_time_list = []
        index = 0
        # Multiply rather than accumulate so float steps do not drift.
        while index * split < video_duration:
            start = index * split
            split_duration_time_list.append(
                (start, min(start + split, video_duration))
            )
            index += 1
        return split_duration_time_list

    return []
def get_video_duration(video_file_path):
    """Return the duration of a video file in seconds.

    Args:
        video_file_path (str): Location of the video file.

    Returns:
        float: The clip's duration in seconds, as reported by moviepy.
    """
    # Create the clip object by passing the location as a string.
    video = moviepy.editor.VideoFileClip(video_file_path)
    try:
        return float(video.duration)
    finally:
        # Close the clip so the resources it opened for reading are released
        # even when reading the duration fails.
        video.close()
def main(user_id):
    """Prompt for one video and a split strategy, then split it on a worker.

    The prompts are read from standard input. The split itself runs on a
    ``Worker`` thread; this function waits for that thread to finish before
    returning.

    Args:
        user_id: Identifier of the uploading user; printed by the worker and
            stored on the ``Worker`` instance.

    Returns:
        Whatever ``handle_spliting_of_files`` returns for the collected input.

    Raises:
        ValueError: If the split type is not "interval", "count" or
            "time_duration", or an interval bound is not a number.
        RuntimeError: If the worker finished without delivering a result.
    """
    # PHASE 1 - collect the input and work out where to cut
    video_file_path = input("Enter the video file")
    split_type = input("Enter the video split type")
    video_duration = get_video_duration(video_file_path)
    if split_type == "interval":
        # float() raises ValueError when the text is not a number.
        start_time = float(input("Enter the video start interval"))
        end_time = float(input("Enter the video end interval"))
        split_duration_time_list = provide_duration_split_based_on_interval(
            video_duration, start_time, end_time
        )
    elif split_type in ("count", "time_duration"):
        # Forward the chosen type; the default split size is used.
        split_duration_time_list = provide_duration_split(
            video_duration, split_type=split_type
        )
    else:
        raise ValueError("Not an expected split type")

    # PHASE 2 - do the split once, on a worker thread
    events = Queue()

    def custom_worker_function(user_id, video_file_path, split_duration_time_list):
        print(user_id)
        # Queue is thread-safe on its own, so no extra lock is needed here.
        events.put(handle_spliting_of_files(video_file_path, split_duration_time_list))

    worker = Worker(
        user_id,
        custom_worker_function,
        args=(user_id, video_file_path, split_duration_time_list),
    )
    worker.start()
    worker.join()
    try:
        return events.get_nowait()
    except Empty:
        # The worker ended without putting anything on the queue, which only
        # happens when the split raised inside the thread.
        raise RuntimeError("Splitting failed for user {}".format(user_id)) from None
def multiple_user():
    """Run the interactive split flow for several users.

    Asks how many users there are, then calls ``main`` ``NUMBER_OF_FILES``
    times for each of them, passing the 1-based user number as the user id.
    """
    # PHASE 2 - Handle multiple users and multiple files
    users = int(input("No. of users?"))
    for user in range(users):
        user_id = user + 1
        print("user {}:".format(user_id))
        # NUMBER_OF_FILES is a count, so iterate over a range of it.
        for _ in range(NUMBER_OF_FILES):
            main(user_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment