Skip to content

Instantly share code, notes, and snippets.

@deveworld
Last active May 6, 2024 08:45
Show Gist options
  • Save deveworld/de7fabafefb44e81ec43e1d425b7b285 to your computer and use it in GitHub Desktop.
Save deveworld/de7fabafefb44e81ec43e1d425b7b285 to your computer and use it in GitHub Desktop.
Diff-SVC Audio Data Preprocess Python Script
import os
import glob
import parmap
import multiprocessing
import subprocess as sp
FFMPEG_BIN = "ffmpeg"  # name (or path) of the ffmpeg executable to launch


def encode(input, output_path):
    """Convert one media file to WAV by running ffmpeg.

    The result is written to ``<output_path>/<stem>.wav``, where ``<stem>``
    is the input's base name with only its final extension removed.  A name
    such as ``take.01.mp4`` therefore becomes ``take.01.wav``; cutting at the
    first dot would turn it into ``take.wav`` and let different inputs
    overwrite each other.

    Args:
        input: Path of the source media file.  (The name shadows the builtin
            but is kept so existing callers keep working.)
        output_path: Directory that receives the WAV file.
    """
    stem = os.path.splitext(os.path.basename(input))[0]
    command = [
        FFMPEG_BIN,
        '-i', input,
        '-v', 'error',
        f'{output_path}/{stem}.wav',
    ]
    # The exit status is not inspected here; with '-v error' ffmpeg only
    # prints messages at error level.
    sp.run(command)
if __name__ == '__main__':
    # Encode every *.mp4 in the current directory into the "encoded" folder,
    # with pm_processes set to the machine's CPU count.
    output_path = "encoded"
    os.makedirs(output_path, exist_ok=True)

    sources = glob.glob("*.mp4")
    worker_count = multiprocessing.cpu_count()

    print("encoding...")
    parmap.map(
        encode,
        sources,
        output_path,
        pm_pbar=True,
        pm_processes=worker_count,
    )
import os
import glob
import parmap
import multiprocessing
from pydub import AudioSegment
def detect_leading_silence(sound, silence_threshold=-30.0, chunk_size=10):
    """Return the offset at which the leading silence of *sound* ends.

    The audio is scanned from the start in windows of ``chunk_size``; the
    scan stops at the first window whose ``dBFS`` is not below
    ``silence_threshold``.

    Args:
        sound: Sliceable audio object exposing ``len()`` and a ``dBFS``
            attribute on its slices (a pydub ``AudioSegment`` in this script).
        silence_threshold: Level below which a window counts as silent.
        chunk_size: Step of the scan, in the units used to index ``sound``
            (milliseconds for a pydub ``AudioSegment``).  Must be positive.

    Returns:
        The offset of the first non-silent window, never larger than
        ``len(sound)``.  A completely silent input yields ``len(sound)``.
    """
    assert chunk_size > 0  # a non-positive step would never advance
    total = len(sound)
    trim_ms = 0
    # Check the bound first so an empty slice past the end is never measured.
    while trim_ms < total and sound[trim_ms:trim_ms + chunk_size].dBFS < silence_threshold:
        trim_ms += chunk_size
    # The last step may overshoot when total is not a multiple of chunk_size.
    return min(trim_ms, total)
def filter(file, output_path):
    """Trim silence from both ends of a WAV file and keep it if 8-15 s long.

    The file is loaded with ``AudioSegment.from_wav``; leading silence is
    measured on the audio itself and trailing silence on its reversed copy.
    When the trimmed clip lasts between 8 and 15 seconds (inclusive) it is
    exported as WAV into ``output_path`` under the original base name;
    otherwise nothing is written.

    Args:
        file: Path of the WAV file to examine.
        output_path: Directory that receives the accepted clips.
    """
    audio = AudioSegment.from_wav(file)
    lead = detect_leading_silence(audio)
    tail = detect_leading_silence(audio.reverse())
    clip = audio[lead:len(audio) - tail]

    if not 8 <= clip.duration_seconds <= 15:
        return

    destination = os.path.join(output_path, os.path.basename(file))
    clip.export(destination, format='wav')
if __name__ == '__main__':
    # Run the trim-and-length filter over every WAV in "splitted" and store
    # the accepted clips in "filtered"; pm_processes is the CPU count.
    output_path = "filtered"
    os.makedirs(output_path, exist_ok=True)

    candidates = glob.glob("splitted/*.wav")
    worker_count = multiprocessing.cpu_count()

    print("filtering and saving...")
    parmap.map(
        filter,
        candidates,
        output_path,
        pm_pbar=True,
        pm_processes=worker_count,
    )
import os
import glob
import uuid
import parmap
import shutil
import multiprocessing
from hashlib import md5
def hash(file, output_path, real_file_hash=False):
    """Copy *file* into *output_path* under an MD5 hex-digest file name.

    Args:
        file: Path of the file to copy.
        output_path: Directory that receives the copy.
        real_file_hash: When true, the name is the MD5 of the file's
            contents; otherwise it is the MD5 of a freshly generated
            ``uuid4``, i.e. a random name.

    The copy is always stored as ``<output_path>/<digest>.wav``.
    """
    if real_file_hash:
        digest = md5()
        with open(file, "rb") as stream:
            # Read in 8192-byte pieces so the whole file is never in memory.
            while piece := stream.read(8192):
                digest.update(piece)
    else:
        digest = md5(uuid.uuid4().bytes)

    output = f"{output_path}/{digest.hexdigest()}.wav"
    shutil.copyfile(file, output)
