Last active
December 28, 2024 10:44
-
-
Save un1tz3r0/cc46f21eb62fac1a39c14fcd42c5f29f to your computer and use it in GitHub Desktop.
This script uses ytmusicapi and pytube together to download your playlists, history or 'liked' songs as high-quality audio-only streams from Youtube Music.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' This script uses ytmusicapi and pytube together to download your playlists, history or 'liked' songs as
high-quality audio-only streams from Youtube Music, which are protected by a "signatureCipher" obfuscation scheme.
To use it, first install [ytmusicapi] and [pytube] using pip, then follow the ytmusicapi instructions for creating
the auth file from the headers of an authenticated watch-page request, as found in your browser's dev-tools.
The downloaded files are placed in ~/Music, named with the artist and track metadata. Songs that were already
downloaded are recognised by the videoId embedded in the file name and are skipped the next time the script is run.
Merry Xmas - V.
'''
import ytmusicapi | |
import pytube | |
import re, os, sys | |
import requests | |
from urllib.parse import parse_qs, urljoin | |
import json | |
try: | |
import blessings | |
clear_eol = blessings.Terminal().clear_eol | |
except ImportError as err: | |
clear_eol = "\x1b[K" | |
class DownloaderMixin:
    ''' Mixin for the ytmusicapi.YTMusic class that uses parts of pytube to add high-quality
    streaming and batch track / playlist downloading to it. The host class is expected to provide
    the attributes `language`, `headers` and `proxies` and the methods `get_song()` and
    `get_playlist()`. Some examples of usage are given at the end of the module after the rest
    of the class definition. '''

    # Matches every character that is NOT allowed in a generated file name.
    _UNSAFE_FILENAME_CHARS = re.compile("[^a-zA-Z 0-9_\\(\\)\\[\\]\\:\\'\\\"\\@\\!\\#\\$\\%\\&\\=\\+\\,\\.\\<\\>\\;\\|\\{\\}-]")

    @staticmethod
    def _parse_duration(s) -> int:
        ''' Convert a "SS", "MM:SS" or "HH:MM:SS" string to a number of seconds. When more than
        three fields are present only the last three are used. '''
        seconds = 0
        for field in str(s).split(":")[-3:]:
            seconds = seconds * 60 + int(field)
        return seconds

    @staticmethod
    def _song_filename(song: dict, video_id: str) -> str:
        ''' Build the (unsanitized, extension-less) file name "<artists> - <title> [<videoId>]",
        leaving out whichever of the artists / title parts are not available. '''
        names = []
        for artist in song.get('artists') or []:
            # NOTE(review): entries are presumably plain strings; dicts with a 'name' key are
            # tolerated as well in case the library returns that shape - confirm against ytmusicapi.
            name = artist.get('name') if isinstance(artist, dict) else artist
            if name and name not in names:  # drop empties and duplicates but keep the original order
                names.append(name)
        title = song.get('title')
        tag = "[" + video_id + "]"
        if title is not None and names:
            return ", ".join(names) + " - " + title + " " + tag
        if title is not None:
            return title + " " + tag
        return tag

    @classmethod
    def _sanitize_filename(cls, s: str) -> str:
        ''' Remove every character outside the allowed set (which excludes path separators)
        and strip surrounding whitespace. '''
        return cls._UNSAFE_FILENAME_CHARS.sub("", s).strip()

    @staticmethod
    def _pick_best_format(fmts: list) -> dict:
        ''' Return the audio-only format with the highest average bitrate, or, when there is no
        audio-only format, the format with the highest average bitrate overall.
        Raises ValueError if `fmts` is empty. '''
        if not fmts:
            raise ValueError("no downloadable formats available")
        audio_only = [fmt for fmt in fmts if fmt.get('mimeType', '').startswith('audio')]
        candidates = audio_only or list(fmts)
        # max() with a key never compares the dicts themselves, so equal bitrates are harmless
        # (the first of the tied formats wins).
        return max(candidates, key=lambda fmt: fmt.get('averageBitrate', fmt.get('bitrate', 0)))

    def get_streaming_data_decrypted(self, videoId: str) -> tuple:
        ''' This is based on the YTMusic.get_streaming_data() method but it makes use of pytube to
        decode the signatureCipher obfuscation that "protects" the higher quality adaptiveFormat
        stream URLs from being enjoyed by "bots".

        Returns a tuple `(streamingData, formats)` where `streamingData` is the raw dict from the
        player response and `formats` is a list of all entries of its 'formats' and
        'adaptiveFormats' lists that have a usable 'url' (deciphered in place when necessary).

        Raises Exception if the response has no 'player_response' or no 'streamingData', and
        KeyError if the canonical watch URL cannot be found in the player response. '''
        # fetch /get_video_info? which should have a watch URL in there somewhere...
        endpoint = "https://www.youtube.com/get_video_info"
        params = {"video_id": videoId, "hl": self.language, "el": "detailpage",
                  "c": "WEB_REMIX", "cver": "0.1"}
        response = requests.get(endpoint, params, headers=self.headers, proxies=self.proxies)
        text = parse_qs(response.text)
        if 'player_response' not in text:
            raise Exception('This video is not playable (no player_response key in /get_video_info? response)')
        player_response = json.loads(text['player_response'][0])
        if 'streamingData' not in player_response:
            raise Exception('This video is not playable (no streamingData key in player_response key of /get_video_info? response)')
        # this seems like it will probably break easily... maybe fall back to a recursive search
        # for a watch url anywhere in the JSON? or something?
        watch_url = player_response['microformat']['microformatDataRenderer']['urlCanonical']

        # get the watch page's HTML, which we need to get the base.js URL that determines how
        # pytube unscrambles the signatureCipher
        watch_response = requests.get(watch_url, headers=self.headers, proxies=self.proxies)
        watch_html = watch_response.text

        # this is where pytube comes in... given the watch page HTML, it extracts for us the URL of
        # the base.js for the video player, which is where the signatureCipher is descrambled by a
        # variable algorithm coded in minified, obfuscated javascript. thankfully, the task of
        # extracting from the javascript the steps needed to properly unscramble the signatureCipher
        # is also handled by pytube.
        player_js_url = pytube.extract.get_ytplayer_js(watch_html)
        # the video-info query parameters do not belong on the base.js request, so none are sent
        player_js_response = requests.get(urljoin(watch_url, player_js_url), headers=self.headers, proxies=self.proxies)
        player_js = player_js_response.text
        cipher = pytube.cipher.Cipher(js=player_js)

        # okay, now we collect all the streams available and apply the cipher to any that have signed
        # URLs. this is where we would also handle DASH manifests... i think? TODO, fo' sho'.
        allformats = []
        sdata = player_response['streamingData']
        for formatsKey in ('formats', 'adaptiveFormats'):
            for fmt in sdata.get(formatsKey, []):
                if 'signatureCipher' in fmt:
                    fmtsigcipherq = parse_qs(fmt['signatureCipher'])
                    sig = cipher.get_signature(fmtsigcipherq['s'][0])
                    # 'sp' names the query parameter that carries the signature.
                    # NOTE(review): falling back to 'signature' when it is absent is an assumption - confirm.
                    sigparam = fmtsigcipherq.get('sp', ['signature'])[0]
                    fmt['url'] = fmtsigcipherq['url'][0] + '&' + sigparam + '=' + sig
                if 'url' not in fmt:
                    print(f"[warn] streamingData contains format with itag {fmt.get('itag')} without a url key in get_streaming_data_decrypted({repr(videoId)}):\n\n{repr(fmt)}\n")
                    continue
                allformats.append(fmt)
        return (sdata, allformats)

    def download_song(self, video_id: str, dest_dir: str, chunk_size: int = 1024*1024, overwrite: bool = False, keep_incomplete: bool = False):
        ''' Download the best available audio stream of one song into `dest_dir`.

        The file is named "<artists> - <title> [<videoId>].<ext>" (sanitized), where <ext> is the
        sub-type of the chosen stream's mime type. `dest_dir` is user-expanded and created if needed.

        video_id:        id of the song to download
        dest_dir:        directory the file is written to
        chunk_size:      number of bytes requested per chunk while streaming the download
        overwrite:       download again even if the target file already exists
        keep_incomplete: keep a partially written file when the download fails

        Returns True when the song was downloaded and False when it was skipped because the target
        file already exists. Raises RuntimeError when no suitable stream can be selected; an HTTP
        error status on the stream request is raised by requests. '''
        song = self.get_song(video_id)
        song_id = song.get('videoId') or video_id
        filename = self._song_filename(song, song_id)
        # maybe also download the thumbnail and possibly even set some tags (once we are finished downloading i guess...)

        # pick from available streams one that is audio-only with the highest average bitrate, hence highest objective quality
        try:
            _sdata, fmts = self.get_streaming_data_decrypted(song_id)
            bestfmt = self._pick_best_format(fmts)
            fileext = bestfmt['mimeType'].split("/")[1].split(";")[0]  # use sub-type from mimetype as file extension
        except Exception as err:
            raise RuntimeError(f"Error selecting suitable streaming format: {err}") from err

        dest_dir = os.path.expanduser(dest_dir)
        os.makedirs(dest_dir, exist_ok=True)
        fullfilename = os.path.join(dest_dir, self._sanitize_filename(filename) + "." + fileext)
        if os.path.exists(fullfilename) and not overwrite:
            print(f"Not downloading videoId {repr(song_id)}, would overwrite file {repr(fullfilename)}...")
            return False

        print(f"Downloading videoId {repr(song_id)} to file {repr(fullfilename)}...")
        started = False
        wrotebytes = 0
        complete = False
        with requests.get(bestfmt['url'], stream=True, headers=self.headers, proxies=self.proxies) as response:
            # fail before touching the disk instead of saving an HTTP error page as the song
            response.raise_for_status()
            try:
                with open(fullfilename, "wb") as fout:
                    started = True
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        fout.write(chunk)
                        wrotebytes = wrotebytes + len(chunk)
                        # end="\r" returns to the start of the line so the progress is rewritten in place
                        print(f"Downloaded {wrotebytes//1024} kbytes...{clear_eol}", end="\r", flush=True)
                complete = True
                print(flush=True)  # move past the progress line, keeping the final byte count visible
            finally:
                if started and not complete and not keep_incomplete:
                    print(f"Cleaning up partially downloaded file {repr(fullfilename)}...")
                    os.remove(fullfilename)
        return True

    def download_playlist(self, playlist, dest_dir = "~/Music", limit_duration = 25*60, no_uploaded = True):
        ''' Download every song of a playlist into `dest_dir` (user-expanded, created if needed).

        playlist may be specified in a few ways:
          1. playlist id (a string, which is resolved with get_playlist())
          2. return value of get_playlist() etc. (dict containing 'tracks' key with a list of dicts with 'videoId' keys)
          3. list of dicts with videoId's
          4. list of videoId strings

        Songs are skipped when a file whose name contains "[<videoId>]" already exists in
        `dest_dir`, or when the item has a 'duration' that is not shorter than `limit_duration`
        seconds (pass None to disable the limit). Errors while downloading a single song are
        printed and do not stop the batch. `no_uploaded` is accepted for backward compatibility
        but is currently not acted upon.

        Raises KeyError if an item has no 'videoId' key. '''
        dest_dir = os.path.expanduser(dest_dir)
        os.makedirs(dest_dir, exist_ok=True)

        playlist_items = playlist
        if isinstance(playlist_items, (str, bytes)):
            # if playlist is a string, assume it is a playlist id and download the playlist
            playlist_items = self.get_playlist(playlist_items)
        if hasattr(playlist_items, 'keys') and 'tracks' in playlist_items.keys():
            # dict-like with a 'tracks' key (also what get_playlist() just returned for an id):
            # the songs we want are the list stored under that key
            playlist_items = playlist_items['tracks']

        # list the destination once; names of files written during this run are tracked below
        existing = os.listdir(dest_dir)

        for listitem in list(playlist_items):
            if isinstance(listitem, str):
                listitem = {'videoId': listitem}  # bare videoId string
            if 'videoId' not in listitem:
                raise KeyError("item in playlist_items does not have a videoId!")
            video_id = listitem['videoId']
            title = listitem.get('title')
            if not video_id:
                print(f"Skipping {repr(title)} because it has no videoId.")
                continue
            tag = '[' + video_id + ']'
            if any(tag in name for name in existing):
                print(f"Skipping videoId {video_id} - {repr(title)} because a file ending with the same id exists.")
                continue
            duration = listitem.get('duration')
            if limit_duration is not None and duration and self._parse_duration(duration) >= limit_duration:
                print(f"Skipping videoId {video_id} - {repr(title)} because its duration {duration} is not shorter than {limit_duration} seconds.")
                continue
            try:
                if self.download_song(video_id, dest_dir) is not False:
                    existing.append(tag)  # a repeated entry later in the list is then skipped
            except Exception as err:
                print(f"Exception caught while trying to download videoId {video_id} - {repr(title)}: {err}")
# Combine the stock ytmusicapi client with the downloader mixin, creating our very own frankentype
class YTMusic(ytmusicapi.YTMusic, DownloaderMixin):
    ''' ytmusicapi.YTMusic extended with the methods of DownloaderMixin. '''
# A simple example you can run from the cli:
if __name__ == "__main__":
    usage = '''Missing file "headers_auth.json"... see ytmusicapi.readthedocs.org for explanation of how to use an
authenticated watch page request in a signed-in browser and the browser devtools to set up headers_auth.json for
ytmusicapi '''
    if not os.path.exists("headers_auth.json"):
        print(usage)
        # a bare `exit` is only a name lookup and does not stop the script; actually
        # terminate here, with a non-zero status, instead of failing later without the auth file
        sys.exit(1)

    ytm = YTMusic("headers_auth.json")

    # EXAMPLE - download the songs in your playback history
    ytm.download_playlist(ytm.get_history())

    # EXAMPLE - download the most recent 1000 songs you liked
    ytm.download_playlist(ytm.get_liked_songs(limit=1000))
''' If you were a Google Play Music fan like I was, then I feel your pain.
As though finding out that Play Music was being shut down weren't bad enough,
we were given the choice between a Takeout archive containing only uploaded tracks,
not any purchased content (even though it was downloadable through Play Music),
and transferring our music library over to Youtube Music (which admittedly does
bear a passing resemblance, at least cosmetically, to Play Music now) where you
can still stream it all you want... I wanted those purchased songs that I could
buy and then download to make a mixtape in <your chosen DAW> or just listen to
now that I am too broke to be able to afford an internet connection. '''
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment