Created
April 19, 2023 04:08
-
-
Save gau-nernst/1e6089598cc578864f65735a96748907 to your computer and use it in GitHub Desktop.
Scrape images from DuckDuckGo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List | |
from playwright.sync_api import sync_playwright | |
import requests | |
import re | |
import json | |
import os | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from tqdm import tqdm | |
import argparse | |
def get_image_urls(query: str, num_results: int = 100):
    """Search DuckDuckGo Images (Share-licensed filter) for *query*.

    Drives a real Chrome instance via Playwright to obtain the `vqd` token,
    then pages through the JSON endpoint until `num_results` are collected.

    Returns a pair of lists: (source page URLs, direct image URLs with the
    query string stripped).

    Raises:
        RuntimeError: if the vqd token cannot be found in the page HTML
            (e.g. DuckDuckGo changed its markup or blocked the request).
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(channel="chrome")
        page = browser.new_page()
        ddg_url = f"https://duckduckgo.com/?q={query.replace(' ', '+')}&iax=images&ia=images&iaf=license%3AShare"
        page.goto(ddg_url, wait_until="networkidle")
        page_content = page.content()
        # The i.js API requires a per-search `vqd` token embedded in the HTML.
        # Guard the match: re.search returns None on failure, and the original
        # `re.search(...)[1]` would crash with an opaque TypeError.
        match = re.search(r"vqd='(\d-\d+-\d+)'", page_content)
        if match is None:
            browser.close()
            raise RuntimeError("Could not extract vqd token from DuckDuckGo page")
        vqd = match[1]
        results = []
        while len(results) < num_results:
            # `s` is the result offset; paginate until we have enough.
            url = f"https://duckduckgo.com/i.js?l=us-en&o=json&q={query.replace(' ', '%20')}&vqd={vqd}&f=,,,,,license:Share&p=1&s={len(results)}"
            page.goto(url)
            batch = json.loads(page.text_content("pre"))["results"]
            if not batch:
                # Fewer results exist than requested — without this guard the
                # loop would spin forever re-fetching an empty page.
                break
            results.extend(batch)
        browser.close()
    results = results[:num_results]
    return [r["url"] for r in results], [r["image"].split("?")[0] for r in results]
def detect_image_ext(data: bytes):
    """Sniff an image file extension from magic bytes.

    Recognizes JPEG and PNG signatures; returns ".jpg", ".png", or None
    for anything else.
    """
    signatures = {
        b"\xFF\xD8\xFF": ".jpg",
        b"\x89\x50\x4E\x47\x0D\x0A\x1A\x0A": ".png",
    }
    for magic, ext in signatures.items():
        if data.startswith(magic):
            return ext
    return None
def batch_download(urls: List[str], save_dir):
    """Download *urls* concurrently (4 workers) into *save_dir*.

    Files are saved as image_<index><ext>, where the extension comes from
    the URL path or, failing that, from sniffing the downloaded bytes.
    Failures are reported and skipped (best-effort), never raised.
    """
    def job(img_url, i):
        try:
            # timeout prevents one stuck server from hanging a worker forever
            resp = requests.get(img_url, timeout=30)
            resp.raise_for_status()
            data = resp.content
            # Prefer the URL's extension; fall back to magic-byte sniffing.
            # Final fallback "" avoids filenames like "image_0000None".
            ext = os.path.splitext(img_url)[-1] or detect_image_ext(data) or ""
            with open(f"{save_dir}/image_{i:04d}{ext}", "wb") as f:
                f.write(data)
        except Exception:
            # Narrowed from a bare `except:` so Ctrl-C / SystemExit still work;
            # downloads are deliberately best-effort, so just report and go on.
            print(f"Fail to download {img_url}")

    with ThreadPoolExecutor(4) as executor:
        futures = [executor.submit(job, img_url, i) for i, img_url in enumerate(urls)]
        # Consume futures as they finish so tqdm shows real progress;
        # result() re-raises nothing since job() swallows its own errors.
        for future in tqdm(as_completed(futures), total=len(futures)):
            future.result()
def main():
    """CLI entry point: search DuckDuckGo Images, save URL pairs, download.

    Writes one "<page_url>,<image_url>" line per result to --save_txt,
    then downloads the images into --download_dir (created if missing).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--query", required=True)
    parser.add_argument("--num_results", type=int, default=100)
    parser.add_argument("--save_txt", default="search_results.txt")
    parser.add_argument("--download_dir", default="search_images")
    args = parser.parse_args()

    urls, image_urls = get_image_urls(args.query, num_results=args.num_results)

    # Persist the (page URL, image URL) pairs as comma-separated lines.
    with open(args.save_txt, "w") as f:
        f.writelines(f"{url},{img_url}\n" for url, img_url in zip(urls, image_urls))

    if args.download_dir is not None:
        os.makedirs(args.download_dir, exist_ok=True)
        batch_download(image_urls, args.download_dir)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment