Last active
July 21, 2025 15:55
-
-
Save markpbaggett/eb4102592c5bdbe93a54297dc1392882 to your computer and use it in GitHub Desktop.
Download Best Files from an Avalon Work and Prep for Whisper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
from tqdm import tqdm | |
import math | |
import subprocess | |
from subprocess import CalledProcessError | |
from csv import DictWriter | |
class AvalonBase:
    """Shared plumbing for talking to the Avalon JSON endpoints.

    Picks the server base URL and the API key for either the "prod" or the
    "pre" environment and exposes a small authenticated ``get`` helper.

    Attributes:
        key: API key read from the ``AVALON_PROD`` or ``AVALON_PRE``
            environment variable (``None`` when the variable is unset; no
            validation is done here).
        headers: Request headers carrying the key as ``Avalon-Api-Key``.
        base: Scheme and host of the selected Avalon server.
    """

    def __init__(self, prod_or_pre="pre"):
        """Configure the client.

        Args:
            prod_or_pre: ``"prod"`` selects the production server and key;
                any other value selects the "pre" server and key.
        """
        self.key = self.__get_key(prod_or_pre)
        self.headers = {"Avalon-Api-Key": self.key}
        self.base = self.__set_prod_or_pre(prod_or_pre)

    @staticmethod
    def __get_key(prod_or_pre):
        """Return the API key for the requested environment from os.environ."""
        if prod_or_pre == "prod":
            return os.getenv("AVALON_PROD")
        return os.getenv("AVALON_PRE")

    @staticmethod
    def __set_prod_or_pre(environment):
        """Return the base URL for the requested environment."""
        if environment == "prod":
            return "https://avalon.library.tamu.edu"
        return "https://avalon-pre.library.tamu.edu"

    def get(self, url, timeout=30):
        """GET *url* with the API key header and return the decoded JSON body.

        Args:
            url: Absolute URL to request.
            timeout: Value passed to ``requests.get(timeout=...)``, in
                seconds. Without it a stalled server would block forever.

        Returns:
            The parsed JSON payload.

        Raises:
            requests.HTTPError: The server answered with a 4xx/5xx status.
            requests.Timeout: The server did not respond within *timeout*.
            ValueError: The response body is not valid JSON.
        """
        response = requests.get(url, headers=self.headers, timeout=timeout)
        # Fail loudly on error statuses instead of parsing an error page
        # as if it were collection data.
        response.raise_for_status()
        return response.json()
