Skip to content

Instantly share code, notes, and snippets.

@aculich
Forked from danaspiegel/zdl.py
Created March 29, 2021 17:48
Show Gist options
  • Save aculich/1b42a6d4f820a164c7c8c6a4f6342348 to your computer and use it in GitHub Desktop.
Save aculich/1b42a6d4f820a164c7c8c6a4f6342348 to your computer and use it in GitHub Desktop.
Zoom Recording Downloader
#!/usr/bin/env python3
import os
import requests
import itertools
from dateutil.parser import parse
API_KEY = ''
API_SECRET = ''
ACCESS_TOKEN = ''
API_ENDPOINT_USER_LIST = 'https://api.zoom.us/v1/user/list'
API_ENDPOINT_RECORDING_LIST = 'https://api.zoom.us/v1/recording/list'
DOWNLOAD_DIRECTORY = 'downloads'
COMPLETED_MEETING_IDS_LOG = 'completed_downloads.txt'
COMPLETED_MEETING_IDS = set()
def get_credentials(host_id, page_number):
return {
'api_key': API_KEY,
'api_secret': API_SECRET,
'host_id': host_id,
'page_size': 300,
'page_number': page_number,
}
def get_user_ids():
response = requests.post(API_ENDPOINT_USER_LIST, data=get_credentials(1, 1))
user_data = response.json()
user_ids = [(user['id'], user['email'], ) for user in user_data['users']]
return user_ids
def format_filename(recording, file_type):
uuid = recording['uuid']
topic = recording['topic'].replace('/', '&')
meeting_time = parse(recording['start_time'])
return '{} - {} UTC - {}.{}'.format(
meeting_time.strftime('%Y.%m.%d'), meeting_time.strftime('%I.%M %p'), topic, file_type.lower())
def get_downloads(recording):
downloads = []
for download in recording['recording_files']:
file_type = download['file_type']
download_url = download['download_url']
downloads.append((file_type, download_url, ))
return downloads
def list_recordings(user_id):
post_data = get_credentials(user_id, 1)
response = requests.post(API_ENDPOINT_RECORDING_LIST, data=post_data)
recordings_data = response.json()
total_records = recordings_data['total_records']
page_count = recordings_data['page_count']
if total_records == 0:
return []
if page_count <= 1:
return recordings_data['meetings']
recordings = recordings_data['meetings']
for i in range(1, page_count): # start at page index 1 since we already have the first page
current_page = i + 1
print('Getting page {} of {}'.format(current_page, page_count))
post_data = get_credentials(user_id, current_page)
response = requests.post(API_ENDPOINT_RECORDING_LIST, data=post_data)
recordings_data = response.json()
if recordings_data:
recordings.extend(recordings_data['meetings'])
return recordings
def download_recording(download_url, email, filename):
dl_dir = os.sep.join([DOWNLOAD_DIRECTORY, email])
full_filename = os.sep.join([dl_dir, filename])
os.makedirs(dl_dir, exist_ok=True)
response = requests.get(download_url, stream=True)
try:
with open(full_filename, 'wb') as fd:
for chunk in response.iter_content(chunk_size=128):
fd.write(chunk)
return True
except Exception as e:
# if there was some exception, print the error and return False
print(e)
return False
def load_completed_meeting_ids():
with open(COMPLETED_MEETING_IDS_LOG, 'r') as fd:
for line in fd:
COMPLETED_MEETING_IDS.add(line.strip())
def main():
load_completed_meeting_ids()
users = get_user_ids()
for user_id, email in users:
print('Getting recording list for {}'.format(email))
recordings = list_recordings(user_id)
total_count = len(recordings)
print('Found {} recordings'.format(total_count))
for index, recording in enumerate(recordings):
success = False
meeting_id = recording['uuid']
# import ipdb;ipdb.set_trace()
if meeting_id in COMPLETED_MEETING_IDS:
print('Skipping already downloaded meeting: {}'.format(meeting_id))
continue
downloads = get_downloads(recording)
for file_type, download_url in downloads:
filename = format_filename(recording, file_type)
print('Downloading ({} of {}): {}: {}'.format(index+1, total_count, meeting_id, download_url))
success |= download_recording(download_url, email, filename)
# success = True
if success:
# if successful, write the ID of this recording to the completed file
with open(COMPLETED_MEETING_IDS_LOG, 'a') as log:
COMPLETED_MEETING_IDS.add(meeting_id)
log.write(meeting_id)
log.write('\n')
log.flush()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment