Scrape articles from SaltTiger, an ebook site: https://salttiger.com
#!/usr/bin/env python3
# coding: utf-8

"salttiger.com article scraper"

__version__ = (0, 0, 3)
__author__ = "ChenyangGao <https://github.com/ChenyangGao>"
__all__ = [
    "get_archive_list", "get_archive_detail", "update_archives", "update_json_db",
    "update_sqlite_db", "sqlite_to_json", "json_to_sqlite",
]

if __name__ == "__main__":
    from argparse import ArgumentParser, RawTextHelpFormatter

    parser = ArgumentParser(description="salttiger.com article scraper", formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "db_path", nargs="?", default="salttiger.db",
        help="database file; only .json and .db (sqlite) suffixes are supported (default: salttiger.db)",
    )
    parser.add_argument("-b", "--begin-date", help="start date (YYYY-MM-DD)")
    parser.add_argument("-e", "--end-date", help="end date, inclusive (YYYY-MM-DD)")
    parser.add_argument("-l", "--list-files", action="store_true", help="fetch file lists (from ed2k or Baidu Netdisk links); entries already collected are skipped")
    parser.add_argument("-m", "--max-workers", default=1, type=int, help="maximum number of worker threads (default: 1); a value <= 0 picks a suitable number automatically")
    parser.add_argument("-c", "--clear-files-first", action="store_true", help="clear the file lists first so they can be collected again")
    parser.add_argument("-u", "--update-detail", action="store_true", help="force-update data instead of skipping entries that already exist")
    parser.add_argument("-v", "--version", action="store_true", help="print the version number")
    args = parser.parse_args()
    if args.version:
        print(".".join(map(str, __version__)))
        raise SystemExit(0)

try:
    from dupan import DuPanShareList
    from lxml.etree import iselement, Comment
    from lxml.html import parse, fromstring, tostring, HtmlElement
    from wcwidth import wcwidth
except ImportError:
    # Install the missing third-party dependencies on the fly, then retry the imports.
    from sys import executable
    from subprocess import run
    run([executable, "-m", "pip", "install", "-U", "python-dupan", "lxml", "wcwidth"], check=True)
    from dupan import DuPanShareList
    from lxml.etree import iselement, Comment
    from lxml.html import parse, fromstring, tostring, HtmlElement
    from wcwidth import wcwidth  # type: ignore

import json
import sqlite3

from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime
from http.client import IncompleteRead
from html import unescape
from itertools import cycle
from os import get_terminal_size, remove
from os.path import exists
from re import compile as re_compile
from sys import stdout
from threading import Lock
from textwrap import indent
from time import perf_counter
from typing import cast, Final
from urllib.error import URLError
from urllib.parse import unquote, urlparse, urlunparse
from urllib.request import urlopen

CRE_BACKTICKS: Final = re_compile(r"`+")
CRE_YEAR_MONTH: Final = re_compile(r"(?P<year>\d{4})年(?P<month>\d{1,2})月")
CRE_PWD: Final = re_compile(r"(?m:提取码.*?\b(?P<pwd1>[0-9a-zA-Z]{4})\b.*)")
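
# Quick sanity checks for the patterns above (hypothetical sample strings):
#     CRE_YEAR_MONTH.search("2024年5月").groupdict()  # -> {'year': '2024', 'month': '5'}
#     CRE_PWD.search("提取码: ab12")[1]  # -> 'ab12' ("提取码" means "extraction code")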

def html_to_markdown(
    el: bytes | bytearray | str | HtmlElement,
    /,
) -> str:
    """Convert HTML to Markdown.
    """
    if isinstance(el, (bytes, bytearray, str)):
        el = fromstring(el)
    parts: list[str] = []
    add = parts.append
    def add_part(s, indent_level=0):
        if indent_level and (not parts or parts[-1][-1:] == "\n") and "\n" in s:
            s = indent(s, " " * (4 * indent_level)).lstrip(" ")
        if s:
            add(s)
    def extract(el, indent_level=0):
        if not iselement(el) or el.tag is Comment:
            return
        el = cast(HtmlElement, el)
        match el.tag:
            case "br":
                if parts:
                    if parts[-1][-1:] == "\n":
                        pass
                    elif parts[-1]:
                        # Two trailing spaces make a Markdown hard line break.
                        add("  \n")
                    else:
                        add("\n")
            case "h1" | "h2" | "h3" | "h4" | "h5" | "h6" as tag:
                add_part("#" * int(tag[1]), indent_level)
                add(" ")
                text = (el.text or "").strip()
                if text:
                    add(text)
                for sel in el.iterfind("*"):
                    text = html_to_markdown(sel)
                    if text:
                        add(text)
case "a": | |
add_part("[", indent_level) | |
text = (el.text or "").strip() | |
if text: | |
add(text.replace("]", "]")) | |
for sel in el.iterfind("*"): | |
text = html_to_markdown(sel) | |
if text: | |
add(text.replace("]", "]")) | |
add("](") | |
add(el.attrib.get("href", "").replace(")", "%29")) | |
add(")") | |
case "img": | |
add_part("![", indent_level) | |
text = el.attrib.get("alt", "").strip() | |
if text: | |
add(text.replace("]", "]")) | |
add("](") | |
add(el.attrib.get("src", "").replace(")", "%29")) | |
title = el.attrib.get("title", "").strip() | |
if title: | |
add(' "') | |
add(title.replace('"', """)) | |
add('"') | |
add(")") | |
case "code": | |
max_backtick_len = max(map(len, CRE_BACKTICKS.findall(el.text))) | |
if max_backtick_len: | |
backticks = "`" * (max_backtick_len + 1) | |
add_part("%s %s %s" % el.text.replace(backticks, el.text, backticks), indent_level) | |
else: | |
add_part("`%s`" % el.text, indent_level) | |
case "strong" | "em" as tag: | |
text = (el.text or "").strip() | |
children = el.findall("*") | |
if children: | |
add_part(f"<{tag}>", indent_level) | |
if text: | |
add(text) | |
for sel in children: | |
extract(sel, indent_level) | |
add(f"</{tag}>") | |
elif text: | |
if tag == "em": | |
add_part("*%s*" % text.replace("*", r"\*")) | |
else: | |
add_part("**%s**" % text.replace("*", r"\*")) | |
case "svg" | "audio" | "video": | |
add_part(tostring(el, encoding="utf-8", with_tail=False).decode("utf-8"), indent_level) | |
case "script" | "style" | "link": | |
pass | |
case "li": | |
if not parts or parts[-1][:-1] == "\n": | |
add_part("- ", indent_level) | |
else: | |
add_part("\n- ", indent_level) | |
text = (el.text or "").strip() | |
if text: | |
add(text) | |
for sel in el: | |
extract(sel, indent_level + 1) | |
# TODO: case "table": ... | |
case _: | |
text = (el.text or "").strip() | |
add_part(text, indent_level) | |
for sel in el: | |
extract(sel, indent_level) | |
if el.tag in ( | |
"address", "article", "aside", "blockquote", "canvas", "dd", "div", "dl", | |
"fieldset", "figcaption", "figure", "footer", "form", "h1", "h2", "h3", | |
"h4", "h5", "h6", "header", "hr", "main", "nav", "noscript", "ol", "output", | |
"p", "pre", "section", "table", "tfoot", "ul", "video", | |
): | |
if parts: | |
if parts[-1][-2:] == "\n\n": | |
pass | |
elif parts[-1][-1:] == "\n": | |
add("\n") | |
else: | |
add("\n\n") | |
text = (el.tail or "").strip() | |
if text: | |
add_part(text, indent_level) | |
extract(el) | |
return "".join(parts).strip() | |

def get_archive_list(
    begin_date: None | str = None,
    end_date: None | str = None,
) -> list[dict]:
    """Collect the entries listed on https://salttiger.com/archives/ (without fetching detail pages).
    """
    url = "https://salttiger.com/archives/"
    etree = parse(urlopen(url, timeout=5))
    datalist = []
    if begin_date:
        begin = datetime.strptime(begin_date, "%Y-%m-%d").date()
    else:
        # Sentinel that compares as <= any date.
        begin = type("", (), {"__le__": staticmethod(lambda _: True)})()
    if end_date:
        end = datetime.strptime(end_date, "%Y-%m-%d").date()
    else:
        # Sentinel that compares as >= any date.
        end = type("", (), {"__ge__": staticmethod(lambda _: True)})()
    for car in etree.iterfind('.//ul[@class="car-list"]/li'):
        year, month = map(int, CRE_YEAR_MONTH.search(car[0].text).groups())  # type: ignore
        for m in car.findall("ul/li"):
            day = int(m.text.rstrip(": "))
            if not begin <= date(year, month, day) <= end:
                continue
            datalist.append(dict(
                title=m.find("a").text,
                url=m.find("a").attrib["href"],
                year=year,
                month=month,
                day=day,
            ))
    return datalist
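
# Usage sketch (requires network access to salttiger.com):
#     items = get_archive_list(begin_date="2024-01-01", end_date="2024-01-31")
#     # -> [{'title': ..., 'url': ..., 'year': 2024, 'month': 1, 'day': ...}, ...]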

def get_archive_detail(url: str, /) -> dict:
    """Collect the detail-page information for `url`.
    """
    etree = parse(urlopen(url, timeout=5))
    entry_content = etree.find('.//*[@class="entry-content"]')
    entry_meta = etree.find('.//*[@class="entry-meta"]')
    attribute_content = entry_content[0]
    try:
        cover_el = attribute_content[0]
        if cover_el.tag != "img":
            raise ValueError("cover image not found")
        cover = cover_el.attrib["src"]
        download_links = extract_download_links(attribute_content)
    except Exception:
        cover = ""
        download_links = None
    try:
        dt = entry_meta.find('.//time[@class="entry-date"]').attrib["datetime"]
    except Exception:
        dt = ""
    return {
        "cover": cover,
        "description": html_to_markdown(entry_content),
        "datetime": dt,
        "tags": [
            {"tag": el.text, "href": el.attrib["href"], "rel": el.attrib["rel"]}
            for el in entry_meta.xpath(".//a[@rel and contains(concat(' ', normalize-space(@rel), ' '), ' tag ')]")
        ],
        "download_links": download_links,
    }
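
# The returned mapping looks roughly like this (values are illustrative):
#     {'cover': 'https://...jpg', 'description': '...markdown...',
#      'datetime': '2024-05-01T00:00:00+00:00',
#      'tags': [{'tag': ..., 'href': ..., 'rel': 'tag'}],
#      'download_links': ['ed2k://...', ...]}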

def ed2k_extract(link: str, /) -> dict:
    """Extract the file name, file size, etc. from an ed2k link.
    """
    # An ed2k link has the form ed2k://|file|<name>|<size>|<hash>|/
    parts = link.split("|", 4)
    return {
        "link": link,
        "name": parts[2],
        "size": int(parts[3]),
    }
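
# For example (hypothetical link):
#     ed2k_extract("ed2k://|file|book.pdf|1048576|0123456789ABCDEF0123456789ABCDEF|/")
#     # -> {'link': 'ed2k://|file|book.pdf|...', 'name': 'book.pdf', 'size': 1048576}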

def extract_download_links(el: HtmlElement, /) -> None | list[str]:
    """Extract the download links that follow the "download" marker inside `el`.
    """
    def dupan_append_pwd(urlp, pwd):
        # Build the "pwd=..." query fragment to append to a Baidu Netdisk share link.
        query = urlp.query
        pwd = "pwd=" + pwd
        if query:
            if pwd in query:
                return ""
            return "&" + pwd
        else:
            return pwd
    for br in el.iterfind(".//br"):
        text = br.tail or ""
        try:
            text += br.getnext().text or ""
        except Exception:
            pass
        if not text:
            continue
        text = text.lower()
        if "download" in text or "下载" in text:
            break
    else:
        return None
    ls: list[str] = []
    for sel in br.xpath("following-sibling::a[@href] | following-sibling::*/descendant-or-self::a[@href]"):
        href = unquote(unescape(sel.attrib["href"]))
        if href.startswith(("magnet:", "ed2k:")):
            ls.append(href)
        else:
            urlp = urlparse(href)
            if not urlp.scheme or urlp.scheme not in ("http", "https"):
                continue
            if urlp.netloc == "pan.baidu.com" and not (urlp.query.startswith("pwd=") or "&pwd=" in urlp.query):
                # Try to recover the extraction code (提取码) from the surrounding text.
                match = None
                text = sel.text_content() + (sel.tail or "")
                if text:
                    match = CRE_PWD.search(text)
                nsel: None | HtmlElement
                if match is None:
                    nsel = sel.getnext()
                    if iselement(nsel) and nsel.tag == "br":
                        text = nsel.tail or ""
                        match = CRE_PWD.search(text)
                        if match is None:
                            nsel = nsel.getnext()
                            if iselement(nsel) and nsel.tag is not Comment and nsel.tag != "br":
                                nsel = cast(HtmlElement, nsel)
                                text = nsel.text_content().lstrip()
                                if text.startswith("提取码"):
                                    match = CRE_PWD.search(text)
                if match is not None:
                    pwd = dupan_append_pwd(urlp, match[cast(str, match.lastgroup)])
                    if pwd:
                        href = urlunparse(urlp._replace(query=urlp.query + pwd))
            ls.append(href)
    return ls

def to_time_str(t: int | float, /, precision: int = 6) -> str:
    """Format a duration in seconds as [d] hh:mm:ss[.ffffff]."""
    s: int | float | str
    m, s = divmod(t, 60)
    if isinstance(t, float):
        s = format(s, ">0%d.%df" % (3 + precision, precision))
        m = int(m)
    else:
        s = format(s, ">02d")
    h, m = divmod(m, 60)
    if h >= 24:
        d, h = divmod(h, 24)
        return f"{d} d {h:02d}:{m:02d}:{s}"
    return f"{h:02d}:{m:02d}:{s}"

def calc_lines(s: str, /, columns: None | int = None) -> int:
    """Count how many terminal lines the text will occupy when printed
    (strip escape sequences and apply 'NFC' or 'NFKC' normalization beforehand).
    """
    if columns is None or columns <= 0:
        columns = get_terminal_size().columns
    colsize = 0
    lines = 0
    for ch in s:
        if ch == "\n":
            # An empty line still occupies one row; a non-empty one was already
            # counted when its first character arrived.
            if not colsize:
                lines += 1
            colsize = 0
            continue
        c = wcwidth(ch)
        if c:
            if c < 0:
                # Treat unprintable characters as double width.
                c = 2
            if not colsize:
                lines += 1
            colsize += c
            if colsize >= columns:
                if colsize > columns:
                    colsize = c
                    lines += 1
                else:
                    colsize = 0
    return lines
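
# For example, on a terminal only 2 columns wide (hypothetical):
#     calc_lines("abcd", columns=2)  # -> 2 ("ab" wraps onto "cd")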

def make_progress_output(total: None | int = None):
    """Create a println function that writes messages to the console while also rendering a progress line.
    """
    lock = Lock()
    write = stdout.write
    flush = stdout.flush
    count = 0
    success = 0
    cost = 0.0
    get_msg_fns = [
        cycle("😃😄😁😆😅🤣😂🙂🙃😉😊😇🫠🥰😍🤩😘😗😚😙😋😛😜🤪😝🤑🤗🤭🤫🤔🤤").__next__,
        lambda: f" {count}",
    ]
    if total is not None and total > 0:
        get_msg_fns.append(lambda: f" of {total}")
        get_msg_fns.append(lambda: f" | 🧮 {count/total*100:.2f} %")
    get_msg_fns.append(lambda: f" | ✅ {success}")
    get_msg_fns.append(lambda: f" | ❎ {count - success}")
    get_msg_fns.append(lambda: f" | 🕙 {to_time_str(cost)}")
    get_msg_fns.append(lambda: f" | 🚀 {cost and (count / cost):.2f} it/s")
    last_columns = 0
    last_progress = ""
    try:
        get_terminal_size()
    except OSError:
        # Not attached to a terminal: fall back to plain line output.
        def println(msg: str = "", update: None | bool = None):
            with lock:
                write(msg + "\n")
    else:
        def println(msg: str = "", update: None | bool = None):
            nonlocal count, success, cost, last_columns, last_progress
            with lock:
                if update is not None:
                    count += 1
                    if update:
                        success += 1
                cost = perf_counter() - start_t
                columns = get_terminal_size().columns
                write("\r\x1b[K")  # return to column 0 and clear the progress line
                if columns < last_columns:
                    # The terminal narrowed, so the old progress line may have
                    # wrapped: clear those extra rows too.
                    last_lines = calc_lines(last_progress)
                    if last_lines > 1:
                        write("\x1b[A\x1b[K" * (last_lines - 1))
                last_columns = columns
                write(msg + "\n")
                progress = ""
                for fn in get_msg_fns:
                    s = fn()
                    columns -= len(s) + 1
                    if columns >= 0:
                        progress += s
                    if columns <= 0:
                        break
                write(progress)
                write("\r")
                flush()
                last_progress = progress
    start_t = perf_counter()
    println()
    return println
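
# Usage sketch:
#     println = make_progress_output(total=100)
#     println("item done", update=True)  # log a line and advance the progress bar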

def update_archives(
    archive_list,
    list_files: bool = False,
    update_detail: bool = False,
    clear_files_first: bool = False,
    callback: None | Callable = None,
    max_workers: None | int = None,
):
    """Update the data.
    """
    total = len(archive_list)
    print = make_progress_output(total)
    def update(item):
        if clear_files_first and "files" in item:
            del item["files"]
            if callback:
                callback(item)
        url = item["url"]
        if update_detail or "description" not in item:
            while True:
                try:
                    detail = get_archive_detail(url)
                    if update_detail:
                        if item.get("description") != detail["description"]:
                            item.update(detail)
                            item.pop("files", None)
                            if callback:
                                callback(item)
                    elif "description" not in item:
                        item.update(detail)
                        if callback:
                            callback(item)
                    break
                except KeyboardInterrupt:
                    raise
                except (URLError, TimeoutError, IncompleteRead) as e:
                    print(f"\x1b[1m\x1b[38;5;3mRETRY\x1b[0m \x1b[4m\x1b[38;5;4m{url}\x1b[0m\n |_ \x1b[1m\x1b[38;5;1m{type(e).__qualname__}\x1b[0m: {e}")
                except BaseException as e:
                    print(f"\x1b[1m\x1b[38;5;1mNA\x1b[0m \x1b[4m\x1b[38;5;4m{url}\x1b[0m\n |_ \x1b[1m\x1b[38;5;1m{type(e).__qualname__}\x1b[0m: {e}", update=False)
                    raise
        if list_files and "files" not in item:
            try:
                download_links = item["download_links"]
                if download_links:
                    files = []
                    for link in download_links:
                        if link.startswith("ed2k://"):
                            attr = ed2k_extract(link)
                            attr["link"] = link
                            files.append(attr)
                        elif "://pan.baidu.com/" in link:
                            try:
                                # A Baidu Netdisk share lists the complete file set,
                                # so it supersedes whatever was collected so far.
                                files = [{**attr, "link": link} for attr in DuPanShareList(link) if not attr["isdir"]]
                                print(f"\x1b[1m\x1b[38;5;2mOK\x1b[0m \x1b[4m\x1b[38;5;4m{link}\x1b[0m")
                            except Exception:
                                print(f"\x1b[1m\x1b[38;5;1mNA\x1b[0m \x1b[4m\x1b[38;5;4m{link}\x1b[0m")
                                raise
                            break
                    item["files"] = files
                else:
                    item["files"] = []
                if callback:
                    callback(item)
            except KeyboardInterrupt:
                raise
            except BaseException as e:
                print(f"\x1b[1m\x1b[38;5;1mNA\x1b[0m \x1b[4m\x1b[38;5;4m{url}\x1b[0m\n |_ \x1b[1m\x1b[38;5;1m{type(e).__qualname__}\x1b[0m: {e}", update=False)
                raise
        print(f"\x1b[1m\x1b[38;5;2mOK\x1b[0m \x1b[4m\x1b[38;5;4m{url}\x1b[0m", update=True)
    if max_workers == 1:
        for item in archive_list:
            update(item)
    else:
        executor = ThreadPoolExecutor(max_workers)
        try:
            for item in archive_list:
                executor.submit(update, item)
            # Entering the context manager waits for all submitted tasks to finish.
            with executor:
                pass
        finally:
            executor.shutdown(wait=False, cancel_futures=True)

def update_json_db(
    path: str = "salttiger.json",
    list_files: bool = False,
    update_detail: bool = False,
    clear_files_first: bool = False,
    max_workers: None | int = None,
    begin_date: None | str = None,
    end_date: None | str = None,
):
    """Collect or update data into a JSON database.
    """
    archive_list = get_archive_list(begin_date, end_date)
    try:
        archives = json.load(open(path, "r", encoding="utf-8"))
    except FileNotFoundError:
        archives = {item["url"].rsplit("/", 2)[-2]: item for item in archive_list}
        tasks = archive_list
    else:
        tasks = []
        for item in archive_list:
            key = item["url"].rsplit("/", 2)[-2]
            if key in archives:
                tasks.append(archives[key])
            else:
                tasks.append(item)
                archives[key] = item
    update_archives(
        tasks,
        list_files=list_files,
        update_detail=update_detail,
        clear_files_first=clear_files_first,
        max_workers=max_workers,
    )
    json.dump(archives, open(path, "w", encoding="utf-8"), ensure_ascii=False)
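
# Usage sketch (creates or updates salttiger.json in the working directory):
#     update_json_db("salttiger.json", list_files=True, max_workers=4)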

def update_sqlite_db(
    path: str = "salttiger.db",
    list_files: bool = False,
    update_detail: bool = False,
    clear_files_first: bool = False,
    max_workers: None | int = None,
    begin_date: None | str = None,
    end_date: None | str = None,
):
    """Collect or update data into a SQLite database.
    """
    sql = """\
CREATE TABLE "data" (
    "id" TEXT NOT NULL,
    "data" JSON NOT NULL,
    "datetime" TEXT DEFAULT '',
    PRIMARY KEY("id") ON CONFLICT REPLACE
);"""
    changed: dict[str, dict] = {}
    def update(item):
        changed[item["url"].rsplit("/", 2)[-2]] = item
    archive_list = get_archive_list(begin_date, end_date)
    with sqlite3.connect(path) as con:
        try:
            archives = {k: json.loads(v) for k, v in con.execute("SELECT id, data FROM data")}
        except Exception:
            # First run: the table does not exist yet.
            con.execute(sql)
            archives = {}
        tasks = []
        for item in archive_list:
            key = item["url"].rsplit("/", 2)[-2]
            if key in archives:
                tasks.append(archives[key])
            else:
                tasks.append(item)
                changed[key] = item
        update_archives(
            tasks,
            callback=update,
            list_files=list_files,
            update_detail=update_detail,
            clear_files_first=clear_files_first,
            max_workers=max_workers,
        )
        if changed:
            con.executemany(
                "INSERT INTO data (id, data, datetime) VALUES (?, ?, ?)",
                (
                    (key, json.dumps(item, ensure_ascii=False), item.get("datetime", ""))
                    for key, item in changed.items()
                ),
            )
            con.commit()
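
# The data column holds JSON, so it can be queried with SQLite's JSON1 functions, e.g.:
#     SELECT json_extract(data, '$.title') FROM data ORDER BY datetime DESC LIMIT 10;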

def sqlite_to_json(db_path: str, json_path: str):
    """Convert a SQLite database to a JSON database.
    """
    with sqlite3.connect(db_path) as con:
        archives = {k: json.loads(v) for k, v in con.execute("SELECT id, data FROM data")}
    json.dump(archives, open(json_path, "w", encoding="utf-8"), ensure_ascii=False)

def json_to_sqlite(json_path: str, db_path: str):
    """Convert a JSON database to a SQLite database.
    """
    archives = json.load(open(json_path, "r", encoding="utf-8"))
    if exists(db_path):
        remove(db_path)
    with sqlite3.connect(db_path) as con:
        con.execute("""\
CREATE TABLE "data" (
    "id" TEXT NOT NULL,
    "data" JSON NOT NULL,
    "datetime" TEXT DEFAULT '',
    PRIMARY KEY("id") ON CONFLICT REPLACE
);""")
        con.executemany(
            "INSERT INTO data (id, data, datetime) VALUES (?, ?, ?)",
            (
                (key, json.dumps(item, ensure_ascii=False), item.get("datetime", ""))
                for key, item in archives.items()
            ),
        )
        con.commit()
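
# Round-trip sketch between the two storage formats:
#     sqlite_to_json("salttiger.db", "salttiger.json")
#     json_to_sqlite("salttiger.json", "salttiger.db")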

if __name__ == "__main__":
    db_path = args.db_path
    max_workers = args.max_workers if args.max_workers > 0 else None
    if db_path.endswith(".json"):
        update_db = update_json_db
    else:
        update_db = update_sqlite_db
    update_db(
        db_path,
        list_files=args.list_files,
        update_detail=args.update_detail,
        clear_files_first=args.clear_files_first,
        max_workers=max_workers,
        begin_date=args.begin_date,
        end_date=args.end_date,
    )