if __name__ == '__main__':
    # Copy every WAV from "filtered" into "hashed", named by content hash
    # (the positional True is real_file_hash); pm_processes is the CPU count.
    output_path = "hashed"
    os.makedirs(output_path, exist_ok=True)

    clips = glob.glob("filtered/*.wav")
    worker_count = multiprocessing.cpu_count()

    print("hashing...")
    parmap.map(
        hash,
        clips,
        output_path,
        True,
        pm_pbar=True,
        pm_processes=worker_count,
    )
import os
import glob
import parmap
import multiprocessing
import subprocess as sp
# https://gist.github.com/vi/2fe3eb63383fcfdad7483ac7c97e9deb
FFMPEG_BIN = "ffmpeg"
SD_PARAMS = "-30dB:d=0.3"
MIN_FRAGMENT_DURATION = "11" # 8, 9, 10, 11


def split(input):
    """Find cut points for one audio file from ffmpeg's silence detection.

    ffmpeg is run with the ``silencedetect`` filter (configured by
    ``SD_PARAMS``) and ``ametadata`` printing the ``lavfi.silence_start``
    key to stdout; the decoded audio itself goes to ``os.devnull``.  A
    reported silence start becomes a cut point when it lies at least
    ``MIN_FRAGMENT_DURATION`` after the previous accepted cut (or after 0.0
    for the first one).

    Args:
        input: Path of the audio file to analyse.

    Returns:
        The accepted cut points as one string, each followed by a comma
        (for example ``"12.5,30.25,"``), or ``""`` when there are none.
    """
    command = [
        FFMPEG_BIN,
        '-v', 'error',
        '-i', input,
        '-af', f'silencedetect={SD_PARAMS},ametadata=mode=print:file=-:key=lavfi.silence_start',
        '-vn',
        '-sn',
        '-f', 's16le',
        '-y', os.devnull
    ]
    process = sp.Popen(command, stdout=sp.PIPE)
    raw_output, _ = process.communicate()

    marker = "lavfi.silence_start="
    min_gap = float(MIN_FRAGMENT_DURATION)
    last_cut = 0.0
    accepted = []
    for row in raw_output.decode().split("\n"):
        if marker not in row:
            continue
        moment = float(row.split("=")[1].rstrip())
        if moment - last_cut >= min_gap:
            # Each entry keeps its trailing comma; the caller strips the
            # final one before handing the list to ffmpeg.
            accepted.append(f"{moment},")
            last_cut = moment
    return "".join(accepted)
def save(input, splits, output):
    """Cut one audio file into segments with ffmpeg's segment muxer.

    Streams are copied without re-encoding (``-c copy -map 0``) and split at
    the times given in ``splits``.

    Args:
        input: Path of the file to cut.
        splits: Comma-separated cut times passed to ``-segment_times``.
        output: Output file name pattern handed to ffmpeg unchanged.

    NOTE(review): an empty ``splits`` string is forwarded as is -- confirm
    how ffmpeg treats an empty ``-segment_times`` value.
    """
    verbosity = ['-v', 'error']
    source = ['-i', input]
    stream_copy = ['-c', 'copy', '-map', '0']
    segmenting = ['-f', 'segment', '-segment_times', splits]
    sp.run([FFMPEG_BIN, *verbosity, *source, *stream_copy, *segmenting, output])
if __name__ == '__main__':
    # Split every WAV in "encoded" at detected silences and write the
    # fragments to "splitted".
    cpus = multiprocessing.cpu_count()
    output_path = "splitted"
    files = glob.glob("encoded/*.wav")
    os.makedirs(output_path, exist_ok=True)

    print("spliting...")
    splits = parmap.map(split, files, pm_pbar=True, pm_processes=cpus)

    print("saving...")
    # Use half the workers for the save pass when the CPU count is even;
    # odd counts are left unchanged, as before.
    if cpus % 2 == 0:
        cpus //= 2

    jobs = [
        (
            file,
            # split() ends every cut time with a comma; drop the last one.
            cut_points[:-1],
            # Remove only the final extension so dotted names such as
            # "talk.part1.wav" keep their full stem instead of collapsing to
            # "talk" and sharing segment names with "talk.part2.wav".
            f"./{output_path}/{os.path.splitext(os.path.basename(file))[0]}.%03d.wav",
        )
        for file, cut_points in zip(files, splits)
    ]
    parmap.starmap(
        save,
        jobs,
        pm_pbar=True,
        pm_processes=cpus
    )
@deveworld
Copy link
Author

deveworld commented May 5, 2024

Make sure you have parmap, pydub and ffmpeg installed before running this Python script.

pip install parmap pydub
sudo apt install ffmpeg

@deveworld
Copy link
Author

Run the scripts in the following order:

  • encode.py
  • split.py
  • filter.py
  • hash.py

@deveworld
Copy link
Author

The results are stored in the encoded, splitted, filtered, and hashed folders.
The output you will eventually use can be found in hashed.

@deveworld
Copy link
Author

If you can, rename all of the mp4 (or wav) files.
On Linux, you can use ls | cat -n | while read n f; do mv "$f" `printf "%04d.mp4" $n`; done
On Windows, you can select all of the files and rename them in one step.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment