Last active
April 9, 2024 07:32
-
-
Save ChenyangGao/30caa9f5eb34a28fe75d2d64efc87b78 to your computer and use it in GitHub Desktop.
libgen (Library Genesis) 工具集
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# encoding: utf-8
"libgen.rs 搜索信息罗列"

# List search results from libgen.rs (Library Genesis) as md5 digests or JSON.
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__version__ = (0, 0, 1)
__all__ = ["search", "info", "get_downlinks", "get_downlink_from_libgenli"]
if __name__ == "__main__":
    from argparse import ArgumentParser, RawTextHelpFormatter

    # Build the CLI (all user-visible help text preserved verbatim).
    parser = ArgumentParser(
        formatter_class=RawTextHelpFormatter,
        description="libgen.rs 搜索信息罗列",
        epilog="""
library genius 是一个著名的电子书分享网站,它有一些镜像站
- http://gen.lib.rus.ec
- https://libgen.is
- http://libgen.is
- http://libgen.rs
- https://libgen.rs
- https://libgen.st
- http://libgen.st
参考:[Library Genesis Guide](https://librarygenesis.net/)
其它电子书网站推荐
- https://libgen.li
- https://annas-archive.org
""",
    )
    add_arg = parser.add_argument
    add_arg("url", nargs="?", help="url 链接,直接从浏览器复制过来即可")
    add_arg("-d", "--detail-level", type=int, default=0, help="""输出完整信息的级别
- 0 => 【默认值】只输出 md5
- 1 => 输出 json 格式,只包含从搜索列表得到的基本信息
- 2 => 输出 json 格式,在 1 的基础上,增加详细信息
- 3 => 输出 json 格式,在 1 的基础上,增加下载链接
- 4 => 输出 json 格式,在 1 的基础上,增加详细信息和下载链接
""")
    add_arg("-b", "--begin", default=1, type=int, help="开始于序号,默认值 1,从 1 开始编号")
    add_arg("-e", "--end", default=0, type=int, help="结束于序号(包含),默认值 0,小于等于 0 时不限")
    add_arg("-s", "--select", help="提供一个表达式(会注入一个变量 item,是一个 dict),用于筛选条目")
    add_arg("-m", "--max-workers", default=1, type=int, help="多线程并发数,默认为 1,小于等于 0 时,则自动确定合适的并发数")
    add_arg("-v", "--version", action="store_true", help="输出版本号")
    args = parser.parse_args()
    if args.version:
        # Print the version and stop.
        print(".".join(map(str, __version__)))
        raise SystemExit(0)
    if not args.url:
        # No URL given: show the help screen (parse_args exits).
        parser.parse_args(["-h"])
from sys import version_info

# BaseExceptionGroup (used by `retry`) only exists on Python 3.11+.
if version_info < (3, 11):
    raise SystemExit("Python 版本过低,请升级到至少 3.11")

# lxml is a hard dependency; install it on the fly when it is missing.
try:
    from lxml.etree import _ElementTree as ElementTree
    from lxml.html import parse, HtmlElement
except ImportError:
    from subprocess import run
    from sys import executable

    run([executable, "-m", "pip", "install", "-U", "lxml"], check=True)
    from lxml.etree import _ElementTree as ElementTree
    from lxml.html import parse, HtmlElement
from collections.abc import Callable, Iterator | |
from functools import partial, update_wrapper | |
from gzip import GzipFile | |
from itertools import count | |
from re import compile as re_compile | |
from typing import Any, Optional | |
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse | |
from urllib.request import urlopen, Request | |
# Matches the relative "get.php?md5=..." download link inside libgen.li's
# ads page (bytes pattern; the link starts right after a double quote).
# Fix: raw bytes literal — the original non-raw b'...\.' escapes trigger
# SyntaxWarning (invalid escape sequence) on modern Python.
CREB_LIBGEN_GET_search = re_compile(rb'(?<=")get\.php\?md5=[^"]+').search
def retry( | |
func: Optional[Callable] = None, | |
/, | |
retry_times: int = 3, | |
exceptions: type[BaseException] | tuple[type[BaseException], ...] = Exception, | |
do_between: Optional[Callable[[int, BaseException], Any]] = None, | |
) -> Callable: | |
if func is None: | |
return partial( | |
retry, | |
retry_times=retry_times, | |
exceptions=exceptions, | |
) | |
if retry_times == 0: | |
return func | |
def wrapper(*args, **kwds): | |
excs: List[BaseException] = [] | |
if retry_times < 0: | |
it = count() | |
else: | |
it = range(retry_times + 1) | |
prev_exc = None | |
for i in it: | |
if i and do_between: | |
do_between(i, prev_exc) | |
try: | |
return func(*args, **kwds) | |
except exceptions as exc: | |
exc.__prev__ = prev_exc | |
prev_exc = exc | |
excs.append(exc) | |
except BaseException as exc: | |
exc.__prev__ = prev_exc | |
raise exc | |
raise BaseExceptionGroup("too many retries", tuple(excs)) | |
return wrapper | |
@retry(retry_times=5)
def fecth_as_etree(url: str) -> ElementTree:
    """Fetch *url* and parse the response body into an lxml element tree.

    Requests gzip encoding and decompresses the stream on the fly; the
    whole call is retried up to 5 times by the decorator.
    """
    request = Request(url, headers={"Accept-Encoding": "gzip"})
    with urlopen(request, timeout=10) as resp:
        # The server answers gzip-compressed because of the header above.
        return parse(GzipFile(fileobj=resp))
def extract_libgen_title(td: HtmlElement) -> dict:
    """Extract title/series/edition/isbn/url/md5 from a non-fiction title cell."""
    result: dict = {}
    if len(td) == 1:
        anchor = td[0]
    else:
        # With a series present, child 0 is the series link and child 2
        # is the title anchor.
        result["series"] = td[0].text_content().replace("\xa0", " ")
        anchor = td[2]
    result["title"] = anchor.text.strip()
    href = anchor.attrib["href"]
    result["url"] = href
    result["md5"] = href[-32:]  # md5 is the last 32 chars of the detail URL
    nsub = len(anchor)
    if nsub == 1:
        result["edition"] = anchor[0].text_content().replace("\xa0", " ")
    elif nsub == 2:
        result["isbn"] = anchor[1].text_content().replace("\xa0", " ")
    elif nsub >= 3:
        result["edition"] = anchor[0].text_content().replace("\xa0", " ")
        result["isbn"] = anchor[2].text_content().replace("\xa0", " ")
    return result
def extract_libgen_item(tr: HtmlElement) -> dict:
    """Convert one <tr> of the non-fiction results table into a dict."""
    item = {
        "id": int(tr[0].text),
        "authors": [a.text for a in tr[1].xpath(".//a")],
    }
    item.update(extract_libgen_title(tr[2]))
    # Simple text columns follow the title cell, starting at index 3.
    for idx, key in enumerate(
        ("publisher", "year", "pages", "language", "size", "extension"), 3
    ):
        item[key] = tr[idx].text
    # Every remaining cell but the last holds a single mirror link.
    item["mirrors"] = [td[0].attrib["href"] for td in tr[9:-1]]
    return item
def extract_fiction_title(td: HtmlElement) -> dict:
    """Extract title/edition/isbn/url/md5 from a fiction title cell."""
    anchor = td[0][0]
    href = anchor.attrib["href"]
    result: dict = {
        "title": anchor.text.strip(),
        "url": href,
        "md5": href[-32:],  # md5 is the tail of the detail URL
    }
    if len(anchor):
        result["edition"] = anchor[0].text
    if len(td) > 1:
        result["isbn"] = td[1].text
    return result
def extract_fiction_item(tr: HtmlElement) -> dict:
    """Convert one <tr> of the fiction results table into a dict."""
    item = {
        "authors": [a.text for a in tr[0].xpath(".//a")],
        "series": tr[1].text,
    }
    item.update(extract_fiction_title(tr[2]))
    item["language"] = tr[3].text
    size_cell = tr[4]
    # The size cell's title attribute ends with "YYYY-mm-dd HH:MM:SS".
    item["upload_datetime"] = size_cell.attrib["title"][-19:]
    item["size"] = size_cell.text.replace("\xa0", " ")
    item["mirrors"] = [a.attrib["href"] for a in tr[5].xpath(".//a")]
    return item
def search(url: str) -> Iterator[dict]:
    "搜索文件"
    # Search libgen.rs (non-fiction or fiction) and yield one dict per
    # result row, following pagination until a short page is returned.
    # Normalize whatever the user pasted into a full search URL.
    if url.startswith("/"):
        url = "https://libgen.rs" + url
    elif not url.startswith(("http://", "https://")):
        if url.startswith("?"):
            url = "https://libgen.rs/search.php" + url
        elif parse_qsl(url):
            # Looks like a bare query string ("q=foo&res=50").
            url = "https://libgen.rs/search.php?" + url
        else:
            # Treat the input as free-text search terms.
            url = "https://libgen.rs/search.php?q=" + "+".join(url.replace("&", "%26").split())
    urlp = urlparse(url)
    params: dict = dict(parse_qsl(urlp.query))
    is_fiction = urlp.path.startswith("/fiction")
    if is_fiction:
        type = "fiction"
        extract_item = extract_fiction_item
        # The fiction index always pages by 25.
        res = 25
    else:
        type = "libgen"
        extract_item = extract_libgen_item
        # Results-per-page; the site only accepts 25/50/100.
        res = int(params.get("res", 25))
        if res not in (25, 50, 100):
            res = 25
        params["res"] = res
    page = int(params.get("page", 1))
    if page <= 0:
        page = 1
    params["page"] = page
    # Keep the base URL; the query is re-encoded from `params` each page.
    url = urlunparse(urlp._replace(query=""))
    ls_tr: list[HtmlElement]
    while True:
        etree = fecth_as_etree(url+"?"+urlencode(params))
        if is_fiction:
            ls_tr = etree.xpath("body/table/tbody/tr") # type: ignore
        else:
            # Skip the header row of the third table.
            ls_tr = etree.xpath("body/table[3]/tr[position()>1]") # type: ignore
        for tr in ls_tr:
            item = extract_item(tr)
            item["url"] = urljoin(url, item["url"])
            item["type"] = type
            yield item
        if len(ls_tr) < res:
            # A short page means this was the last one.
            break
        params["page"] += 1
def info(md5: str, is_fiction: bool = False) -> dict:
    "查询文件信息"
    # Fetch the detail page for `md5` and scrape cover, download page,
    # hashes, and a label -> value "detail" mapping out of its HTML.
    # --- local scraping helpers ------------------------------------------
    def extract_field_text(el):
        # Normalize a field label: drop the trailing colon and NBSPs.
        return el.text_content().rstrip(": ").replace("\xa0", " ")
    def extract_el_a(el, callback=None):
        # Describe an <a>: absolute href plus its visible text; `callback`
        # may contribute extra (key, value) pairs.
        info = {
            "href": urljoin(url, el.attrib["href"]),
            "text": " ".join(el.itertext())
        }
        if callback:
            info.update(callback(el))
        return info
    def extract_nested_el_table(el):
        # Nested two-row table: row 0 holds labels, row 1 the values.
        return dict(zip(
            filter(None, map(extract_field_text, el[0])),
            map(extract_field_value, el[1]),
        ))
    def extract_field_value(el):
        # Decode a value cell based on its first child element, if any.
        if len(el):
            sel = el[0]
            if sel.tag == "ul":
                return [extract_el_a(a) for a in sel.xpath(".//a")]
            elif sel.tag in "b" and len(sel) and sel[0].tag == "a":
                # NOTE(review): `sel.tag in "b"` is substring containment
                # on a one-char string — effectively `sel.tag == "b"`.
                return extract_el_a(sel[0])
            elif sel.tag == "a":
                return [extract_el_a(a) for a in el.xpath(".//a")]
            elif sel.tag == "table":
                return extract_nested_el_table(sel)
        # Plain text cell.
        return el.text_content().strip()
    def extract_el_a_input_filename(el):
        # A sibling <input> may carry the suggested local filename.
        el = el.getparent().find("input")
        if el is None:
            return []
        return [("filename", el.attrib.get("value", ""))]
    # --- fetch and parse the detail page ---------------------------------
    if is_fiction:
        url = f"https://libgen.rs/fiction/{md5}"
    else:
        url = f"https://libgen.rs/book/index.php?md5={md5}"
    info: dict = {"url": url}
    etree = fecth_as_etree(url)
    if is_fiction:
        # Fiction layout: a "record_side" <div> holds cover and hashes;
        # the following sibling table holds the label/value rows.
        div: HtmlElement = etree.find('.//div[@class="record_side"]')
        info["cover_url"] = urljoin(url, div.find('img').attrib["src"])
        info["download_page_url"] = f"https://library.lol/fiction/{md5}"
        info["hashes"] = dict(zip(
            div.xpath('./table[@class="hashes"]/tr/th/text()'),
            div.xpath('./table[@class="hashes"]/tr/td/text()'),
        ))
        detail = info["detail"] = {}
        table = div.getnext()
        # Labels sit in odd columns, values in even columns.
        detail.update(zip(
            map(
                extract_field_text,
                table.xpath("tr/td[position() mod 2 = 1]")
            ),
            map(
                extract_field_value,
                table.xpath("tr/td[position() mod 2 = 0]")
            ),
        ))
    else:
        # Non-fiction layout: everything lives in the first big table.
        table = etree.find(".//table")
        td = table[1][0]
        info["cover_url"] = urljoin(url, td.find("./a/img").attrib["src"])
        info["download_page_url"] = urljoin(url, td.find("./a").attrib["href"])
        info["hashes"] = dict(zip(
            td.xpath("./table/tr/th/text()"),
            td.xpath("./table/tr/td/text()"),
        ))
        detail = info["detail"] = {}
        # Title and author rows use a special layout.
        detail[extract_field_text(table[1][1])] = table[1][2].find(".//a").text
        detail[extract_field_text(table[1][3][0])] = table[1][3][0].tail
        # Generic label/value rows 3..17.
        detail.update(zip(
            map(
                extract_field_text,
                table.xpath(
                    "tr[position()>2 and position()<18]/td[position() mod 2 = 1]")
            ),
            map(
                extract_field_value,
                table.xpath(
                    "tr[position()>2 and position()<18]/td[position() mod 2 = 0]")
            ),
        ))
        # Mirror-links row, each with a suggested filename when present.
        detail[extract_field_text(table[17][0])] = [
            extract_el_a(el, extract_el_a_input_filename)
            for el in table[17][1][0].xpath(".//td/a")
        ]
        detail["introduction"] = "\n".join(table[18].itertext())
        detail["toc"] = "\n".join(table[19].itertext())
    return info
def get_downlinks(md5: str, is_fiction: bool = False) -> list[str]:
    "获取下载链接列表"
    # library.lol hosts the download page; fiction and non-fiction live
    # under different path prefixes.
    section = "fiction" if is_fiction else "main"
    page_url = f"https://library.lol/{section}/{md5}"
    tree = fecth_as_etree(page_url)
    # Both the big GET button (inside <h2>) and the mirror list items
    # (<li>) carry usable hrefs.
    return tree.xpath('//div[@id="download"]//*[self::h2 or self::li]/a[@href]/@href') # type: ignore
def get_downlink_from_libgenli(md5: str) -> str:
    "从 https://libgen.li 获取下载链接"
    page_url = f"https://libgen.li/ads.php?md5={md5}"
    # libgen.li rejects the default urllib User-Agent; send an empty one.
    request = Request(page_url, headers={"User-Agent": ""})
    with urlopen(request) as resp:
        body = resp.read()
    # Pull the relative get.php link straight out of the raw page bytes.
    link = CREB_LIBGEN_GET_search(body)[0] # type: ignore
    return "https://libgen.li/" + link.decode()
if __name__ == "__main__":
    # Runtime imports for the CLI driver and the terminal-size probes below.
    from os import (
        close as fclose, open as fopen, ctermid, getenv, get_terminal_size,
        terminal_size, O_RDONLY,
    )
    from platform import system
    from sys import stderr
    from threading import RLock
    from time import perf_counter
    # Reference:
    # - [How to get Linux console window width in Python](https://stackoverflow.com/questions/566746/how-to-get-linux-console-window-width-in-python)
    # - [How do I find the width & height of a terminal window](https://stackoverflow.com/questions/263890/how-do-i-find-the-width-height-of-a-terminal-window)
    # True on native Windows; selects the Win32 console-size probe.
    IS_WIN = system() == "Windows"
def environ_GWINSZ() -> terminal_size:
    """Read the terminal size from the COLUMNS/LINES environment variables.

    Components default to 0 when a variable is unset.
    """
    columns = int(getenv("COLUMNS", 0))
    lines = int(getenv("LINES", 0))
    return terminal_size((columns, lines))
def os_GWINSZ() -> terminal_size:
    """Ask the OS for the terminal size, falling back to (0, 0) on failure.

    Reference:
    - [os.get_terminal_size](https://docs.python.org/3/library/os.html#os.get_terminal_size)
    - [shutil.get_terminal_size](https://docs.python.org/3/library/shutil.html#shutil.get_terminal_size)
    """
    try:
        return get_terminal_size()
    except (AttributeError, ValueError, OSError):
        # Raised when the fd is closed, detached, or not a terminal
        # (e.g. OSError: [Errno 25] Inappropriate ioctl for device),
        # or when os.get_terminal_size() is unsupported on this platform.
        return terminal_size((0, 0))
def ioctl_GWINSZ(fd: int = -1) -> terminal_size:
    """Query the terminal size via the TIOCGWINSZ ioctl on *fd*.

    ``fd`` defaults to stderr's descriptor, resolved at call time.
    Fix: the original evaluated ``stderr.fileno()`` at import time, which
    raises when stderr is detached/captured before this module is loaded.
    Returns (0, 0) on any failure (non-terminal fd, unsupported platform).
    """
    try:
        if fd < 0:
            fd = stderr.fileno()
        from fcntl import ioctl
        from struct import unpack
        from termios import TIOCGWINSZ
        # The winsize struct is 4 shorts: rows, cols, xpixel, ypixel.
        rows, columns, hp, wp = unpack('hhhh', ioctl(fd, TIOCGWINSZ, b'\0'*8))
        return terminal_size((columns, rows))
    except (ImportError, AttributeError, ValueError, OSError):
        # fd is nonexistent, closed, detached, or not a terminal
        # (e.g. OSError: [Errno 25] Inappropriate ioctl for device),
        # or fcntl/termios are unavailable on this platform.
        return terminal_size((0, 0))
def ioctl_GWINSZ_auto() -> terminal_size:
    """Try TIOCGWINSZ on stdin/stdout/stderr, then on the controlling tty."""
    for std_fd in (0, 1, 2):
        size = ioctl_GWINSZ(std_fd)
        if size != (0, 0):
            return size
    try:
        # Fall back to opening the process's controlling terminal directly.
        fd = fopen(ctermid(), O_RDONLY)
        try:
            return ioctl_GWINSZ(fd)
        finally:
            fclose(fd)
    except:
        # No controlling terminal (daemon, CI job, ...).
        return terminal_size((0, 0))
def stty_GWINSZ() -> terminal_size:
    """Query the terminal size by running ``stty size``; (0, 0) on failure.

    Fix: bare ``except:`` clauses (which also swallowed KeyboardInterrupt
    and SystemExit) narrowed to ``except Exception``.
    """
    import subprocess
    try:
        rows, columns = subprocess.check_output(['stty', 'size']).split()
        return terminal_size((int(columns), int(rows)))
    except Exception:
        # stdin may be redirected, in which case stty complains that
        # "stdin isn't a terminal" — retry against the controlling tty.
        try:
            with open('/dev/tty') as tty:
                rows, columns = subprocess.check_output(
                    ['stty', 'size'], stdin=tty).split()
            return terminal_size((int(columns), int(rows)))
        except Exception:
            # stty unavailable, or there is no controlling terminal.
            return terminal_size((0, 0))
def tput_GWINSZ() -> terminal_size:
    """Query the terminal size via ``tput lines``/``tput cols``; (0, 0) on failure.

    Fix: bare ``except:`` (which also swallowed KeyboardInterrupt and
    SystemExit) narrowed to ``except Exception``.
    """
    try:
        import subprocess
        rows = int(subprocess.check_output(['tput', 'lines']))
        columns = int(subprocess.check_output(['tput', 'cols']))
        return terminal_size((columns, rows))
    except Exception:
        # tput unavailable or not attached to a terminal.
        return terminal_size((0, 0))
