|
# Written by Dominic Frye (Nebula) |
|
|
|
import os |
|
import inquirer |
|
import ctypes |
|
import ctypes.wintypes |
|
|
|
from base64 import standard_b64decode |
|
from subprocess import Popen, call |
|
from sqlite3 import connect, OperationalError |
|
from shutil import copyfile |
|
from tempfile import NamedTemporaryFile |
|
from requests import Session |
|
from http.cookiejar import CookieJar, Cookie |
|
from json import load |
|
from sys import argv, exit |
|
from pathlib import Path |
|
from Cryptodome.Cipher import AES |
|
|
|
|
|
class _DatabaseConnetion:
    """Open a SQLite database file that another process may be holding open.

    Two strategies are tried in order until one yields a usable connection:
    a read-only URI connection to the file itself, and a "legacy" fallback
    that copies the file to a temporary location and opens the copy.

    Usable as a context manager: ``__enter__`` returns the ``sqlite3``
    connection and ``__exit__`` closes it and removes any temporary copy.
    """

    def __init__(self, database_file: os.PathLike, try_legacy_first: bool = False):
        """Remember the database path and the order of connection strategies.

        Args:
            database_file: Path of the SQLite database to open.
            try_legacy_first: When true, try the copy-to-temp strategy before
                the read-only URI strategy.
        """
        self.__database_file = database_file
        self.__temp_cookie_file = None
        self.__connection = None
        self.__methods = [
            self.__sqlite3_connect_readonly,
            self.__get_connection_legacy,
        ]
        if try_legacy_first:
            self.__methods.reverse()

    def __enter__(self):
        return self.get_connection()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __check_connection_ok(self, connection):
        """Return True if a trivial query on *connection* succeeds."""
        try:
            connection.cursor().execute("select 1 from sqlite_master")
        except OperationalError:
            return False
        return True

    def __remove_temp_file(self):
        """Delete the temporary database copy, if one was created (best effort)."""
        if self.__temp_cookie_file:
            try:
                os.remove(self.__temp_cookie_file)
            except OSError:
                pass
            self.__temp_cookie_file = None

    def __sqlite3_connect_readonly(self):
        """Try progressively more permissive read-only URI modes.

        Returns:
            A working connection, or None if every mode failed.
        """
        uri = Path(self.__database_file).absolute().as_uri()
        for options in ("?mode=ro", "?mode=ro&nolock=1", "?mode=ro&immutable=1"):
            try:
                con = connect(uri + options, uri=True)
            except OperationalError:
                continue
            if self.__check_connection_ok(con):
                return con
            # Do not leak a handle that opened but cannot be queried.
            con.close()
        return None

    def __get_connection_legacy(self):
        """Copy the database to a temporary file and open the copy.

        Returns:
            A working connection to the copy, or None if it cannot be queried.
        """
        # delete=False keeps the reserved name on disk until close() removes
        # it, instead of reusing the name of an already-deleted temp file.
        with NamedTemporaryFile(suffix=".sqlite", delete=False) as tmp:
            self.__temp_cookie_file = tmp.name
        try:
            copyfile(self.__database_file, self.__temp_cookie_file)
        except Exception:
            self.__remove_temp_file()
            raise
        con = connect(self.__temp_cookie_file)
        if self.__check_connection_ok(con):
            return con
        con.close()
        return None

    def get_connection(self):
        """Return the cached connection, opening one on first use.

        Raises:
            RuntimeError: If no strategy produced a usable connection.
        """
        if self.__connection is not None:
            return self.__connection
        for method in self.__methods:
            con = method()
            if con is not None:
                self.__connection = con
                return con
        raise RuntimeError("Unable to read database file")

    def cursor(self):
        """Return a new cursor on the (lazily opened) connection."""
        return self.get_connection().cursor()

    def close(self):
        """Close the connection and remove the temporary copy, if any."""
        if self.__connection is not None:
            self.__connection.close()
            # Forget the closed handle so a later get_connection() reconnects.
            self.__connection = None
        self.__remove_temp_file()
|
|
|
|
|
def unprotect_data(encrypted_value, is_key=False):
    """Decrypt a DPAPI-protected blob via the Windows ``CryptUnprotectData`` API.

    Args:
        encrypted_value: The protected bytes.
        is_key: When true, return the whole output buffer (``.raw``);
            otherwise return ``.value``, which stops at the first NUL byte.

    Returns:
        The decrypted bytes.

    Raises:
        RuntimeError: If ``CryptUnprotectData`` reports failure.
    """

    class DataBlob(ctypes.Structure):
        # Length-prefixed byte buffer as passed to CryptUnprotectData.
        _fields_ = [
            ("cbData", ctypes.wintypes.DWORD),
            ("pbData", ctypes.POINTER(ctypes.c_char)),
        ]

    blob_in, blob_entropy, blob_out = map(
        lambda x: DataBlob(len(x), ctypes.create_string_buffer(x)),
        [encrypted_value, b"", b""],
    )
    desc = ctypes.c_wchar_p()

    CRYPTPROTECT_UI_FORBIDDEN = 0x01

    if not ctypes.windll.crypt32.CryptUnprotectData(
        ctypes.byref(blob_in),
        ctypes.byref(desc),
        ctypes.byref(blob_entropy),
        None,
        None,
        CRYPTPROTECT_UI_FORBIDDEN,
        ctypes.byref(blob_out),
    ):
        raise RuntimeError("Failed to decrypt the cipher text with DPAPI")

    # Copy the result into Python-owned memory before releasing the buffers
    # that the API allocated.
    buffer_out = ctypes.create_string_buffer(int(blob_out.cbData))
    ctypes.memmove(buffer_out, blob_out.pbData, blob_out.cbData)
    # An explicit loop is required: map() is lazy in Python 3, so an
    # unconsumed map(LocalFree, ...) never frees anything.
    for allocated in (desc, blob_out.pbData):
        ctypes.windll.kernel32.LocalFree(allocated)

    if is_key:
        return buffer_out.raw
    return buffer_out.value
|
|
|
|
|
def decrypt(value, encrypted_value, key):
    """Return a cookie value as text, decrypting it when necessary.

    A non-empty ``value`` is returned untouched. Otherwise ``encrypted_value``
    is handed to ``unprotect_data``; if that raises ``RuntimeError`` the blob
    is decrypted with AES-GCM using ``key`` instead.

    Args:
        value: The plain value column; returned as-is when non-empty.
        encrypted_value: The encrypted value bytes.
        key: AES key for the fallback path; may be falsy if unavailable.

    Returns:
        The plain or decrypted value.

    Raises:
        RuntimeError: If DPAPI fails and no key is given, or if AES-GCM
            verification fails.
    """
    if len(value) > 0:
        return value
    if encrypted_value == b"":
        return ""

    try:
        plaintext = unprotect_data(encrypted_value)
    except RuntimeError:
        if not key:
            raise RuntimeError(
                "Failed to decrypt the cipher text with DPAPI and no AES key."
            )

        # Layout after the first 3 bytes: 12-byte nonce, ciphertext, 16-byte tag.
        payload = encrypted_value[3:]
        nonce = payload[:12]
        tag = payload[-16:]
        ciphertext = payload[12:-16]
        cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)

        try:
            plaintext = cipher.decrypt_and_verify(ciphertext, tag)
        except ValueError:
            raise RuntimeError("Unable to get key for cookie decryption")
        return plaintext.decode()

    assert isinstance(plaintext, bytes)
    return plaintext.decode()
|
|
|
|
|
def _text_factory(data):
    """Decode *data* as UTF-8, returning it unchanged if it is not valid UTF-8."""
    try:
        text = data.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: hand the original object back untouched.
        return data
    else:
        return text
|
|
|
|
|
## Part 1
## Dump cookies from Chrome
v10_key = ""
profiles_list = []
# "Local State" is a JSON file; read it as UTF-8 explicitly so profile names
# with non-ASCII characters do not depend on the locale's default encoding.
with open(
    f"{os.getenv('LOCALAPPDATA')}\\Google\\Chrome\\User Data\\Local State",
    "r",
    encoding="utf-8",
) as f:
    j = load(f)

profiles = j["profile"]["info_cache"]
for k, v in profiles.items():
    # Shown as "<profile key> (<display name>)"; the key is recovered below.
    profiles_list.append(f"{k} ({v['name']})")

# extract decryption key
key64 = j["os_crypt"]["encrypted_key"].encode("utf-8")
# The first 5 decoded bytes are dropped before DPAPI decryption
# (presumably a fixed "DPAPI" marker -- TODO confirm).
keydpapi = standard_b64decode(key64)[5:]
v10_key = unprotect_data(keydpapi, is_key=True)


questions = [
    inquirer.List(
        "profile",
        message="Choose a Chrome profile",
        choices=profiles_list,
    ),
]
answers = inquirer.prompt(questions)
if not answers:
    # No answers came back (e.g. the prompt was cancelled); nothing to do.
    exit(1)
chosen_profile = answers["profile"].split(" (")[0]

canikill = input("Can I kill the Chrome process (y/n)? ").lower()
# Require an answer that starts with "y"; a substring test would also
# accept replies such as "no way".
if canikill.strip().startswith("y"):
    call("taskkill /f /im chrome.exe")
else:
    exit(1)

cj = CookieJar()
with _DatabaseConnetion(
    f"{os.getenv('LOCALAPPDATA')}\\Google\\Chrome\\User Data\\{chosen_profile}\\Network\\Cookies"
) as con:
    con.text_factory = _text_factory
    cur = con.cursor()
    cur.execute(
        "SELECT host_key, path, is_secure, expires_utc, name, value, encrypted_value, is_httponly "
        "FROM cookies WHERE host_key like ?;",
        ("%{}%".format(".patreon.com"),),
    )

    for item in cur.fetchall():
        host, path, secure, expires_nt_time_epoch, name, value, enc_value, http_only = (
            item
        )
        if expires_nt_time_epoch == 0:
            expires = None
        else:
            # Microseconds since 1601-01-01 -> seconds since the Unix epoch.
            expires = (expires_nt_time_epoch / 1000000) - 11644473600

        value = decrypt(value, enc_value, v10_key)
        c = Cookie(
            version=0,
            name=name,
            value=value,
            port=None,
            port_specified=False,
            domain=host,
            domain_specified=host.startswith("."),
            domain_initial_dot=host.startswith("."),
            path=path,
            path_specified=True,
            secure=secure,
            expires=expires,
            discard=False,
            comment=None,
            comment_url=None,
            rest={"HTTPOnly": ""} if http_only else {},
        )
        cj.set_cookie(c)
|
|
|
## Part 2
## Fetch data from API
print(cj)
session = Session()
session.cookies = cj

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0",
    "Content-Type": "application/vnd.api+json",
    "Connection": "keep-alive",
    "Priority": "u=4",
    "TE": "trailers",
}
# argv[1] does not exist when the script is run without arguments, so check
# the length first; otherwise the interactive prompt could never be reached.
if len(argv) > 1 and argv[1]:
    campaign_id = argv[1]
else:
    campaign_id = input("Enter campaign id: ")
url = f"https://www.patreon.com/api/posts?filter[campaign_id]={campaign_id}&filter[include_drafts]=false&include=post%2Cpost.campaign%2Cpost.access_rules%2Cpost.access_rules.tier.null%2Cpost.attachments%2Cpost.audio%2Cpost.audio_preview.null%2Cpost.drop%2Cpost.images%2Cpost.media%2Cpost.native_video_insights%2Cpost.poll.choices%2Cpost.poll.current_user_responses.user%2Cpost.poll.current_user_responses.choice%2Cpost.poll.current_user_responses.poll%2Cpost.user%2Cpost.user_defined_tags%2Cpost.ti_checks%2Cpost.content_unlock_options.product_variant.null&fields[drop]=cover_image%2Ccreated_at%2Cexpires_at%2Cid%2Cscheduled_for%2Cis_droppable%2Ccomments_cid%2Cpresence_cid%2Cpresence_count&fields[campaign]=currency%2Cshow_audio_post_download_links%2Cavatar_photo_url%2Cavatar_photo_image_urls%2Cearnings_visibility%2Cis_nsfw%2Cis_monthly%2Cname%2Curl&fields[post]=change_visibility_at%2Ccomment_count%2Ccommenter_count%2Ccontent%2Ccurrent_user_can_comment%2Ccurrent_user_can_delete%2Ccurrent_user_can_report%2Ccurrent_user_can_view%2Ccurrent_user_comment_disallowed_reason%2Ccurrent_user_has_liked%2Cembed%2Cimage%2Cinsights_last_updated_at%2Cis_paid%2Clike_count%2Cmeta_image_url%2Cmin_cents_pledged_to_view%2Cpost_file%2Cpost_metadata%2Cpublished_at%2Cpatreon_url%2Cpost_type%2Cpledge_url%2Cpreview_asset_type%2Cthumbnail%2Cthumbnail_url%2Cteaser_text%2Ctitle%2Cupgrade_url%2Curl%2Cwas_posted_by_campaign_owner%2Chas_ti_violation%2Cmoderation_status%2Cpost_level_suspension_removal_date%2Cpls_one_liners_by_category%2Cvideo_preview%2Cview_count%2Ccontent_unlock_options%2Cis_new_to_current_user&fields[post_tag]=tag_type%2Cvalue&fields[user]=image_url%2Cfull_name%2Curl&fields[access_rule]=access_rule_type%2Camount_cents&fields[media]=id%2Cimage_urls%2Cdisplay%2Cdownload_url%2Cmetadata%2Cfile_name&fields[native_video_insights]=average_view_duration%2Caverage_view_pct%2Chas_preview%2Cid%2Clast_updated_at%2Cnum_views%2Cpreview_views%2Cvideo_duration&fields[content-unlock-option]=content_unlock_type&fields[product-variant]=price_cents%2Ccurrency_code%2Ccheckout_url%2Cis_hidden%2Cpublished_at_datetime%2Ccontent_type%2Corders_count%2Caccess_metadata&json-api-version=1.0&json-api-use-default-includes=false"

# Names (without extension) of files already in ./downloads, used later to
# skip posts that were fetched on a previous run.
dl = Path("./downloads")
dl.mkdir(exist_ok=True)
downloads = [
    os.path.splitext(f)[0]
    for f in os.listdir(dl)
    if os.path.isfile(os.path.join(dl, f))
]
print(downloads)

res = session.get(url, headers=headers)
j = res.json()
posts = j["data"]
# Follow the "next" link page by page. A missing or null link ends the loop
# explicitly, rather than relying on an exception raised somewhere in the
# loop body.
while True:
    nextlink = (j.get("links") or {}).get("next")
    if not nextlink:
        break
    print(nextlink.split("%5B")[-1])

    res = session.get(nextlink, headers=headers)
    j = res.json()
    # A page without "data" contributes nothing; the loop then ends as soon
    # as that page has no "next" link either.
    posts.extend(j.get("data", []))
|
|
|
## Part 3
## Download
processes = []
for p in posts:
    attrib = p["attributes"]
    if p["type"] != "post" or attrib["post_type"] != "video_external_file":
        continue

    title = attrib["title"]
    clean_title = title.rstrip()
    if clean_title in downloads:
        # Already present in ./downloads from an earlier run.
        continue

    # add any filters here, remove this check if you want everything
    lowered_title = title.lower()
    if "filter 1" not in lowered_title and "filter 2" not in lowered_title:
        continue

    print(title)
    try:
        # video_preview for a 30 second preview (available to non subscribed users too)
        url = attrib["post_file"]["url"]
    except (KeyError, TypeError):
        # post_file is missing, null, or has no url: nothing to download.
        print(f"Skipping (no downloadable file): {title}")
        continue

    try:
        # Pass arguments as a list without a shell: the title and URL come
        # from the API and must not be interpreted by a command interpreter.
        process = Popen(
            ["yt-dlp", url, "-o", f"downloads/{clean_title}.%(ext)s"],
        )
    except OSError as err:
        print(f"Could not start yt-dlp for {title!r}: {err}")
        continue
    processes.append(process)

# Wait for every download to finish and collect the exit codes.
output = [proc.wait() for proc in processes]