@itsnebulalol
Last active September 6, 2024 02:05
Download every video from a Patreon profile

Patreon Video Downloader

An experiment in downloading every video from a Patreon profile. (Windows only)

Disclaimer

By using this script, you are responsible for checking the creator's copyright terms. Assume that you are not allowed to redistribute downloaded files unless explicitly permitted.

This script requires you to be subscribed to the user you want to download videos from.

How to use

Prepare script

  1. Download the requirements.txt and main.py
  2. Create a venv with python -m venv .venv
  3. Activate the venv with .\.venv\Scripts\activate
  4. Install requirements with pip install -r requirements.txt

Get Patreon information

  1. Make sure you have Google Chrome (not Chromium!!) installed
  2. Use it to log into Patreon
  3. Open Chrome DevTools (CTRL+SHIFT+I) and go to the Network tab
  4. Go to or refresh the creator's page
  5. Find a request like the one in the gist's screenshot and copy the number in it (this is the campaign id; you will need it later). A scripted alternative is sketched after this list.
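If you would rather script this step, the following is a rough sketch, not part of main.py. It assumes the creator page's embedded JSON still exposes a "campaign_id" field or an /api/campaigns/<id> URL; that markup can change, and some pages may block plain requests, so the DevTools method above remains the reliable path.

# Hypothetical helper (not part of main.py): try to scrape the campaign id
# straight from a creator page. Falls back to None if nothing matches.
import re
import requests

def find_campaign_id(creator_url: str):
    html = requests.get(creator_url).text
    match = re.search(r'"campaign_id"\s*:\s*"?(\d+)', html) or re.search(
        r"patreon\.com/api/campaigns/(\d+)", html
    )
    return match.group(1) if match else None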

Run the script

  1. Edit any title filters in main.py (the if statement marked "add any filters here", near the end of the script)
  2. Run python main.py (you can also pass the campaign id as the first argument; see the tip after this list)
  3. You will be asked to choose a Chrome profile; choose the one you used to log into Patreon
  4. Press y to close your Chrome instance (please save any work before doing this)
  5. Paste in your campaign id from earlier
  6. yt-dlp will start to download every video available (this will likely peg your CPU to 100%)
  7. Grab a coffee and wait; the downloaded files will end up in the downloads folder next to wherever you ran the script
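Tip: the script also accepts the campaign id as its first command-line argument, which skips the prompt in step 5. The id below is a placeholder:

python main.py 1234567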

Credits

requirements.txt

inquirer==3.3.0
pycryptodomex==3.20.0
requests==2.32.3
yt-dlp==2024.7.16
main.py

# Written by Dominic Frye (Nebula)
import os
import inquirer
import ctypes
import ctypes.wintypes
from base64 import standard_b64decode
from subprocess import Popen, call
from sqlite3 import connect, OperationalError
from shutil import copyfile
from tempfile import NamedTemporaryFile
from requests import Session
from http.cookiejar import CookieJar, Cookie
from json import load
from sys import argv, exit
from pathlib import Path
from Cryptodome.Cipher import AES
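
# Opens Chrome's Cookies SQLite database. A read-only URI connection is tried
# first; if that fails (for example, because Chrome still has the file locked),
# the database is copied to a temporary file and the copy is opened instead.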
class _DatabaseConnection:
    def __init__(self, database_file: os.PathLike, try_legacy_first: bool = False):
        self.__database_file = database_file
        self.__temp_cookie_file = None
        self.__connection = None
        self.__methods = [
            self.__sqlite3_connect_readonly,
            self.__get_connection_legacy,
        ]
        if try_legacy_first:
            self.__methods.reverse()

    def __enter__(self):
        return self.get_connection()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __check_connection_ok(self, connection):
        try:
            connection.cursor().execute("select 1 from sqlite_master")
            return True
        except OperationalError:
            return False

    def __sqlite3_connect_readonly(self):
        uri = Path(self.__database_file).absolute().as_uri()
        for options in ("?mode=ro", "?mode=ro&nolock=1", "?mode=ro&immutable=1"):
            try:
                con = connect(uri + options, uri=True)
            except OperationalError:
                continue
            if self.__check_connection_ok(con):
                return con

    def __get_connection_legacy(self):
        self.__temp_cookie_file = NamedTemporaryFile(suffix=".sqlite").name
        copyfile(self.__database_file, self.__temp_cookie_file)
        con = connect(self.__temp_cookie_file)
        if self.__check_connection_ok(con):
            return con

    def get_connection(self):
        if self.__connection:
            return self.__connection
        for method in self.__methods:
            con = method()
            if con is not None:
                self.__connection = con
                return con
        raise RuntimeError("Unable to read database file")

    def cursor(self):
        return self.get_connection().cursor()

    def close(self):
        if self.__connection:
            self.__connection.close()
        if self.__temp_cookie_file:
            try:
                os.remove(self.__temp_cookie_file)
            except Exception:
                pass
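
# Decrypts a DPAPI-protected blob with the Windows CryptUnprotectData API.
# With is_key=True the raw decrypted bytes are returned (used for Chrome's AES
# key); otherwise the value is returned up to the first NUL byte.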
def unprotect_data(encrypted_value, is_key=False):
    class DataBlob(ctypes.Structure):
        _fields_ = [
            ("cbData", ctypes.wintypes.DWORD),
            ("pbData", ctypes.POINTER(ctypes.c_char)),
        ]

    blob_in, blob_entropy, blob_out = map(
        lambda x: DataBlob(len(x), ctypes.create_string_buffer(x)),
        [encrypted_value, b"", b""],
    )
    desc = ctypes.c_wchar_p()
    CRYPTPROTECT_UI_FORBIDDEN = 0x01
    if not ctypes.windll.crypt32.CryptUnprotectData(
        ctypes.byref(blob_in),
        ctypes.byref(desc),
        ctypes.byref(blob_entropy),
        None,
        None,
        CRYPTPROTECT_UI_FORBIDDEN,
        ctypes.byref(blob_out),
    ):
        raise RuntimeError("Failed to decrypt the cipher text with DPAPI")
    buffer_out = ctypes.create_string_buffer(int(blob_out.cbData))
    ctypes.memmove(buffer_out, blob_out.pbData, blob_out.cbData)
    # free the buffers allocated by CryptUnprotectData
    for handle in (desc, blob_out.pbData):
        ctypes.windll.kernel32.LocalFree(handle)
    if is_key:
        return buffer_out.raw
    else:
        return buffer_out.value
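
# Decrypts a single Chrome cookie: plaintext values pass through unchanged,
# older DPAPI-encrypted values go through unprotect_data, and "v10" values are
# decrypted with AES-GCM using the key taken from Chrome's Local State file.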
def decrypt(value, encrypted_value, key):
    try:
        if len(value) != 0:
            return value
        if encrypted_value == b"":
            return ""
        data = unprotect_data(encrypted_value)
        assert isinstance(data, bytes)
        return data.decode()
    except RuntimeError:
        if not key:
            raise RuntimeError(
                "Failed to decrypt the cipher text with DPAPI and no AES key."
            )
        encrypted_value = encrypted_value[3:]
        nonce, tag = encrypted_value[:12], encrypted_value[-16:]
        aes = AES.new(key, AES.MODE_GCM, nonce=nonce)
        try:
            data = aes.decrypt_and_verify(encrypted_value[12:-16], tag)
        except ValueError:
            raise RuntimeError("Unable to get key for cookie decryption")
        return data.decode()
def _text_factory(data):
    try:
        return data.decode("utf-8")
    except UnicodeDecodeError:
        return data
## Part 1
## Dump cookies from Chrome
v10_key = ""
profiles_list = []
with open(
    f"{os.getenv('LOCALAPPDATA')}\\Google\\Chrome\\User Data\\Local State", "r"
) as f:
    j = load(f)
    profiles = j["profile"]["info_cache"]
    for k, v in profiles.items():
        profiles_list.append(f"{k} ({v['name']})")
    # extract decryption key
    key64 = j["os_crypt"]["encrypted_key"].encode("utf-8")
    keydpapi = standard_b64decode(key64)[5:]
    v10_key = unprotect_data(keydpapi, is_key=True)
questions = [
    inquirer.List(
        "profile",
        message="Choose a Chrome profile",
        choices=profiles_list,
    ),
]
answers = inquirer.prompt(questions)
chosen_profile = answers["profile"].split(" (")[0]
canikill = input("Can I kill the Chrome process (y/n)? ").lower()
if "y" in canikill:
call("taskkill /f /im chrome.exe")
else:
exit(1)
cj = CookieJar()
with _DatabaseConnection(
    f"{os.getenv('LOCALAPPDATA')}\\Google\\Chrome\\User Data\\{chosen_profile}\\Network\\Cookies"
) as con:
    con.text_factory = _text_factory
    cur = con.cursor()
    cur.execute(
        "SELECT host_key, path, is_secure, expires_utc, name, value, encrypted_value, is_httponly "
        "FROM cookies WHERE host_key like ?;",
        ("%{}%".format(".patreon.com"),),
    )
    for item in cur.fetchall():
        host, path, secure, expires_nt_time_epoch, name, value, enc_value, http_only = (
            item
        )
        if expires_nt_time_epoch == 0:
            expires = None
        else:
            expires = (expires_nt_time_epoch / 1000000) - 11644473600
        value = decrypt(value, enc_value, v10_key)
        c = Cookie(
            0,
            name,
            value,
            None,
            False,
            host,
            host.startswith("."),
            host.startswith("."),
            path,
            True,
            secure,
            expires,
            False,
            None,
            None,
            {"HTTPOnly": ""} if http_only else {},
        )
        # c = create_cookie(host, path, secure, expires, name, value, http_only)
        cj.set_cookie(c)
## Part 2
## Fetch data from API
print(cj)
session = Session()
session.cookies = cj
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0",
    "Content-Type": "application/vnd.api+json",
    "Connection": "keep-alive",
    "Priority": "u=4",
    "TE": "trailers",
}
if len(argv) > 1:
    campaign_id = argv[1]
else:
    campaign_id = input("Enter campaign id: ")
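
# Posts listing endpoint for this campaign; the long include/fields query string
# asks the API for the media and post fields (post_file, download_url, etc.)
# needed to locate each video.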
url = f"https://www.patreon.com/api/posts?filter[campaign_id]={campaign_id}&filter[include_drafts]=false&include=post%2Cpost.campaign%2Cpost.access_rules%2Cpost.access_rules.tier.null%2Cpost.attachments%2Cpost.audio%2Cpost.audio_preview.null%2Cpost.drop%2Cpost.images%2Cpost.media%2Cpost.native_video_insights%2Cpost.poll.choices%2Cpost.poll.current_user_responses.user%2Cpost.poll.current_user_responses.choice%2Cpost.poll.current_user_responses.poll%2Cpost.user%2Cpost.user_defined_tags%2Cpost.ti_checks%2Cpost.content_unlock_options.product_variant.null&fields[drop]=cover_image%2Ccreated_at%2Cexpires_at%2Cid%2Cscheduled_for%2Cis_droppable%2Ccomments_cid%2Cpresence_cid%2Cpresence_count&fields[campaign]=currency%2Cshow_audio_post_download_links%2Cavatar_photo_url%2Cavatar_photo_image_urls%2Cearnings_visibility%2Cis_nsfw%2Cis_monthly%2Cname%2Curl&fields[post]=change_visibility_at%2Ccomment_count%2Ccommenter_count%2Ccontent%2Ccurrent_user_can_comment%2Ccurrent_user_can_delete%2Ccurrent_user_can_report%2Ccurrent_user_can_view%2Ccurrent_user_comment_disallowed_reason%2Ccurrent_user_has_liked%2Cembed%2Cimage%2Cinsights_last_updated_at%2Cis_paid%2Clike_count%2Cmeta_image_url%2Cmin_cents_pledged_to_view%2Cpost_file%2Cpost_metadata%2Cpublished_at%2Cpatreon_url%2Cpost_type%2Cpledge_url%2Cpreview_asset_type%2Cthumbnail%2Cthumbnail_url%2Cteaser_text%2Ctitle%2Cupgrade_url%2Curl%2Cwas_posted_by_campaign_owner%2Chas_ti_violation%2Cmoderation_status%2Cpost_level_suspension_removal_date%2Cpls_one_liners_by_category%2Cvideo_preview%2Cview_count%2Ccontent_unlock_options%2Cis_new_to_current_user&fields[post_tag]=tag_type%2Cvalue&fields[user]=image_url%2Cfull_name%2Curl&fields[access_rule]=access_rule_type%2Camount_cents&fields[media]=id%2Cimage_urls%2Cdisplay%2Cdownload_url%2Cmetadata%2Cfile_name&fields[native_video_insights]=average_view_duration%2Caverage_view_pct%2Chas_preview%2Cid%2Clast_updated_at%2Cnum_views%2Cpreview_views%2Cvideo_duration&fields[content-unlock-option]=content_unlock_type&fields[product-variant]=price_cents%2Ccurrency_code%2Ccheckout_url%2Cis_hidden%2Cpublished_at_datetime%2Ccontent_type%2Corders_count%2Caccess_metadata&json-api-version=1.0&json-api-use-default-includes=false"
dl = Path("./downloads")
dl.mkdir(exist_ok=True)
downloads = [
    os.path.splitext(f)[0]
    for f in os.listdir(dl)
    if os.path.isfile(os.path.join(dl, f))
]
print(downloads)
res = session.get(url, headers=headers)
j = res.json()
posts = j["data"]
while True:
    try:
        nextlink = j["links"]["next"]
        print(nextlink.split("%5B")[-1])
        res = session.get(nextlink, headers=headers)
        j = res.json()
        posts.extend(j["data"])
    except KeyError:
        break
## Part 3
## Download
processes = []
for p in posts:
    attrib = p["attributes"]
    if (p["type"] == "post") and (attrib["post_type"] == "video_external_file"):
        title = attrib["title"]
        if title.rstrip() not in downloads:
            # add any filters here, remove this if statement if you want everything
            if ("filter 1" in title.lower()) or ("filter 2" in title.lower()):
                print(title)
                try:
                    # video_preview gives a 30 second preview (available to non-subscribed users too)
                    url = attrib["post_file"]["url"]
                    process = Popen(
                        f'yt-dlp "{url}" -o "downloads/{title.rstrip()}.%(ext)s"',
                        shell=True,
                    )
                    processes.append(process)
                except Exception:
                    pass
output = [p.wait() for p in processes]