Last active
May 6, 2024 08:45
-
-
Save deveworld/de7fabafefb44e81ec43e1d425b7b285 to your computer and use it in GitHub Desktop.
Diff-SVC Audio Data Preprocess Python Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import glob | |
import parmap | |
import multiprocessing | |
import subprocess as sp | |
FFMPEG_BIN = "ffmpeg" | |
def encode(input, output_path):
    """Convert one media file to WAV by invoking ffmpeg.

    The result is written into ``output_path`` under the input's base name
    with its final extension replaced by ``.wav``.

    Args:
        input: Path of the source media file, passed to ffmpeg via ``-i``.
            (The name shadows the builtin but is kept so existing callers
            keep working.)
        output_path: Directory that receives the ``.wav`` file. It is not
            created here.

    Returns:
        None. ffmpeg's exit status is not inspected, so a failed conversion
        does not raise.
    """
    # Strip only the last extension. Cutting at the first dot would map both
    # "a.b.mp4" and "a.c.mp4" to "a.wav" and make the outputs collide.
    stem = os.path.splitext(os.path.basename(input))[0]
    command = [
        FFMPEG_BIN,
        '-i', input,
        '-v', 'error',
        os.path.join(output_path, f'{stem}.wav'),
    ]
    sp.run(command)
if __name__ == '__main__':
    # Convert every .mp4 in the current directory, one worker per CPU.
    target_dir = "encoded"
    worker_count = multiprocessing.cpu_count()
    sources = glob.glob("*.mp4")
    os.makedirs(target_dir, exist_ok=True)

    print("encoding...")
    parmap.map(
        encode,
        sources,
        target_dir,
        pm_pbar=True,
        pm_processes=worker_count,
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import glob | |
import parmap | |
import multiprocessing | |
from pydub import AudioSegment | |
def detect_leading_silence(sound, silence_threshold=-30.0, chunk_size=10):
    """Measure how much silence ``sound`` starts with.

    The clip is scanned from the start in windows of ``chunk_size``; scanning
    stops at the first window whose ``dBFS`` is not below
    ``silence_threshold``.

    Args:
        sound: Object supporting ``len()``, slicing, and a ``dBFS`` attribute
            on slices (a pydub ``AudioSegment`` in this script, where
            positions are presumably milliseconds).
        silence_threshold: Level below which a window counts as silent.
        chunk_size: Window size / step; must be positive.

    Returns:
        The offset of the first non-silent window, never larger than
        ``len(sound)``.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    # An explicit check instead of ``assert``: asserts vanish under ``-O``,
    # and a zero step would make the loop below spin forever.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    total = len(sound)
    trim_ms = 0
    # Test the bound first so no empty slice past the end is ever evaluated.
    while trim_ms < total and sound[trim_ms:trim_ms + chunk_size].dBFS < silence_threshold:
        trim_ms += chunk_size
    # The last step can overshoot when the whole clip is silent; clamp it.
    return min(trim_ms, total)
def filter(file, output_path):
    """Trim silence from both ends of a WAV file and keep mid-length clips.

    The file is loaded with pydub, leading silence is measured on the clip
    and on its reversed copy (which gives the trailing silence), and the
    trimmed clip is exported to ``output_path`` under the same base name,
    but only when its duration is between 8 and 15 seconds inclusive.

    Args:
        file: Path of the WAV file to examine.
        output_path: Directory that receives accepted clips.

    Returns:
        None. Clips outside the 8-15 second range are silently skipped.
    """
    clip = AudioSegment.from_wav(file)
    lead = detect_leading_silence(clip)
    tail = detect_leading_silence(clip.reverse())
    body = clip[lead:len(clip) - tail]

    if not 8 <= body.duration_seconds <= 15:
        return

    destination = os.path.join(output_path, os.path.basename(file))
    body.export(destination, format='wav')
if __name__ == '__main__':
    # Filter every clip produced by the split step, one worker per CPU.
    target_dir = "filtered"
    worker_count = multiprocessing.cpu_count()
    candidates = glob.glob("splitted/*.wav")
    os.makedirs(target_dir, exist_ok=True)

    print("filtering and saving...")
    parmap.map(
        filter,
        candidates,
        target_dir,
        pm_pbar=True,
        pm_processes=worker_count,
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import glob | |
import uuid | |
import parmap | |
import shutil | |
import multiprocessing | |
from hashlib import md5 | |
def hash(file, output_path, real_file_hash=False):
    """Copy ``file`` into ``output_path`` under an MD5 hex-digest file name.

    Args:
        file: Path of the file to copy.
        output_path: Destination directory (must already exist).
        real_file_hash: When true, the name is the MD5 of the file's
            contents, read in 8192-byte pieces. Otherwise the name is the
            MD5 of a freshly generated random UUID.

    Returns:
        None. The copy is written to ``{output_path}/{digest}.wav``.
    """
    if real_file_hash:
        digest = md5()
        with open(file, "rb") as stream:
            while True:
                piece = stream.read(8192)
                if not piece:
                    break
                digest.update(piece)
    else:
        digest = md5(uuid.uuid4().bytes)

    shutil.copyfile(file, f"{output_path}/{digest.hexdigest()}.wav")
if __name__ == '__main__':
    # Copy every filtered clip under a content-hash name, one worker per CPU.
    target_dir = "hashed"
    worker_count = multiprocessing.cpu_count()
    clips = glob.glob("filtered/*.wav")
    os.makedirs(target_dir, exist_ok=True)

    print("hashing...")
    # The positional True is forwarded to hash() as real_file_hash.
    parmap.map(
        hash,
        clips,
        target_dir,
        True,
        pm_pbar=True,
        pm_processes=worker_count,
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import glob | |
import parmap | |
import multiprocessing | |
import subprocess as sp | |
# https://gist.github.com/vi/2fe3eb63383fcfdad7483ac7c97e9deb | |
FFMPEG_BIN = "ffmpeg" | |
SD_PARAMS = "-30dB:d=0.3" | |
MIN_FRAGMENT_DURATION = "11" # 8, 9, 10, 11 | |
def split(input):
    """Pick split points for one audio file from ffmpeg's silence detection.

    ffmpeg is run with the ``silencedetect`` filter (configured by
    ``SD_PARAMS``) followed by ``ametadata`` printing the
    ``lavfi.silence_start`` key to stdout; the decoded audio itself is sent
    to ``os.devnull``. A reported silence start becomes a split point only
    when it lies at least ``MIN_FRAGMENT_DURATION`` after the previously
    accepted point (the first comparison is against 0.0).

    Args:
        input: Path of the audio file to analyse.

    Returns:
        The accepted split points as a string in which every value is
        followed by a comma (so a non-empty result ends with ``","``), or an
        empty string when no point qualifies.
    """
    command = [
        FFMPEG_BIN,
        '-v', 'error',
        '-i', input,
        '-af', f'silencedetect={SD_PARAMS},ametadata=mode=print:file=-:key=lavfi.silence_start',
        '-vn',
        '-sn',
        '-f', 's16le',
        '-y', os.devnull
    ]
    completed = sp.run(command, stdout=sp.PIPE)

    min_gap = float(MIN_FRAGMENT_DURATION)
    last_cut = 0.0
    accepted = []
    for text_line in completed.stdout.decode().split("\n"):
        if "lavfi.silence_start=" not in text_line:
            continue
        moment = float(text_line.split("=")[1].rstrip())
        if moment - last_cut >= min_gap:
            accepted.append(f"{moment},")
            last_cut = moment
    return "".join(accepted)
def save(input, splits, output):
    """Cut ``input`` into pieces with ffmpeg's ``segment`` output format.

    Streams are copied (``-c copy``, ``-map 0``) rather than re-encoded.

    Args:
        input: Path of the file to cut.
        splits: Value passed verbatim to ``-segment_times``.
            NOTE(review): an empty string is passed through unchanged;
            ffmpeg presumably rejects it - confirm how files without split
            points should be handled.
        output: Output file pattern handed to ffmpeg as the last argument.

    Returns:
        None. ffmpeg's exit status is not inspected.
    """
    segment_options = ['-f', 'segment', '-segment_times', splits]
    sp.run([
        FFMPEG_BIN,
        '-v', 'error',
        '-i', input,
        '-c', 'copy',
        '-map', '0',
        *segment_options,
        output,
    ])
if __name__ == '__main__':
    # Two passes over encoded/*.wav: first find split points for every file,
    # then cut each file at its own points into splitted/.
    cpus = multiprocessing.cpu_count()
    output_path = "splitted"
    files = glob.glob("encoded/*.wav")
    os.makedirs(output_path, exist_ok=True)

    print("spliting...")
    splits = parmap.map(split, files, pm_pbar=True, pm_processes=cpus)

    print("saving...")
    # The save pass uses half the workers when the CPU count is even; an odd
    # count is left unchanged (same rule as before, in integer arithmetic).
    if cpus % 2 == 0:
        cpus //= 2

    # Pair each file with its own split string. split() ends every value with
    # a comma, so the final character is dropped before handing it to save().
    # Only the last extension is stripped from the name: cutting at the first
    # dot would give "a.b.wav" and "a.c.wav" the same output pattern.
    jobs = [
        (
            path,
            cut_points[:-1],
            f"./{output_path}/{os.path.splitext(os.path.basename(path))[0]}.%03d.wav",
        )
        for path, cut_points in zip(files, splits)
    ]
    parmap.starmap(
        save,
        jobs,
        pm_pbar=True,
        pm_processes=cpus
    )
Run the scripts in the following order:
- encode.py
- split.py
- filter.py
- hash.py
The results are stored in the `encoded`, `splitted`, `filtered`, and `hashed` folders.
The output you will eventually use can be found in `hashed`.
If you can, rename all of the mp4 (or wav) files before you start.
On Linux, you can use: ls | cat -n | while read n f; do mv "$f" `printf "%04d.mp4" $n`; done
On Windows, you can select all of the files and rename them in one step.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Make sure you have `parmap`, `pydub`, and `ffmpeg` installed before running these Python scripts:
pip install parmap pydub
sudo apt install ffmpeg