"""Download full-size images from a saved Yandex Images search results page."""
import json
import re
from pathlib import Path
from argparse import ArgumentParser
from asyncio import run, gather, Semaphore, TimeoutError
from aiohttp import ClientSession, ClientError
from aiofile import AIOFile
from bs4 import BeautifulSoup

async def download_file(session: ClientSession, sem: Semaphore, url: str, target: Path):
    """Fetch one image and write it to `target`; the semaphore caps concurrency."""
    async with sem:
        try:
            async with session.get(url) as response:
                if response.status != 200:
                    print(f'Scraping {url} failed due to the return code {response.status}')
                    return
                content = await response.read()
                if content == b'':
                    print(f'Scraping {url} failed due to the empty content')
                    return
                async with AIOFile(target, 'wb') as f:
                    await f.write(content)
                print(f'Loaded {target} from {url}')
        except (ClientError, TimeoutError) as e:
            print(f'Exception on url {url}: {e}')

async def download_files(urls: set[str], out_dir: Path, limit=10):
    """Download all URLs concurrently, with at most `limit` requests in flight."""
    tasks = []
    sem = Semaphore(limit)
    async with ClientSession() as session:
        for i, url in enumerate(urls):
            ul = url.lower()
            # Guess the extension from the URL tail; fall back to substring checks.
            m = re.search(r'\.(\w+)$', ul)
            if m is None:
                print('Unable to parse ext from', url)
                ext = 'ext'
                if 'jpg' in ul:
                    ext = 'jpg'
                elif 'png' in ul:
                    ext = 'png'
            else:
                ext = m.group(1)
            tasks.append(download_file(session, sem, url, out_dir / f'{i + 1:04}.{ext}'))
        await gather(*tasks)

def main():
    parser = ArgumentParser(description='Yandex images parser')
    parser.add_argument('source', type=Path, help='Input HTML file')
    parser.add_argument('-o', '--out-dir', type=Path, default=Path('images'), help='Images output directory')
    args = parser.parse_args()
    with open(args.source, 'r') as f:
        soup = BeautifulSoup(f, features='html.parser')
    # Every search result is a 'serp-item' div whose metadata sits in its 'data-bem' attribute.
    items = soup.find_all('div', class_='serp-item')
    title = soup.find('title').text
    print(f'File loaded, found {len(items)} item(s), title: {title}')
    title = title.split(':')[0]
    items = (json.loads(item.attrs['data-bem'])['serp-item'] for item in items)
    urls = set()
    for item in items:
        # Keep the largest variant among duplicates and previews, preferring the origin URL.
        dups, pre = item['dups'], item['preview']
        assert isinstance(dups, list) and isinstance(pre, list)
        best = max(dups + pre, key=lambda v: (v['w'], v['h'], v['fileSizeInBytes']))
        if 'origin' in best:
            best = best['origin']
        urls.add(best['url'])
    print(f'Extracted {len(urls)} url(s)')
    out_dir: Path = args.out_dir / title.replace(' ', '-')
    print(f'Downloading to {out_dir}')
    out_dir.mkdir(parents=True, exist_ok=True)  # parents=True so the default 'images' root is created too
    run(download_files(urls, out_dir))
    print('Download completed')


if __name__ == '__main__':
    main()
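
For reuse outside the command line, the download_files coroutine can also be driven directly with a hand-picked set of URLs. A minimal sketch, assuming the gist is saved as yandex_images_parser.py; the module name and the example URLs are placeholders, not part of the gist:

from pathlib import Path
from asyncio import run

from yandex_images_parser import download_files  # hypothetical module name for this gist

urls = {
    'https://example.com/photos/cat.jpg',  # placeholder URLs for illustration only
    'https://example.com/photos/dog.png',
}
out_dir = Path('images/manual')
out_dir.mkdir(parents=True, exist_ok=True)
run(download_files(urls, out_dir, limit=4))  # limit caps concurrent requests; the CLI path uses the default of 10

The CLI entry point itself expects a locally saved Yandex Images results page, e.g. python yandex_images_parser.py saved_page.html -o images (the file name is illustrative).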