def curses_GWINSZ() -> terminal_size:
    """Query the terminal size via curses; (0, 0) on failure.

    Fix: call ``curses.endwin()`` after probing — the original left the
    terminal in curses (cbreak/noecho) mode after ``initscr()``. The bare
    ``except:`` is also narrowed to ``except Exception``.
    """
    try:
        import curses
        rows, columns = curses.initscr().getmaxyx()
        curses.endwin()  # restore the terminal to its previous state
        return terminal_size((columns, rows))
    except Exception:
        return terminal_size((0, 0))
def windows_GWINSZ() -> terminal_size:
    """Query the console size via the Win32 API; (0, 0) off-Windows or on failure.

    Fix: return (columns, lines) — the original returned ``(sizey, sizex)``,
    i.e. lines/columns swapped relative to every other probe in this file
    (``srWindow`` gives the visible window rectangle, inclusive; width is
    Right-Left+1, height is Bottom-Top+1).
    """
    if not IS_WIN:
        return terminal_size((0, 0))
    try:
        from ctypes import windll, create_string_buffer # type: ignore
        # Std handles: stdin is -10, stdout is -11, stderr is -12.
        h = windll.kernel32.GetStdHandle(-12)
        csbi = create_string_buffer(22)  # sizeof(CONSOLE_SCREEN_BUFFER_INFO)
        res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
    except Exception:
        return terminal_size((0, 0))
    if not res:
        # API call failed (e.g. no console attached).
        return terminal_size((0, 0))
    import struct
    (bufx, bufy, curx, cury, wattr,
     left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
    columns = right - left + 1
    lines = bottom - top + 1
    return terminal_size((columns, lines))
def get_columns_size() -> int:
    """Return the terminal width in columns, or 0 when it cannot be found.

    Tries the probes in order of reliability. Fix: explicitly return 0
    when every probe fails — the original fell off the end and returned
    None (which callers only handled by accident via truthiness).
    """
    for probe in (os_GWINSZ, environ_GWINSZ, ioctl_GWINSZ):
        columns = probe().columns # type: ignore
        if columns > 0:
            return columns
    return 0
# Reference: | |
# - [tqdm](https://pypi.org/project/tqdm/) | |
# - [rich](https://pypi.org/project/rich/) | |
# - [blessings](https://pypi.org/project/blessings/) | |
# - [colorama](https://pypi.org/project/colorama/) | |
# - [colored](https://pypi.org/project/colored/) | |
class ProgressInfo:
    """Thread-safe single-line progress display written to stderr.

    Tracks total/success/failure counters and renders a status line that
    adapts to the current terminal width. Every mutating or printing
    method holds ``_lock`` (an RLock, so the methods may nest).

    Fix: the success-rate label was misspelled "Succeess Rate".
    """

    def __init__(self):
        self._total: int = 0      # items processed so far
        self._success: int = 0    # items that succeeded
        self._failed: int = 0     # items that failed
        self._str: str = ''       # last rendered status line
        self._size: int = 0       # display width of the last rendered line
        self._current_ts = self._start_ts = perf_counter()
        self._lock = RLock()

    @property
    def col_total(self) -> str:
        return f'🤔 Total: {self._total}'

    @property
    def col_success(self) -> str:
        return f'😂 Success: {self._success}'

    @property
    def col_failed(self) -> str:
        return f'😭 Failed: {self._failed}'

    @property
    def col_speed(self) -> str:
        elapsed = self._current_ts - self._start_ts
        if elapsed == 0:
            # No time has passed yet; a rate would divide by zero.
            speed = 'nan'
        else:
            speed = format(self._total / elapsed, '.6f')
        return f'🚀 Speed: {speed} i/s'

    @property
    def col_elapsed(self) -> str:
        return f'🕙 Elapsed: {self._current_ts - self._start_ts:.6f} s'

    @property
    def col_success_rate(self) -> str:
        if self._total:
            rate = self._success * 100 / self._total
        else:
            rate = 100  # nothing processed yet counts as a perfect rate
        return f'💯 Success Rate: {rate:.2f}%'

    def tostring(self) -> tuple[int, str]:
        """Render the status line; return (display width, text).

        Columns are added one at a time, most important first, until the
        terminal width is exhausted. Each leading emoji occupies 2 cells
        on screen but counts as 1 in len(), hence the ``+ 1`` corrections
        tracked in ``col_expand_size``.
        """
        columns: int = get_columns_size()
        if not columns:
            # Unknown terminal width: render nothing.
            return 0, ""
        cols: list = []
        col_expand_size: int = 0
        while True:
            # ' ' takes up 1 column
            columns -= 1
            if columns <= 0:
                break
            col = self.col_failed
            # '😭' takes up 2 columns, 1 extra
            columns -= len(col) + 1
            if columns < 0:
                break
            cols.append(col)
            col_expand_size += 1
            # ' | ' takes up 3 columns
            columns -= 3
            if columns <= 0:
                break
            col = self.col_success
            # '😂' takes up 2 columns, 1 extra
            columns -= len(col) + 1
            if columns < 0:
                break
            cols.insert(0, col)
            col_expand_size += 1
            # ' | ' takes up 3 columns
            columns -= 3
            if columns <= 0:
                break
            col = self.col_speed
            # '🚀' takes up 2 columns, 1 extra
            columns -= len(col) + 1
            if columns < 0:
                break
            cols.append(col)
            col_expand_size += 1
            # ' | ' takes up 3 columns
            columns -= 3
            if columns <= 0:
                break
            col = self.col_success_rate
            # '💯' takes up 2 columns, 1 extra
            columns -= len(col) + 1
            if columns < 0:
                break
            cols.insert(2, col)
            col_expand_size += 1
            # ' | ' takes up 3 columns
            columns -= 3
            if columns <= 0:
                break
            col = self.col_total
            # '🤔' takes up 2 columns, 1 extra
            columns -= len(col) + 1
            if columns < 0:
                break
            cols.insert(0, col)
            col_expand_size += 1
            # ' | ' takes up 3 columns
            columns -= 3
            if columns <= 0:
                break
            col = self.col_elapsed
            # '🕙' takes up 2 columns, 1 extra
            columns -= len(col) + 1
            if columns < 0:
                break
            cols.append(col)
            col_expand_size += 1
            break
        s = ' %s\r' % ' | '.join(cols)
        # '\r' takes up 0 columns, hence the -1; emoji widths are re-added.
        return len(s) - 1 + col_expand_size, s

    def update(self):
        """Re-render the status line and write it to stderr."""
        with self._lock:
            self.clear()
            self._current_ts = perf_counter()
            self._size, self._str = self.tostring()
            self.output()

    def inc_success(self):
        """Record one successful item and refresh the display."""
        with self._lock:
            self._success += 1
            self._total += 1
            self.update()

    def inc_failed(self):
        """Record one failed item and refresh the display."""
        with self._lock:
            self._failed += 1
            self._total += 1
            self.update()

    def clear(self):
        """Blank out the currently displayed status line, if any."""
        if self._size:
            with self._lock:
                stderr.write(' '*self._size)
                stderr.write('\r')
                stderr.flush()

    def output(self):
        """(Re)write the last rendered status line to stderr."""
        with self._lock:
            stderr.write(self._str)
            stderr.flush()

    def pure_print(self, *args, **kwds) -> None:
        """print() that first clears the status line so output stays clean."""
        kwds["flush"] = True
        with self._lock:
            self.clear()
            print(*args, **kwds)
            self._size = 0  # the status line is gone now

    def print(self, *args, **kwds) -> None:
        """print() and then restore the status line afterwards."""
        with self._lock:
            self.pure_print(*args, **kwds)
            self.output()
if __name__ == "__main__":
    from itertools import islice
    from json import dumps

    begin = args.begin
    end = args.end
    max_workers = args.max_workers
    select = args.select
    if select:
        # NOTE: eval of a user-supplied CLI expression is intentional here
        # (power-user filter), but never feed it untrusted input.
        select = eval("lambda item:" + select)
    p = ProgressInfo()
    detail_level = args.detail_level
    # Pick the per-item serializer once, up front.
    if detail_level == 0:
        def make_output(item):
            # Fix: the original had an unreachable p.inc_success() after
            # this return; success counting happens in output() below.
            return item["md5"]
    elif detail_level == 1:
        def make_output(item):
            return dumps(item, ensure_ascii=False)
    elif detail_level == 2:
        def make_output(item):
            item["detail"] = info(md5=item["md5"], is_fiction=item["type"]=="fiction")
            return dumps(item, ensure_ascii=False)
    elif detail_level == 3:
        def make_output(item):
            item["download_links"] = get_downlinks(md5=item["md5"], is_fiction=item["type"]=="fiction")
            return dumps(item, ensure_ascii=False)
    else:
        def make_output(item):
            is_fiction = item["type"] == "fiction"
            item["detail"] = info(md5=item["md5"], is_fiction=is_fiction)
            item["download_links"] = get_downlinks(md5=item["md5"], is_fiction=is_fiction)
            return dumps(item, ensure_ascii=False)

    def output(item):
        # Print one result and keep the progress counters in sync.
        try:
            p.pure_print(make_output(item))
            p.inc_success()
        except BaseException as exc:
            p.pure_print("\x1b[38;5;1m\x1b[1m[FAILED]\x1b[0m", exc, file=stderr)
            p.inc_failed()

    try:
        it = search(args.url)
        # Apply the --begin/--end window to the result stream.
        # Fix: the original ignored --begin entirely when --end <= 0.
        start = max(begin - 1, 0)
        stop = end if end > 0 else None
        if stop is None:
            if start:
                it = islice(it, start, None)
        elif start < stop:
            it = islice(it, start, stop)
        from concurrent.futures import ThreadPoolExecutor
        e = ThreadPoolExecutor(None if max_workers <= 0 else max_workers)
        try:
            for item in filter(select, it):
                e.submit(output, item)
            with e:
                pass  # context exit waits for all submitted jobs
        finally:
            e.shutdown(wait=False, cancel_futures=True)
    except BrokenPipeError:
        stderr.close()
    except KeyboardInterrupt:
        pass
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# encoding: utf-8
"libgen.rs 种子获取"

# List and optionally download repository torrents from libgen.rs / libgen.li.
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__version__ = (0, 0, 1)
__all__ = ["libgen_torrents", "libgen_plus_torrents"]
if __name__ == "__main__":
    from argparse import ArgumentParser, RawTextHelpFormatter

    parser = ArgumentParser(description="libgen.rs 种子获取", formatter_class=RawTextHelpFormatter)
    # Fix: the help text documents type 0 (All) and libgen_torrents(0)
    # supports it, but the original choices=(1, 2, 3) rejected -t 0.
    parser.add_argument("-t", "--type", choices=(0, 1, 2, 3), default=1, type=int, help="""类型
0. All
1. Non-fiction / Sci-tech
2. Fiction
3. Scientific articles
""")
    parser.add_argument("-dl", "--download", default=0, type=int, help="下载文件线程数,0(默认值)不下载,小于 0 时自动确定线程数")
    parser.add_argument("-d", "--detail", action="store_true", help="输出完整信息,json 格式")
    parser.add_argument("-s", "--select", help="提供一个表达式(会注入一个变量 item,是一个 namedtuple),用于筛选条目")
    args = parser.parse_args()
from concurrent.futures import ThreadPoolExecutor | |
from enum import Enum | |
from functools import update_wrapper | |
from posixpath import join as joinpath | |
from re import compile as re_compile | |
from typing import NamedTuple | |
from urllib.request import urlopen | |
# Pre-compiled scrapers for Apache-style directory-listing HTML.
findall_tr = re_compile(r"<tr[^>]*>.+?</tr>").findall    # table rows
findall_td = re_compile(r"<td[^>]*>.+?</td>").findall    # table cells
search_href = re_compile(r'(?<=href=")[^"]+').search     # href attribute value
search_text = re_compile(r"(?<=>) *[^> ][^>]*(?=<)").search  # inner text of a tag
class LibgenTorrentInfo(NamedTuple):
    """One torrent entry scraped from a repository listing page."""
    url: str            # absolute URL of the .torrent file
    name: str           # file name as shown in the listing
    last_modified: str  # "Last modified" column, as displayed
    size: str           # "Size" column, as displayed

    def __str__(self, /) -> str:
        # Printing an entry yields just its URL.
        return self.url

    # Alias mirroring urllib response objects.
    geturl = __str__
def ensure_enum(cls, val):
    """Coerce *val* (a member, a member name, or a value) into enum *cls*.

    :raises ValueError: when *val* matches neither a name nor a value.
    """
    if isinstance(val, cls):
        return val
    # Strings are tried as member names first, then fall through to the
    # by-value lookup below.
    if isinstance(val, str) and val in cls.__members__:
        return cls.__members__[val]
    return cls(val)
class LibgenType(Enum):
    """Content sections on libgen.rs; values match the CLI --type codes."""
    all = 0
    libgen = 1      # Non-fiction / Sci-tech
    nonfiction = 1  # alias of `libgen`
    fiction = 2     # Fiction
    scimag = 3      # Scientific articles
class LibgenPlusType(Enum):
    """Torrent directories on libgen.li; values match the CLI --type codes."""
    all = 0
    libgen = 1      # libgen
    nonfiction = 1  # alias of `libgen`
    fiction = 2     # fiction
    scimag = 3      # scimag
    comics = 4      # comics
    internet_archive = 5  # internet_archive
    isbndb = 6      # isbndb
    magazines = 7   # magazines
    pilimi = 8      # pilimi-zlib-all
    worldcat = 9    # worldcat
def _libegn_torrent_iter(url):
    """Yield a LibgenTorrentInfo for every torrent under *url*, recursing
    into sub-directories (which the listing shows with size "-")."""
    with urlopen(url) as resp:
        html = resp.read().decode()
    # Skip the listing's header rows and the trailing row.
    for tr in findall_tr(html)[3:-1]:
        td_name, td_mtime, td_size = findall_td(tr)[-4:-1]
        name = search_href(td_name)[0]
        mtime_match = search_text(td_mtime)
        last_modified = mtime_match[0].strip() if mtime_match else mtime_match
        size_match = search_text(td_size)
        size = size_match[0].strip() if size_match else size_match
        link = joinpath(url, name)
        if size == "-":
            # "-" marks a sub-directory: descend into it.
            yield from _libegn_torrent_iter(link)
        else:
            yield LibgenTorrentInfo(link, name, last_modified, size)
def _gen_startup(func, /): | |
def wrapper(*args, **kwargs): | |
gen = func(*args, **kwargs) | |
next(gen) | |
return gen | |
return update_wrapper(wrapper, func) | |
@_gen_startup
def _download_torrent_gen(max_workers=0):
    # Primed generator: each send(torrent) downloads torrent.url to
    # torrent.name — directly when max_workers == 1, otherwise via a
    # thread pool (max_workers <= 0 lets the pool pick a size).
    # Closing the generator flips `stopped` so in-flight loops abort.
    stopped = False
    def download(url, path):
        # Retry indefinitely until the transfer completes or `stopped`
        # is set by the enclosing generator's teardown.
        while not stopped:
            try:
                with urlopen(url, timeout=5) as fsrc:
                    fsrc_read = fsrc.read
                    with open(path, "wb") as fdst:
                        fdst_write = fdst.write
                        while not stopped:
                            buf = fsrc_read(1 << 16)  # 64 KiB chunks
                            if not buf:
                                break
                            fdst_write(buf)
                break
            except Exception as e:
                if stopped:
                    return
                print(f"retrying {url!r} <= {type(e).__qualname__}: {e}")
        print("downloaded:", path)
    try:
        if max_workers == 1:
            # Synchronous mode: download in the caller's thread.
            while True:
                torrent = yield
                download(torrent.url, torrent.name)
        else:
            executor = ThreadPoolExecutor(None if max_workers <= 0 else max_workers)
            try:
                while True:
                    torrent = yield
                    executor.submit(download, torrent.url, torrent.name)
            except KeyboardInterrupt:
                pass
            except GeneratorExit:
                # Graceful close(): wait for queued downloads to finish.
                executor.shutdown(wait=True)
            finally:
                # Abort anything still pending (no-op after a clean close).
                executor.shutdown(wait=False, cancel_futures=True)
    finally:
        # Signal the download loops to stop promptly.
        stopped = True
def libgen_torrents(type=1, download=False, predicate=None, max_workers=0):
    """List (and optionally download) repository torrents from libgen.rs.

    罗列和下载 libgen.rs 上面的种子。Yields LibgenTorrentInfo entries;
    when *download* is true each yielded torrent is also fetched to the
    current directory (``max_workers`` threads; <= 0 picks automatically).
    """
    type = ensure_enum(LibgenType, type)
    if type is LibgenType.all:
        # "all" expands to the three concrete sections, in order.
        for code in (1, 2, 3):
            yield from libgen_torrents(
                code, download=download, predicate=predicate, max_workers=max_workers)
        return
    if type is LibgenType.libgen:
        url = "http://libgen.rs/repository_torrent/"
    elif type is LibgenType.fiction:
        url = "https://libgen.rs/fiction/repository_torrent/"
    elif type is LibgenType.scimag:
        url = "http://libgen.rs/scimag/repository_torrent/"
    torrents = _libegn_torrent_iter(url)
    if predicate:
        torrents = filter(predicate, torrents)
    if not download:
        yield from torrents
        return
    downloader = _download_torrent_gen(max_workers)
    try:
        for torrent in torrents:
            yield torrent
            downloader.send(torrent)
    except KeyboardInterrupt:
        # Forward the interrupt so the downloader can stop its workers.
        downloader.throw(KeyboardInterrupt)
        raise
    finally:
        downloader.close()
def libgen_plus_torrents(type=1, download=False, predicate=None, max_workers=0):
    """List (and optionally download) repository torrents from libgen.li.

    罗列和下载 libgen.li 上面的种子。Yields LibgenTorrentInfo entries;
    when *download* is true each yielded torrent is also fetched to the
    current directory (``max_workers`` threads; <= 0 picks automatically).
    """
    type = ensure_enum(LibgenPlusType, type)
    # Map each section to its directory under /torrents/ ("all" is the root;
    # pilimi uses a differently named directory).
    directories = {
        LibgenPlusType.all: "",
        LibgenPlusType.libgen: "libgen/",
        LibgenPlusType.fiction: "fiction/",
        LibgenPlusType.scimag: "scimag/",
        LibgenPlusType.comics: "comics/",
        LibgenPlusType.internet_archive: "internet_archive/",
        LibgenPlusType.isbndb: "isbndb/",
        LibgenPlusType.magazines: "magazines/",
        LibgenPlusType.pilimi: "pilimi-zlib-all/",
        LibgenPlusType.worldcat: "worldcat/",
    }
    url = "http://libgen.li/torrents/" + directories[type]
    torrents = _libegn_torrent_iter(url)
    if predicate:
        torrents = filter(predicate, torrents)
    if not download:
        yield from torrents
        return
    downloader = _download_torrent_gen(max_workers)
    try:
        for torrent in torrents:
            yield torrent
            downloader.send(torrent)
    except KeyboardInterrupt:
        # Forward the interrupt so the downloader can stop its workers.
        downloader.throw(KeyboardInterrupt)
        raise
    finally:
        downloader.close()
if __name__ == "__main__":
    from json import dumps
    # Fix: `stderr` is referenced in the BrokenPipeError handler below
    # but was never imported in this script (NameError on broken pipe).
    from sys import stderr

    max_workers = args.download
    download = max_workers != 0
    show_detail = args.detail
    select = args.select
    if select:
        # NOTE: eval of a user-supplied CLI expression is intentional here
        # (power-user filter), but never feed it untrusted input.
        select = eval("lambda item:" + select)
    try:
        for item in libgen_torrents(
            args.type,
            download=download,
            max_workers=max_workers,
            predicate=select,
        ):
            if show_detail:
                # Full record as one JSON object per line.
                print(dumps(item._asdict(), ensure_ascii=False), flush=True)
            else:
                # Just the torrent URL (LibgenTorrentInfo.__str__).
                print(item, flush=True)
    except BrokenPipeError:
        stderr.close()
    except KeyboardInterrupt:
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment