Last active
September 13, 2023 22:20
-
-
Save redraw/cc7b1ef71179b855014fb65d15a005bc to your computer and use it in GitHub Desktop.
Download track sections from a Mixcloud user's uploads
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
pip install httpx aiofiles tqdm | |
""" | |
import sys | |
import json | |
import httpx | |
import asyncio | |
import argparse | |
import aiofiles | |
import traceback | |
from tqdm.asyncio import tqdm | |
async def _get_user_id(client, username):
    """Resolve a Mixcloud username to the GraphQL id of that user.

    Args:
        client: Object exposing an awaitable ``post(url, json=...)`` whose
            result offers ``raise_for_status()`` and ``json()``.
        username: Username sent as the ``lookup.username`` query variable.

    Returns:
        The ``data.user.id`` value of the GraphQL response.

    Raises:
        LookupError: If the response contains no user for ``username``
            (unknown user, or a GraphQL error with ``data``/``user`` null).
        Exception: Whatever ``response.raise_for_status()`` raises on an
            HTTP error status.
    """
    payload = {
        "query": "query userQuery(\n $lookup: UserLookup!\n) {\n user: userLookup(lookup: $lookup) {\n id\n username\n isSelect\n shouldShowAds\n ...UserHeadTags_user\n }\n}\n\nfragment UserHeadTags_user on User {\n displayName\n isUploader\n username\n picture {\n urlRoot\n }\n twitterAccount {\n username\n }\n}\n",
        "variables": {
            "lookup": {
                "username": username,
            },
        },
    }
    response = await client.post("https://app.mixcloud.com/graphql", json=payload)
    response.raise_for_status()
    body = response.json()

    # "data" or "data.user" may be null when the lookup fails; surface that as
    # a readable error instead of a TypeError from subscripting None.
    user = (body.get("data") or {}).get("user")
    if not user:
        errors = body.get("errors") or []
        detail = errors[0].get("message") if errors else None
        message = f"Mixcloud user {username!r} not found"
        if detail:
            message = f"{message}: {detail}"
        raise LookupError(message)
    return user["id"]
async def _get_cloudcast_ids(client, user_id, cursor=None, max_results=None):
    """Collect the ids of a user's public uploads by walking the paginated query.

    Pages of 20 uploads are requested with ``orderBy: "LATEST"`` until the
    server reports no further page or ``max_results`` ids have been gathered.

    Args:
        client: Object exposing an awaitable ``post(url, json=...)`` whose
            result offers ``status_code``, ``url``, ``raise_for_status()``
            and ``json()``.
        user_id: GraphQL node id of the user whose uploads are listed.
        cursor: Pagination cursor to start after; ``None`` starts at the
            first page.
        max_results: Upper bound on the number of ids returned; ``None``
            means no limit.

    Returns:
        A list of upload ids, truncated to ``max_results`` when given.

    Raises:
        Exception: Whatever ``response.raise_for_status()`` raises on an
            HTTP error status.
    """
    ids = []
    while True:
        payload = {
            "query": "query UserUploadsPagePaginationQuery(\n $audioTypes: [AudioTypeEnum] = [SHOW]\n $count: Int = 10\n $cursor: String\n $orderBy: CloudcastOrderByEnum = LATEST\n $id: ID!\n) {\n node(id: $id) {\n __typename\n ...UserUploadsPage_user_3E72Mc\n id\n }\n}\n\nfragment UserUploadsPage_user_3E72Mc on User {\n id\n displayName\n username\n isViewer\n uploads(first: $count, isPublic: true, after: $cursor, orderBy: $orderBy, audioTypes: $audioTypes) {\n edges {\n node {\n id\n __typename\n }\n cursor\n }\n pageInfo {\n endCursor\n hasNextPage\n }\n }\n}",
            "variables": {
                "audioTypes": [
                    "SHOW",
                ],
                "count": 20,
                "cursor": cursor,
                "orderBy": "LATEST",
                "id": user_id,
            },
        }
        response = await client.post("https://app.mixcloud.com/graphql", json=payload)
        print(f"[{response.status_code}] {cursor=} {response.url}", file=sys.stderr)
        response.raise_for_status()

        # Decode the body once and reuse it for both the edges and the page info.
        uploads = response.json()["data"]["node"]["uploads"]
        ids.extend(edge["node"]["id"] for edge in uploads["edges"])

        page_info = uploads["pageInfo"]
        cursor = page_info["endCursor"]

        # ">=" stops as soon as the limit is met, avoiding one extra request.
        limit_reached = max_results is not None and len(ids) >= max_results
        # Only an explicit False ends pagination; a missing flag falls back to
        # the cursor check alone.
        last_page = page_info.get("hasNextPage") is False
        if limit_reached or last_page or not cursor:
            break
    return ids[:max_results]
async def _get_cloudcast_sections(client, cloudcast_id):
    """Fetch the track sections of one upload (cloudcast).

    Args:
        client: Object exposing an awaitable ``post(url, json=...)`` whose
            result offers ``raise_for_status()`` and ``json()``.
        cloudcast_id: GraphQL id of the upload to query.

    Returns:
        A list with one dict per ``TrackSection`` of the upload, each holding
        ``url`` (page of the upload), ``name`` (upload title), ``artist`` and
        ``song``. Sections of any other type are skipped.

    Raises:
        RuntimeError: If the response carries no cloudcast; the message is
            the first GraphQL error when one is reported.
        Exception: Whatever ``response.raise_for_status()`` raises on an
            HTTP error status.
    """
    payload = {
        # The owner's username is requested alongside displayName because the
        # upload URL is built from the username.
        "query": "query PlayerControlsQuery(\n $cloudcastId: ID!\n) {\n cloudcast(id: $cloudcastId) {\n owner { username displayName }\n id\n name\n slug\n ...PlayerSliderComponent_cloudcast\n }\n}\n\nfragment PlayerSliderComponent_cloudcast on Cloudcast {\n id\n sections {\n __typename\n ... on TrackSection {\n artistName\n songName\n startSeconds\n }\n ... on ChapterSection {\n chapter\n startSeconds\n }\n }\n}",
        "variables": {
            "cloudcastId": cloudcast_id,
        },
    }
    response = await client.post("https://app.mixcloud.com/graphql", json=payload)
    response.raise_for_status()
    data = response.json()

    # Either "data" or "data.cloudcast" can be null on failure.
    cloudcast = (data.get("data") or {}).get("cloudcast")
    if not cloudcast:
        errors = data.get("errors") or []
        if errors and errors[0].get("message"):
            raise RuntimeError(errors[0]["message"])
        raise RuntimeError(f"no cloudcast returned for id {cloudcast_id!r}")

    # common fields
    owner = cloudcast["owner"]
    # Fall back to the display name only if the username is unavailable.
    owner_slug = owner.get("username") or owner["displayName"]
    url = f"https://www.mixcloud.com/{owner_slug}/{cloudcast['slug']}/"
    name = cloudcast["name"]

    return [
        {
            "url": url,
            "name": name,
            "artist": section["artistName"],
            "song": section["songName"],
        }
        for section in cloudcast.get("sections") or []
        if section["__typename"] == "TrackSection"
    ]
async def query_songs_from_uploads(client, username=None, max_results=None):
    """Yield the track list of each upload of ``username`` as it finishes.

    The user id and the upload ids are resolved first; one section query per
    upload is then awaited through ``asyncio.as_completed`` behind a tqdm
    progress bar, so results arrive in completion order.

    Args:
        client: HTTP client handed to the underlying query helpers.
        username: Username whose uploads are scanned.
        max_results: Passed through as the cap on the number of uploads.

    Yields:
        The value returned by ``_get_cloudcast_sections`` for each upload
        whose query succeeded. A failing upload has its traceback printed to
        stderr and is skipped.
    """
    uploader_id = await _get_user_id(client, username)
    upload_ids = await _get_cloudcast_ids(client, uploader_id, max_results=max_results)

    pending = []
    for upload_id in upload_ids:
        pending.append(_get_cloudcast_sections(client, upload_id))

    progress = tqdm(asyncio.as_completed(pending), total=len(pending), desc="Downloading")
    for finished in progress:
        try:
            yield await finished
        except Exception:
            # Best effort: report the failure and continue with the rest.
            traceback.print_exc(file=sys.stderr)
def cli(username: str, cookie: str, max_results: int):
    """Export the tracks of a user's uploads to ``<username>.jsonl``.

    Builds the request headers (including the supplied ``cookie``), then runs
    an event loop that appends one JSON-encoded line to the output file for
    every result yielded by ``query_songs_from_uploads``.

    Args:
        username: Username to scan; also used as the stem of the output file.
        cookie: Raw value sent as the ``cookie`` request header.
        max_results: Cap on the number of uploads, forwarded unchanged.
    """
    request_headers = {
        "x-mixcloud-client-version": "6fefb6248869200e9a6d8974360c122e0b52fe2c",
        "x-mixcloud-platform": "www",
        "x-requested-with": "XMLHttpRequest",
        "origin": "https://www.mixcloud.com",
        "referer": "https://www.mixcloud.com/",
        "user-agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0",
        "cookie": cookie,
    }
    output_path = f"{username}.jsonl"

    async def export():
        async with httpx.AsyncClient(headers=request_headers, timeout=300) as http:
            # Append mode: earlier exports in the same file are kept.
            async with aiofiles.open(output_path, "a") as out:
                results = query_songs_from_uploads(http, username=username, max_results=max_results)
                async for tracks in results:
                    await out.write(json.dumps(tracks))
                    await out.write("\n")

    asyncio.run(export())
if __name__ == "__main__":
    # Command-line entry point: declare the three options in a table, parse
    # them and hand the values to cli().
    option_specs = (
        (
            ("-u", "--username"),
            {
                "required": True,
                "help": "Username of the user to extract songs from",
            },
        ),
        (
            ("-c", "--cookie"),
            {
                "required": True,
                "help": "Cookie value from browser. This contains CSRF token, and login cookie. Mixcloud requires logged user to return tracks.",
            },
        ),
        (
            ("-m", "--max-results"),
            {
                "type": int,
                "help": "Max uploads to extract songs from. Defaults to all.",
            },
        ),
    )
    arg_parser = argparse.ArgumentParser()
    for flags, settings in option_specs:
        arg_parser.add_argument(*flags, **settings)
    options = arg_parser.parse_args()
    cli(options.username, options.cookie, options.max_results)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment