Bulk downloader for dash docsets (official and user contributed)

.editorconfig

# https://editorconfig.org/
root = true

[*]
indent_style = space
indent_size = 4
insert_final_newline = true
trim_trailing_whitespace = true
end_of_line = lf
charset = utf-8

.gitignore

*.tgz
venv/
.idea/
__pycache__/

Main downloader script

#!/usr/bin/env python
import argparse
import contextlib
import fnmatch
import json
import logging
import pathlib
import random
import shutil
import tempfile
import typing
import urllib.parse
import xml.etree.ElementTree

import tqdm

import doggybag

_LOGGER = logging.getLogger(__name__)

CDN_CHOICES = [
    "sanfrancisco",
    "london",
    "newyork",
    "tokyo",
    "frankfurt",
    # "sydney",  # 302 http://sydney.kapeli.com/ -> http://kapeli.com/
    "singapore",
]


def download_dash_docsets(root: pathlib.Path, docset_pattern="*"):
    """
    Dash docsets are located via Dash feeds: https://github.com/Kapeli/feeds
    url: https://github.com/Kapeli/feeds/archive/master.zip
    """
    destination_directory = root.joinpath("feeds")
    destination_directory.mkdir(parents=True, exist_ok=True)
    feeds_zip = destination_directory.joinpath("feeds.zip")
    doggybag.download(
        url=urllib.parse.urlsplit("https://github.com/Kapeli/feeds/archive/master.zip"),
        destination=feeds_zip,
        expected_content_type=("application/zip",),
    )
    for url, destination in dash_docsets(feeds_zip, destination_directory):
        if not fnmatch.fnmatch(destination.name, docset_pattern):
            continue
        doggybag.download(
            url=url,
            destination=destination,
            strict_download=True,
            expected_content_type=("application/x-tar",),
        )


def dash_docsets(
    feeds_zip: pathlib.Path, destination_directory: pathlib.Path
) -> typing.Iterator[typing.Tuple[urllib.parse.SplitResult, pathlib.Path]]:
    with _temporary_directory() as temporary_directory:
        shutil.unpack_archive(str(feeds_zip), str(temporary_directory))
        # parse xml feeds and extract urls
        with tqdm.tqdm(
            sorted(
                temporary_directory.joinpath("feeds-master").iterdir(),
                key=lambda x: random.random(),
            )
        ) as progressbar:
            for extracted_temporary_feed_xml in progressbar:
                url = _url_in_feed_xml(extracted_temporary_feed_xml)
                destination = destination_directory.joinpath(
                    f"{extracted_temporary_feed_xml.stem}.tgz"
                )
                yield url, destination


def _url_in_feed_xml(extracted_temporary_feed_xml):
    tree = xml.etree.ElementTree.parse(extracted_temporary_feed_xml)
    root = tree.getroot()
    url_tags = root.findall("url")
    try:
        url_tag = random.choice(url_tags)
        url_text = url_tag.text
    except IndexError:
        raise IndexError("no urls found")
    return urllib.parse.urlsplit(url_text)


def download_user_contributed_docsets(root: pathlib.Path, cdn=None, docset_pattern="*"):
    """
    User contributed docsets are listed at https://github.com/Kapeli/Dash-User-Contributions
    json file: f"http://{cdn}.kapeli.com/feeds/zzz/user_contributed/build/index.json"
    """
    if cdn is None:
        # "random by default", matching the --cdn help text
        cdn = random.choice(CDN_CHOICES)
    destination_directory = root.joinpath("feeds/zzz/user_contributed/build")
    destination_directory.mkdir(parents=True, exist_ok=True)
    feeds_json = destination_directory.joinpath("index.json")
    feed_json_url = urllib.parse.urlsplit(
        f"http://{cdn}.kapeli.com/feeds/zzz/user_contributed/build/index.json"
    )
    doggybag.download(
        url=feed_json_url,
        destination=feeds_json,
        expected_content_type=("application/json",),
    )
    for archive_url, destination in user_contributed_docsets(
        feeds_json, destination_directory
    ):
        if not fnmatch.fnmatch(destination.name, docset_pattern):
            continue
        doggybag.download(
            url=archive_url,
            destination=destination,
            strict_download=True,
            expected_content_type=("application/x-tar",),
        )


def user_contributed_docsets(
    kapeli_feeds: pathlib.Path, destination_directory: pathlib.Path
) -> typing.Iterator[typing.Tuple[urllib.parse.SplitResult, pathlib.Path]]:
    with kapeli_feeds.open("r") as f:
        _kapeli_feeds = json.load(f)
    docsets = _kapeli_feeds["docsets"]
    iterable = docsets.items()
    with tqdm.tqdm(sorted(iterable, key=lambda x: random.random())) as progressbar:
        for docset_name, docset_metadata in progressbar:
            cdn = random.choice(CDN_CHOICES)
            # url format for packages that specify "specific_versions"
            ignored = f"http://{cdn}.kapeli.com/feeds/zzz/user_contributed/build/{docset_name}/versions/{docset_metadata['version']}/{docset_metadata['archive']}"
            archive_url = urllib.parse.urlsplit(
                f"http://{cdn}.kapeli.com/feeds/zzz/user_contributed/build/{docset_name}/{docset_metadata['archive']}"
            )
            destination = destination_directory.joinpath(
                docset_name, docset_metadata["archive"]
            )
            yield archive_url, destination


@contextlib.contextmanager
def _temporary_directory() -> typing.ContextManager[pathlib.Path]:
    """wraps tempfile to give you pathlib.Path"""
    with tempfile.TemporaryDirectory() as mktemp:
        yield pathlib.Path(mktemp)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="A downloader for Dash Docsets")
    parser.add_argument(
        "--dash", help="only download dash docsets", action="store_true"
    )
    parser.add_argument(
        "--user-contrib", help="only download user contrib docsets", action="store_true"
    )
    parser.add_argument(
        "-d",
        "--docset",
        help="only download specific docsets; supports glob patterns",
        default="*",
    )
    parser.add_argument(
        "-q", "--quiet", help="reduce output verbosity", action="store_true"
    )
    parser.add_argument(
        "-o", "--output", help="change output directory", default=pathlib.Path.cwd(),
    )
    parser.add_argument(
        "-c",
        "--cdn",
        help="choose cdn (random by default)",
        # default=random.choice(CDN_CHOICES),
        choices=CDN_CHOICES,
    )
    args = parser.parse_args()

    if args.quiet:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)

    output = pathlib.Path(args.output)
    output.mkdir(parents=True, exist_ok=True)

    if not args.user_contrib:
        download_dash_docsets(
            root=output, docset_pattern=args.docset,
        )
    if not args.dash:
        download_user_contributed_docsets(
            root=output, cdn=args.cdn, docset_pattern=args.docset
        )
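
For reference, the two entry points can also be called directly from Python instead of via the command line flags above (--dash, --user-contrib, --docset, --output, --cdn). A minimal sketch; the output directory and glob pattern are placeholders:

# Sketch: drive the downloaders from Python (assumes the functions above are in scope).
import logging
import pathlib

logging.basicConfig(level=logging.INFO)

output = pathlib.Path("docsets")  # placeholder output directory
output.mkdir(parents=True, exist_ok=True)

# Official docsets whose feed name matches the glob (destination names look like "Python 3.tgz").
download_dash_docsets(root=output, docset_pattern="Python*")

# User contributed docsets from a chosen mirror.
download_user_contributed_docsets(
    root=output,
    cdn="frankfurt",  # any entry of CDN_CHOICES
    docset_pattern="*",
)
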

doggybag.py

import contextlib
import gzip
import logging
import pathlib
import random
import shutil
import tempfile
import typing
import urllib.parse
import uuid

import requests
import tqdm

import filemetadata

_LOGGER = logging.getLogger(__name__)

MIME_TYPE_GZIP = (
    "application/gzip",
    "application/x-tar",
)
SAFETY_OVERLAP_IN_SECONDS = 3600
OVERWRITE_CHANCE = 10  # one in X


def download(
    url: urllib.parse.SplitResult,
    destination: pathlib.Path,
    strict_download: bool = False,
    expected_content_type: typing.Tuple[str, ...] = (),
    chunk_size: int = 32 * 1024,
):
    """Download a file

    Checks whether the upstream "Last-Modified" is newer than the current file.
    Checks whether the upstream "ETag" differs from the current file's.
    Downloads to a temporary location first, then moves the result over the
    current file at the destination.
    """
    _url = urllib.parse.urlunsplit(url)
    local_metadata = filemetadata.FileMetadata.read(destination)
    response = requests.get(
        url=_url,
        stream=True,
        headers={
            **local_metadata.if_modified_since(),
            **local_metadata.if_none_match(),
        },
        allow_redirects=not strict_download,
    )
    status_code = response.status_code
    if status_code == requests.codes.not_modified:
        return True
    if status_code != requests.codes.ok:
        _LOGGER.warning("status_code [%d] : %s", status_code, response.headers)
        if strict_download:
            response.raise_for_status()

    content_type = response.headers.get("Content-Type", None)
    if content_type not in expected_content_type:
        raise ValueError("wrong Content-Type", content_type, expected_content_type)

    remote_metadata = filemetadata.FileMetadata.from_response(response)
    size_nok = remote_metadata.size != local_metadata.size
    mtime_nok = remote_metadata.mtime > local_metadata.mtime or (
        remote_metadata.mtime + SAFETY_OVERLAP_IN_SECONDS > local_metadata.mtime
        and _chance()
    )
    etag_nok = remote_metadata.etag != local_metadata.etag and _chance()
    if size_nok or mtime_nok or etag_nok:
        _LOGGER.debug("state: NOK, %s, %s", remote_metadata, local_metadata)
    elif _chance() and _chance():
        _LOGGER.debug("state: RNG, %s, %s", remote_metadata, local_metadata)
    else:
        _LOGGER.debug("state: OK, %s, %s", remote_metadata, local_metadata)
        remote_metadata.write(destination)
        return False

    remote_metadata.write(destination)
    with _temporary_file() as temporary_file:
        _write_response_to_file(response, temporary_file, chunk_size)
        if response.headers.get("Content-Type", None) in MIME_TYPE_GZIP:
            verify_gzip(temporary_file)
        shutil.move(str(temporary_file), str(destination))
    _LOGGER.debug("done %s", destination)
    return True


def verify_gzip(filename_or_file, chunk_size=32 * 1024):
    with gzip.open(filename_or_file, "rb") as _gzip:
        while _gzip.read(chunk_size) != b"":
            pass


def _write_response_to_file(
    response: requests.Response, file, chunk_size: int = 32 * 1024
):
    _size = response.headers.get(filemetadata.CONTENT_LENGTH_KEY, None)
    size = int(_size) if _size is not None else None
    with open(file, "wb") as f:
        with tqdm.tqdm(
            mininterval=0.5, total=size, unit="o", unit_scale=True
        ) as progressbar:
            for chunk in response.iter_content(chunk_size):
                read_size = len(chunk)
                f.write(chunk)
                progressbar.update(read_size)


def _chance():
    return random.randint(1, OVERWRITE_CHANCE) == 1


@contextlib.contextmanager
def _temporary_file() -> typing.ContextManager[pathlib.Path]:
    """wraps tempfile to give you pathlib.Path"""
    with tempfile.TemporaryDirectory() as mktemp:
        temporary_name = uuid.uuid4()
        temporary_path = pathlib.Path(mktemp).joinpath(str(temporary_name))
        yield temporary_path
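
doggybag.download can also be used on its own. A minimal sketch with a placeholder URL and destination; the second call illustrates the conditional re-download path:

# Sketch: standalone use of doggybag.download (URL and paths are placeholders).
import pathlib
import urllib.parse

import doggybag

url = urllib.parse.urlsplit("https://example.com/archive.tgz")
destination = pathlib.Path("downloads/archive.tgz")

# First call downloads the file and writes a sidecar archive.tgz.http.json
# with the response headers (size, Last-Modified, ETag, Content-Type).
doggybag.download(
    url=url,
    destination=destination,
    strict_download=True,
    expected_content_type=("application/x-tar", "application/gzip"),
)

# A later call sends If-Modified-Since (from the local file's mtime) and
# If-None-Match (from the stored ETag); a 304, or matching metadata, means
# nothing is re-downloaded.
doggybag.download(
    url=url,
    destination=destination,
    strict_download=True,
    expected_content_type=("application/x-tar", "application/gzip"),
)
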

filemetadata.py

import dataclasses
import datetime
import gzip
import json
import logging
import math
import pathlib
import time
import typing

import requests

URL_KEY = "__url__"
CONTENT_TYPE_KEY = "Content-Type"
E_TAG_KEY = "ETag"
LAST_MODIFIED_KEY = "Last-Modified"
CONTENT_LENGTH_KEY = "Content-Length"
RFC_2822_DATETIME_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"

_LOGGER = logging.getLogger(__name__)


@dataclasses.dataclass
class FileMetadata:
    """File metadata that might be exported by HTTP servers"""

    size: typing.Optional[int]
    mtime: typing.Optional[float]
    etag: typing.Optional[str]
    media_type: typing.Optional[str]
    url: typing.Optional[str]

    def to_dict(self):
        return {
            URL_KEY: self.url,
            CONTENT_LENGTH_KEY: self.size,
            LAST_MODIFIED_KEY: self.mtime,
            E_TAG_KEY: self.etag,
            CONTENT_TYPE_KEY: self.media_type,
        }

    def if_modified_since(self):
        if self.mtime is None or math.isinf(self.mtime):
            return dict()
        return {"If-Modified-Since": _epoch_times_to_rfc_2822(self.mtime - 3600)}

    def if_none_match(self):
        if self.etag is None or len(self.etag) == 0:
            return dict()
        return {"If-None-Match": self.etag}

    @staticmethod
    def from_dict(_dict):
        return FileMetadata(
            url=_dict.get(URL_KEY, None),
            size=_dict.get(CONTENT_LENGTH_KEY, None),
            mtime=_dict.get(LAST_MODIFIED_KEY, None),
            etag=_dict.get(E_TAG_KEY, None),
            media_type=_dict.get(CONTENT_TYPE_KEY, None),
        )

    def write(self, destination: pathlib.Path):
        destination.parent.mkdir(parents=True, exist_ok=True)
        with destination.with_suffix(f"{destination.suffix}.http.json").open("w") as f:
            json.dump(self.to_dict(), f)

    @staticmethod
    def read(path: pathlib.Path):
        if not path.exists():
            _LOGGER.debug("destination %s missing, continuing", path)
            return FileMetadata(
                size=None, mtime=float("-inf"), etag=None, url=None, media_type=None
            )
        else:
            headers_json_path = path.with_suffix(f"{path.suffix}.http.json")
            size = path.stat().st_size
            mtime = path.stat().st_mtime
            try:
                with headers_json_path.open("r") as f:
                    _dict = json.load(f)
                file_metadata = FileMetadata.from_dict(_dict)
                if path.suffix.endswith(("gzip", "gz", "tgz")):
                    _verify_gzip(path)
                return FileMetadata(
                    size=size,
                    mtime=mtime,
                    etag=file_metadata.etag,
                    url=file_metadata.url,
                    media_type=file_metadata.media_type,
                )
            except ValueError:
                _LOGGER.error(
                    "file metadata corrupted, ignoring %s",
                    headers_json_path,
                    exc_info=True,
                )
            except FileNotFoundError:
                _LOGGER.error(
                    "file metadata not found %s", headers_json_path, exc_info=True,
                )
            return FileMetadata(
                size=size, mtime=mtime, etag=None, url=None, media_type=None
            )

    @staticmethod
    def from_response(response: requests.Response):
        _size = response.headers.get(CONTENT_LENGTH_KEY, None)
        size = int(_size) if _size is not None else None
        return FileMetadata(
            size=size,
            mtime=_http_mtime(response),
            etag=response.headers.get(E_TAG_KEY, None),
            url=response.url,
            media_type=response.headers.get(CONTENT_TYPE_KEY, None),
        )


def _http_mtime(stream_request: requests.Response) -> float:
    header_value = stream_request.headers.get(LAST_MODIFIED_KEY, None)
    try:
        return _rfc_2822_to_epoch_time(header_value)
    except TypeError:
        _LOGGER.debug(
            "Missing %s",
            LAST_MODIFIED_KEY
            # , exc_info=True
        )
    except ValueError:
        _LOGGER.debug(
            "Invalid %s",
            LAST_MODIFIED_KEY
            # , exc_info=True
        )
    return float("inf")


def _rfc_2822_to_epoch_time(rfc_2822_datetime) -> float:
    _datetime = datetime.datetime.strptime(rfc_2822_datetime, RFC_2822_DATETIME_FORMAT)
    _timetuple = _datetime.timetuple()
    return time.mktime(_timetuple)


def _epoch_times_to_rfc_2822(timestamp: float) -> str:
    _timetuple = time.gmtime(timestamp)
    rfc_2822_datetime = time.strftime(RFC_2822_DATETIME_FORMAT, _timetuple)
    return rfc_2822_datetime


def _verify_gzip(filename_or_file, chunk_size=32 * 1024):
    with gzip.open(filename_or_file, "rb") as _gzip:
        while _gzip.read(chunk_size) != b"":
            pass
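
A minimal sketch of the FileMetadata sidecar round trip; the path, size, ETag and timestamp below are made-up placeholders:

# Sketch: FileMetadata round trip (placeholder values throughout).
import pathlib

import filemetadata

path = pathlib.Path("downloads/index.json")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("{}")  # stand-in for a previously downloaded file

# Build metadata as from_response() would, then persist it next to the
# file as downloads/index.json.http.json.
metadata = filemetadata.FileMetadata(
    size=2,
    mtime=1586900000.0,
    etag='"5e9a1c-4d2"',
    media_type="application/json",
    url="https://example.com/index.json",
)
metadata.write(path)

# read() combines the sidecar (ETag, URL, media type) with the local file's
# actual size and mtime; a missing file comes back with mtime == -inf.
local = filemetadata.FileMetadata.read(path)

# Conditional request headers, as used by doggybag.download:
headers = {**local.if_modified_since(), **local.if_none_match()}
print(headers)  # e.g. {"If-Modified-Since": "...", "If-None-Match": '"5e9a1c-4d2"'}
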

requirements.txt

requests~=2.23.0
tqdm~=4.45.0