class AvalonCollection(AvalonBase):
    """Read the items of one Avalon collection and export their media.

    Besides fetching collection and item JSON, this class can write a CSV
    summary of the chosen derivative of every file and can extract each
    file's audio track to MP3 with ffmpeg (prepared for Whisper).
    """

    def __init__(self, identifier, prod_or_pre="pre"):
        """Bind the client to one collection.

        Args:
            identifier: Collection id used in the ``/admin/collections/``
                URLs.
            prod_or_pre: Environment selector, forwarded to ``AvalonBase``.
        """
        super().__init__(prod_or_pre)
        self.identifier = identifier

    def get_collection(self):
        """Return the collection's own JSON description."""
        url = f"{self.base}/admin/collections/{self.identifier}.json"
        return self.get(url)

    def get_items(self, verbose=True, page=None, per_page=5):
        """Fetch items of the collection.

        Args:
            verbose: When true return the full JSON mapping; otherwise
                return only a list of its keys.
            page: Page number to request. When falsy, the un-paginated
                ``items.json`` URL is requested and *per_page* is ignored.
            per_page: Page size sent together with *page*.

        Returns:
            A dict of items, or a list of its keys when *verbose* is false.
        """
        url = f"{self.base}/admin/collections/{self.identifier}/items.json"
        if page:
            url = f"{url}?per_page={per_page}&page={page}"
        items = self.get(url)
        return items if verbose else list(items.keys())

    def page_items(self, verbose=True, items_per_page=10):
        """Fetch every item of the collection, one page at a time.

        The page count is derived from ``object_count.total`` in the
        collection JSON (treated as 0 when absent).

        Args:
            verbose: Show a tqdm progress bar while fetching.
            items_per_page: Page size for each request.

        Returns:
            A single dict merging the items of all pages.
        """
        all_items = {}
        number_of_items = self.get_collection().get("object_count", {}).get("total", 0)
        total_pages = math.ceil(number_of_items / items_per_page)
        pages = tqdm(range(1, total_pages + 1), desc="Fetching items", disable=not verbose)
        for page in pages:
            all_items.update(self.get_items(page=page, per_page=items_per_page))
        return all_items

    @staticmethod
    def _pick_derivative(file):
        """Return the first derivative of *file* labelled low or medium.

        Returns ``None`` when no derivative's label contains either word.
        """
        for derivative in file['files']:
            label = derivative['label']
            if 'low' in label or 'medium' in label:
                return derivative
        return None

    def write_csv(self, data):
        """Write one row per file with a usable derivative to ``output.csv``.

        Files without a low/medium derivative are skipped; previously they
        either raised ``NameError`` or silently repeated the previous row.

        Args:
            data: Mapping of items as returned by ``page_items``.
        """
        fieldnames = [
            "Parent work",
            "File id",
            "HLS Path",
            "File duration",
            "Original file",
            "File quality",
        ]
        rows = []
        for value in data.values():
            work_id = value.get('id')
            for file in value['files']:
                derivative = self._pick_derivative(file)
                if derivative is None:
                    continue
                rows.append({
                    "Parent work": work_id,
                    "File id": file['id'],
                    "HLS Path": derivative['hls_url'],
                    "File duration": derivative['duration'],
                    "Original file": derivative['derivativeFile'].split('/')[-1],
                    "File quality": derivative['label'],
                })
        # newline="" is what the csv module expects; the fixed field list
        # lets an empty result still produce a header-only file.
        with open("output.csv", "w", newline="", encoding="utf-8") as my_csv:
            writer = DictWriter(my_csv, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

    def download_best_files(self, output):
        """Extract the audio of every file in the collection to MP3.

        For each file the first low/medium derivative's HLS URL is fed to
        ffmpeg, which drops video, filters the audio and writes
        ``<output>/<work_id>_<file_id>.mp3``. Files whose MP3 already exists
        are skipped, as are files with no low/medium derivative.

        Args:
            output: Directory to write into; created if missing.

        Raises:
            FileNotFoundError: ffmpeg is not installed / not on PATH.
        """
        all_items = self.page_items()
        os.makedirs(output, exist_ok=True)
        for value in all_items.values():
            work_id = value["id"]
            for file in value['files']:
                file_id = file['id']
                derivative = self._pick_derivative(file)
                if derivative is None:
                    # Nothing to hand to ffmpeg; running it with an empty
                    # input path would only fail.
                    print(f"No low or medium derivative for {file_id} from {work_id}")
                    continue
                destination = f"{output}/{work_id}_{file_id}.mp3"
                if os.path.exists(destination):
                    continue
                command = [
                    "ffmpeg",
                    "-i", derivative['hls_url'],
                    "-vn",  # audio only
                    "-af", "highpass=f=100, lowpass=f=8000, afftdn, loudnorm",
                    "-acodec", "libmp3lame",
                    "-q:a", "2",
                    destination,
                ]
                try:
                    subprocess.run(command, check=True)
                except CalledProcessError:
                    # Todo: This needs to be investigated and better handled
                    print(f"Failed to download {file_id} from {work_id}")
                    # Remove any partial output so the exists-check above
                    # does not mistake it for a finished file on a re-run.
                    if os.path.exists(destination):
                        os.remove(destination)
if __name__ == "__main__":
    # Example run: pull the audio for one collection from the default
    # ("pre") environment onto the shared project volume.
    collection = "dr26xx36k"
    destination = f"/Volumes/digital_project_management/avalon_pre_files/{collection}"
    AvalonCollection(collection).download_best_files(destination)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment