@empjustine, forked from lucasg/dash-doggybag.py. Last active May 18, 2020 21:38.
Bulk downloader for dash docsets (official and user contributed)
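The gist below consists of an .editorconfig, a .gitignore, three Python files (dash-doggybag.py, doggybag.py and filemetadata.py) and a requirements.txt, in that order. As a rough usage sketch (the docset pattern and output directory are only examples, not part of the gist): save the Python files next to each other, install the requirements, then run

    python dash-doggybag.py --user-contrib --docset "Lua*" --output ./docsets

to fetch every user-contributed docset whose archive name matches the glob.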
# .editorconfig
# https://editorconfig.org/
root = true
[*]
indent_style = space
indent_size = 4
insert_final_newline = true
trim_trailing_whitespace = true
end_of_line = lf
charset = utf-8
# .gitignore
*.tgz
venv/
.idea/
__pycache__/
#!/usr/bin/env python
# dash-doggybag.py
import argparse
import contextlib
import fnmatch
import json
import logging
import pathlib
import random
import shutil
import tempfile
import typing
import urllib.parse
import xml.etree.ElementTree
import tqdm
import doggybag
_LOGGER = logging.getLogger(__name__)
CDN_CHOICES = [
"sanfrancisco",
"london",
"newyork",
"tokyo",
"frankfurt",
# "sydney", # 302 http://sydney.kapeli.com/ -> http://kapeli.com/
"singapore",
]
def download_dash_docsets(root: pathlib.Path, docset_pattern="*"):
"""
Dash docsets are located via dash feeds : https://github.com/Kapeli/feeds
url : https://github.com/Kapeli/feeds/archive/master.zip
"""
destination_directory = root.joinpath("feeds")
destination_directory.mkdir(parents=True, exist_ok=True)
feeds_zip = destination_directory.joinpath("feeds.zip")
doggybag.download(
url=urllib.parse.urlsplit("https://github.com/Kapeli/feeds/archive/master.zip"),
destination=feeds_zip,
expected_content_type=("application/zip",),
)
for url, destination in dash_docsets(feeds_zip, destination_directory):
if not fnmatch.fnmatch(destination.name, docset_pattern):
continue
doggybag.download(
url=url,
destination=destination,
strict_download=True,
expected_content_type=("application/x-tar",),
)
def dash_docsets(
feeds_zip: pathlib.Path, destination_directory: pathlib.Path
) -> typing.Iterator[typing.Tuple[urllib.parse.SplitResult, pathlib.Path]]:
with _temporary_directory() as temporary_directory:
shutil.unpack_archive(str(feeds_zip), str(temporary_directory))
# parse xml feeds and extract urls
with tqdm.tqdm(
sorted(
temporary_directory.joinpath("feeds-master").iterdir(),
key=lambda x: random.random(),
)
) as progressbar:
for extracted_temporary_feed_xml in progressbar:
url = _url_in_feed_xml(extracted_temporary_feed_xml)
destination = destination_directory.joinpath(
f"{extracted_temporary_feed_xml.stem}.tgz"
)
yield url, destination
def _url_in_feed_xml(extracted_temporary_feed_xml):
tree = xml.etree.ElementTree.parse(extracted_temporary_feed_xml)
root = tree.getroot()
url_tags = root.findall("url")
try:
url_tag = random.choice(url_tags)
url_text = url_tag.text
    except IndexError:
        raise IndexError(f"no urls found in {extracted_temporary_feed_xml}") from None
return urllib.parse.urlsplit(url_text)
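# Shape of a feed XML file that _url_in_feed_xml() expects (an illustrative sketch inferred
# from the findall("url") call above, not copied from a real feed): the root element holds
# one <url> child per CDN mirror, and the function picks one of them at random, e.g.
#
#   <entry>
#       <version>5.0.0</version>
#       <url>http://sanfrancisco.kapeli.com/feeds/Bash.tgz</url>
#       <url>http://london.kapeli.com/feeds/Bash.tgz</url>
#   </entry>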
def download_user_contributed_docsets(root: pathlib.Path, cdn=None, docset_pattern="*"):
"""
Dash docsets are located via dash feeds : https://github.com/Kapeli/Dash-User-Contributions
json file : f"http://{cdn}.kapeli.com/feeds/zzz/user_contributed/build/index.json"
"""
    if cdn is None:
        cdn = random.choice(CDN_CHOICES)
    destination_directory = root.joinpath("feeds/zzz/user_contributed/build")
destination_directory.mkdir(parents=True, exist_ok=True)
feeds_json = destination_directory.joinpath("index.json")
feed_json_url = urllib.parse.urlsplit(
f"http://{cdn}.kapeli.com/feeds/zzz/user_contributed/build/index.json"
)
doggybag.download(
url=feed_json_url,
destination=feeds_json,
expected_content_type=("application/json",),
)
for archive_url, destination in user_contributed_docsets(
feeds_json, destination_directory
):
if not fnmatch.fnmatch(destination.name, docset_pattern):
continue
doggybag.download(
url=archive_url,
destination=destination,
strict_download=True,
expected_content_type=("application/x-tar",),
)
def user_contributed_docsets(
kapeli_feeds: pathlib.Path, destination_directory: pathlib.Path
) -> typing.Iterator[typing.Tuple[urllib.parse.SplitResult, pathlib.Path]]:
with kapeli_feeds.open("r") as f:
_kapeli_feeds = json.load(f)
docsets = _kapeli_feeds["docsets"]
iterable = docsets.items()
with tqdm.tqdm(sorted(iterable, key=lambda x: random.random())) as progressbar:
for docset_name, docset_metadata in progressbar:
cdn = random.choice(CDN_CHOICES)
            # Packages that set "specific_versions" also publish versioned archives at
            # http://{cdn}.kapeli.com/feeds/zzz/user_contributed/build/{docset_name}/versions/{version}/{archive},
            # but only the latest archive is downloaded here.
archive_url = urllib.parse.urlsplit(
f"http://{cdn}.kapeli.com/feeds/zzz/user_contributed/build/{docset_name}/{docset_metadata['archive']}"
)
destination = destination_directory.joinpath(
docset_name, docset_metadata["archive"]
)
yield archive_url, destination
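# Shape of the index.json entries that user_contributed_docsets() reads (an illustrative
# sketch based on the keys accessed above; real entries carry more fields):
#
#   {
#       "docsets": {
#           "SomeDocset": {
#               "name": "SomeDocset",
#               "version": "1.0.0",
#               "archive": "SomeDocset.tgz"
#           }
#       }
#   }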
@contextlib.contextmanager
def _temporary_directory() -> typing.ContextManager[pathlib.Path]:
"""wraps tempfile to give you pathlib.Path"""
with tempfile.TemporaryDirectory() as mktemp:
yield pathlib.Path(mktemp)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A downloader for Dash Docsets")
parser.add_argument(
"--dash", help="only download dash docsets", action="store_true"
)
parser.add_argument(
"--user-contrib", help="only download user contrib docsets", action="store_true"
)
parser.add_argument(
"-d",
"--docset",
help="only download a specifics docsets. This option support the glob pattern",
default="*",
)
parser.add_argument(
"-q", "--quiet", help="reduces output verbosity", action="store_true"
)
parser.add_argument(
"-o", "--output", help="change output directory", default=pathlib.Path.cwd(),
)
parser.add_argument(
"-c",
"--cdn",
help="choose cdn (random by default)",
# default=random.choice(CDN_CHOICES),
choices=CDN_CHOICES,
)
args = parser.parse_args()
if args.quiet:
logging.basicConfig(level=logging.INFO)
else:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
output = pathlib.Path(args.output)
output.mkdir(parents=True, exist_ok=True)
if not args.user_contrib:
download_dash_docsets(
root=output, docset_pattern=args.docset,
)
if not args.dash:
download_user_contributed_docsets(
root=output, cdn=args.cdn, docset_pattern=args.docset
)
# doggybag.py
import contextlib
import gzip
import logging
import pathlib
import random
import shutil
import tempfile
import typing
import urllib.parse
import uuid
import requests
import tqdm
import filemetadata
_LOGGER = logging.getLogger(__name__)
MIME_TYPE_GZIP = (
"application/gzip",
"application/x-tar",
)
SAFETY_OVERLAP_IN_SECONDS = 3600
OVERWRITE_CHANCE = 10 # one in X
def download(
url: urllib.parse.SplitResult,
destination: pathlib.Path,
strict_download: bool = False,
    expected_content_type: typing.Tuple[str, ...] = (),
chunk_size: int = 32 * 1024,
):
"""Download a file
Verifies if upstream file "Last-Modified" is older that current file.
Verifies if upstream file "ETag" is different from current file.
Downloads file first to a temporary location then move-replace current file in destination.
"""
_url = urllib.parse.urlunsplit(url)
local_metadata = filemetadata.FileMetadata.read(destination)
response = requests.get(
url=_url,
stream=True,
headers={
**local_metadata.if_modified_since(),
**local_metadata.if_none_match(),
},
allow_redirects=not strict_download,
)
status_code = response.status_code
if status_code == requests.codes.not_modified:
return True
if status_code != requests.codes.ok:
_LOGGER.warning("status_code [%d] : %s", status_code, response.headers)
if strict_download:
response.raise_for_status()
content_type = response.headers.get("Content-Type", None)
if content_type not in expected_content_type:
raise ValueError("wrong Content-Type", content_type, expected_content_type)
remote_metadata = filemetadata.FileMetadata.from_response(response)
size_nok = remote_metadata.size != local_metadata.size
    mtime_nok = remote_metadata.mtime > local_metadata.mtime or (
        remote_metadata.mtime + SAFETY_OVERLAP_IN_SECONDS > local_metadata.mtime
        and _chance()
    )
etag_nok = remote_metadata.etag != local_metadata.etag and _chance()
if size_nok or mtime_nok or etag_nok:
_LOGGER.debug("state: NOK, %s, %s", remote_metadata, local_metadata)
elif _chance() and _chance():
_LOGGER.debug("state: RNG, %s, %s", remote_metadata, local_metadata)
else:
_LOGGER.debug("state: OK, %s, %s", remote_metadata, local_metadata)
remote_metadata.write(destination)
return False
remote_metadata.write(destination)
with _temporary_file() as temporary_file:
_write_response_to_file(response, temporary_file, chunk_size)
if response.headers.get("Content-Type", None) in MIME_TYPE_GZIP:
verify_gzip(temporary_file)
shutil.move(str(temporary_file), str(destination))
_LOGGER.debug("done %s", destination)
return True
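# Illustrative call (hypothetical URL and path, not part of the gist):
#
#   download(
#       url=urllib.parse.urlsplit("https://example.com/some.tgz"),
#       destination=pathlib.Path("downloads/some.tgz"),
#       expected_content_type=("application/x-tar",),
#   )
#
# The return value is True after a 304 Not Modified or a completed download, and False when
# the size/mtime/ETag heuristics (minus a small random re-download chance) decide the
# existing local copy can be kept.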
def verify_gzip(filename_or_file, chunk_size=32 * 1024):
with gzip.open(filename_or_file, "rb") as _gzip:
while _gzip.read(chunk_size) != b"":
pass
def _write_response_to_file(
response: requests.Response, file, chunk_size: int = 32 * 1024
):
_size = response.headers.get(filemetadata.CONTENT_LENGTH_KEY, None)
size = int(_size) if _size is not None else None
with open(file, "wb") as f:
with tqdm.tqdm(
mininterval=0.5, total=size, unit="o", unit_scale=True
) as progressbar:
for chunk in response.iter_content(chunk_size):
read_size = len(chunk)
f.write(chunk)
progressbar.update(read_size)
def _chance():
return random.randint(1, OVERWRITE_CHANCE) == 1
@contextlib.contextmanager
def _temporary_file() -> typing.ContextManager[pathlib.Path]:
"""wraps tempfile to give you pathlib.Path"""
with tempfile.TemporaryDirectory() as mktemp:
temporary_name = uuid.uuid4()
temporary_path = pathlib.Path(mktemp).joinpath(str(temporary_name))
yield temporary_path
# filemetadata.py
import dataclasses
import datetime
import gzip
import json
import logging
import math
import pathlib
import time
import typing
import requests
URL_KEY = "__url__"
CONTENT_TYPE_KEY = "Content-Type"
E_TAG_KEY = "ETag"
LAST_MODIFIED_KEY = "Last-Modified"
CONTENT_LENGTH_KEY = "Content-Length"
RFC_2822_DATETIME_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"
_LOGGER = logging.getLogger(__name__)
@dataclasses.dataclass
class FileMetadata:
"""File metadata that might be exported by HTTP servers"""
size: typing.Optional[int]
mtime: typing.Optional[float]
etag: typing.Optional[str]
media_type: typing.Optional[str]
url: typing.Optional[str]
def to_dict(self):
return {
URL_KEY: self.url,
CONTENT_LENGTH_KEY: self.size,
LAST_MODIFIED_KEY: self.mtime,
E_TAG_KEY: self.etag,
CONTENT_TYPE_KEY: self.media_type,
}
def if_modified_since(self):
if self.mtime is None or math.isinf(self.mtime):
return dict()
return {"Is-Modified-Since": _epoch_times_to_rfc_2822(self.mtime - 3600)}
def if_none_match(self):
if self.etag is None or len(self.etag) == 0:
return dict()
return {"If-None-Match": self.etag}
@staticmethod
def from_dict(_dict):
return FileMetadata(
url=_dict.get(URL_KEY, None),
size=_dict.get(CONTENT_LENGTH_KEY, None),
mtime=_dict.get(LAST_MODIFIED_KEY, None),
etag=_dict.get(E_TAG_KEY, None),
media_type=_dict.get(CONTENT_TYPE_KEY, None),
)
def write(self, destination: pathlib.Path):
destination.parent.mkdir(parents=True, exist_ok=True)
with destination.with_suffix(f"{destination.suffix}.http.json").open("w") as f:
json.dump(self.to_dict(), f)
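    # For a destination such as Bash.tgz, write()/read() use a sidecar file named
    # Bash.tgz.http.json next to the archive to persist the recorded response headers.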
@staticmethod
def read(path: pathlib.Path):
if not path.exists():
_LOGGER.debug("destination %s missing, continuing", path)
return FileMetadata(
size=None, mtime=float("-inf"), etag=None, url=None, media_type=None
)
else:
headers_json_path = path.with_suffix(f"{path.suffix}.http.json")
size = path.stat().st_size
mtime = path.stat().st_mtime
try:
with headers_json_path.open("r") as f:
_dict = json.load(f)
file_metadata = FileMetadata.from_dict(_dict)
if path.suffix.endswith(("gzip", "gz", "tgz")):
_verify_gzip(path)
return FileMetadata(
size=size,
mtime=mtime,
etag=file_metadata.etag,
url=file_metadata.url,
media_type=file_metadata.media_type,
)
except ValueError:
_LOGGER.error(
"file metadata corrupted, ignoring %s",
headers_json_path,
exc_info=True,
)
except FileNotFoundError:
_LOGGER.error(
"file metadata not found %s", headers_json_path, exc_info=True,
)
return FileMetadata(
size=size, mtime=mtime, etag=None, url=None, media_type=None
)
@staticmethod
def from_response(response: requests.Response):
_size = response.headers.get(CONTENT_LENGTH_KEY, None)
size = int(_size) if _size is not None else None
return FileMetadata(
size=size,
mtime=_http_mtime(response),
etag=response.headers.get(E_TAG_KEY, None),
url=response.url,
media_type=response.headers.get(CONTENT_TYPE_KEY, None),
)
def _http_mtime(stream_request: requests.Response) -> float:
header_value = stream_request.headers.get(LAST_MODIFIED_KEY, None)
try:
return _rfc_2822_to_epoch_time(header_value)
except TypeError:
_LOGGER.debug(
"Missing %s",
LAST_MODIFIED_KEY
# , exc_info=True
)
except ValueError:
_LOGGER.debug(
"Invalid %s",
LAST_MODIFIED_KEY
# , exc_info=True
)
return float("inf")
def _rfc_2822_to_epoch_time(rfc_2822_datetime) -> float:
_datetime = datetime.datetime.strptime(rfc_2822_datetime, RFC_2822_DATETIME_FORMAT)
_timetuple = _datetime.timetuple()
return time.mktime(_timetuple)
def _epoch_times_to_rfc_2822(timestamp: float) -> str:
_timetuple = time.gmtime(timestamp)
rfc_2822_datetime = time.strftime(RFC_2822_DATETIME_FORMAT, _timetuple)
return rfc_2822_datetime
def _verify_gzip(filename_or_file, chunk_size=32 * 1024):
with gzip.open(filename_or_file, "rb") as _gzip:
while _gzip.read(chunk_size) != b"":
pass
# requirements.txt
requests~=2.23.0
tqdm~=4.45.0