windows10spotlight.com Image Collector
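
"""windows10spotlight.com Image Collector.

Walks the paginated galleries on windows10spotlight.com with a headless
Chrome browser, follows each entry to its full-size image, and downloads
the images over rate-limited HTTP with retries. Each file is verified with
Pillow; images that are not 1920x1080 are filed into per-resolution
subfolders. Completed URLs are recorded in progress.json so an interrupted
run can resume where it left off.
"""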
import os
import time
import requests
import logging
import argparse
import json
import signal
import sys
import platform
import shutil
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from webdriver_manager.chrome import ChromeDriverManager
from colorama import init, Fore
from ratelimit import limits, sleep_and_retry
from PIL import Image
from tqdm import tqdm

init(autoreset=True)
should_exit = False
driver = None
progress = None
proxy_url = None          # set from --proxy in __main__
log_level = logging.INFO  # set from --log_level in __main__
# Fallback logger so errors raised before the progress bars exist still print;
# replaced by the tqdm-aware logger created in scrape_all_images().
custom_log = logging.getLogger('custom_logger')
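
# Two stacked tqdm bars: the top bar counts pages, the bottom bar is reused
# as a one-line status display for the current operation.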
class ProgressManager:
    def __init__(self, total_pages):
        self.total_pages = total_pages
        self.current_page = 0
        self.current_operation = "Initializing"
        self.pbar = tqdm(total=total_pages, position=0, leave=True, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}")
        self.status_bar = tqdm(total=0, position=1, bar_format="{desc}", leave=True)

    def update_progress(self, page=None, operation=None):
        if page is not None and page > self.current_page:
            self.current_page = page
            self.pbar.update(1)
        if operation is not None:
            self.current_operation = operation
        self._update_status()

    def _update_status(self):
        status = f"Page {self.current_page}/{self.total_pages} | {self.current_operation}"
        self.status_bar.set_description_str(status)

    def clear_progress(self):
        self.pbar.clear()
        self.status_bar.clear()

    def close(self):
        self.pbar.close()
        self.status_bar.close()
def custom_logger(progress_manager):
    class ProgressBarHandler(logging.Handler):
        def emit(self, record):
            progress_manager.clear_progress()
            print(self.format(record))
            progress_manager.pbar.refresh()
            progress_manager.status_bar.refresh()

    logger = logging.getLogger('custom_logger')
    logger.setLevel(log_level)
    handler = ProgressBarHandler()
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
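
# Cooperative shutdown: workers poll the should_exit flag, while the handler
# closes the progress bars and the browser before exiting.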
def signal_handler(signum, frame):
    global should_exit, driver, progress
    print(f"\n{Fore.YELLOW}Received interrupt signal. Cleaning up and exiting gracefully...")
    should_exit = True
    if progress:
        progress.close()
    if driver:
        driver.quit()
    sys.exit(0)
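
# Headless Chrome; webdriver_manager fetches a matching chromedriver binary
# automatically, so no manual driver install is required.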
def init_driver():
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--log-level=3")
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    return driver
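
# The pagination widget exposes links with class "page-numbers"; the
# second-to-last one (the last is typically a "next" arrow) links to the
# highest page, whose number is the final path segment of its href.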
def get_last_page_number(driver, base_url):
    driver.get(base_url)
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '.page-numbers'))
        )
        last_page_element = driver.find_elements(By.CSS_SELECTOR, '.page-numbers')[-2]
        last_page_number = int(last_page_element.get_attribute('href').split('/')[-1])
        return last_page_number
    except (NoSuchElementException, IndexError, ValueError, TimeoutException):
        custom_log.error("Failed to find the last page number.")
        return None
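
# Retry a Selenium lookup a few times when the DOM re-renders between
# locating an element and reading it (StaleElementReferenceException).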
def retry_on_stale_element(func):
    def wrapper(*args, **kwargs):
        retries = 3
        while retries > 0:
            try:
                return func(*args, **kwargs)
            except StaleElementReferenceException:
                retries -= 1
                custom_log.warning(f"StaleElementReferenceException encountered. Retrying... ({retries} retries left)")
                time.sleep(2)
        raise StaleElementReferenceException(f"Max retries exceeded for function {func.__name__}")
    return wrapper

@retry_on_stale_element
def locate_element(driver, xpath):
    return driver.find_element(By.XPATH, xpath)
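
# The link to the full-size image appears under different DOM paths
# depending on the post layout, so several XPaths are tried in order.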
def scrape_image_page(driver, image_page_url, retries=3):
    global should_exit
    xpath_options = [
        '//*[@id="content"]/article/div/div[2]/p[1]/a',
        '/html/body/div[1]/div/main/article/div/div[2]/a',
        '/html/body/div[1]/div/main/article/div/div[2]/figure/a'
    ]
    for attempt in range(retries):
        if should_exit:
            return None
        try:
            driver.get(image_page_url)
            for xpath in xpath_options:
                try:
                    WebDriverWait(driver, 5).until(
                        EC.presence_of_element_located((By.XPATH, xpath))
                    )
                    img_element = locate_element(driver, xpath)
                    img_url = img_element.get_attribute('href')
                    return img_url
                except (NoSuchElementException, TimeoutException, StaleElementReferenceException):
                    continue
            custom_log.error(f"{Fore.RED}Failed to find image element on page: {image_page_url}")
        except Exception as e:
            custom_log.error(f"{Fore.RED}Error accessing page {image_page_url}: {str(e)}")
        if attempt < retries - 1:
            time.sleep(2)
    return None

def scrape_image_links(driver, page_url):
    global should_exit
    if should_exit:
        return []
    driver.get(page_url)
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '.entry-box a'))
        )
    except TimeoutException:
        custom_log.error(f"{Fore.RED}No image links found on page: {page_url}")
        return []
    links = driver.find_elements(By.CSS_SELECTOR, '.entry-box a')
    image_page_urls = [link.get_attribute('href') for link in links]
    return image_page_urls
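
# HTTP session with automatic retries: up to five attempts with exponential
# backoff for transient 5xx responses, over a pooled adapter.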
def create_session(proxy=None):
    session = requests.Session()
    retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    session.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=10))
    session.mount('https://', HTTPAdapter(max_retries=retries, pool_maxsize=10))
    if proxy:
        session.proxies = {'http': proxy, 'https': proxy}
    return session
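
# At most 5 requests per second; sleep_and_retry blocks the calling thread
# until a slot opens rather than raising a RateLimitException.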
@sleep_and_retry
@limits(calls=5, period=1)
def rate_limited_request(session, url, headers):
    return session.get(url, headers=headers, timeout=30)
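
# Pillow's verify() checks file integrity without decoding the full image;
# the size is captured first because verify() can invalidate the object.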
def verify_image(file_path):
    try:
        with Image.open(file_path) as img:
            size = img.size
            img.verify()
        return True, size
    except Exception:
        return False, None

def get_resolution_folder(download_dir, width, height):
    if width == 1920 and height == 1080:
        return download_dir
    folder_name = f"other_resolutions_{width}x{height}"
    folder_path = os.path.join(download_dir, folder_name)
    os.makedirs(folder_path, exist_ok=True)
    return folder_path
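
# Fetch one image and write it to disk in 1 KiB chunks, verify it with
# Pillow, and file non-1920x1080 images into a per-resolution subfolder.
# image_info is an (url, download_dir, retries, progress) tuple so the
# function can be submitted to a thread pool with a single argument.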
def download_image(image_info):
    global should_exit
    image_url, download_dir, retries, progress = image_info
    if should_exit:
        return
    file_name = image_url.split('/')[-1]
    final_file_path = os.path.join(download_dir, file_name)
    # Skip files that are already on disk before making any request.
    if os.path.exists(final_file_path):
        custom_log.info(f"{Fore.YELLOW}File already exists: {Fore.CYAN}{file_name}")
        return
    session = create_session(proxy_url)
    for attempt in range(retries + 1):
        if should_exit:
            return
        try:
            custom_log.info(f"{Fore.GREEN}Downloading image from: {Fore.YELLOW}{image_url}")
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = rate_limited_request(session, image_url, headers)
            response.raise_for_status()
            with open(final_file_path, 'wb') as img_file:
                progress.update_progress(operation=f"Downloading: {file_name}")
                for data in response.iter_content(1024):
                    if should_exit:
                        img_file.close()
                        os.remove(final_file_path)
                        return
                    img_file.write(data)
            progress.update_progress()
            is_valid, dimensions = verify_image(final_file_path)
            if is_valid:
                width, height = dimensions
                custom_log.info(f"{Fore.BLUE}Downloaded and verified: {Fore.CYAN}{file_name} ({width}x{height})")
                if (width, height) != (1920, 1080):
                    resolution_folder = get_resolution_folder(download_dir, width, height)
                    final_resolution_path = os.path.join(resolution_folder, file_name)
                    shutil.move(final_file_path, final_resolution_path)
                    custom_log.info(f"{Fore.BLUE}Moved {Fore.CYAN}{file_name} to {resolution_folder}")
            else:
                custom_log.error(f"{Fore.RED}Downloaded file is not a valid image: {Fore.YELLOW}{file_name}")
                os.remove(final_file_path)
            return
        except Exception as e:
            # Remove a partially written file so a retry starts clean.
            if os.path.exists(final_file_path):
                os.remove(final_file_path)
            custom_log.error(f"{Fore.RED}Failed to download image from {Fore.YELLOW}{image_url}: {e}")
            if attempt < retries:
                wait_time = 2 ** attempt
                custom_log.info(f"{Fore.YELLOW}Retrying download for {Fore.YELLOW}{image_url} in {wait_time} seconds ({Fore.RED}{retries - attempt} retries left)...")
                time.sleep(wait_time)
            else:
                custom_log.error(f"{Fore.RED}Max retries reached for {Fore.YELLOW}{image_url}")
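
# Completed image URLs are persisted to progress.json after every download
# so an interrupted run can resume without re-fetching finished images.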
def save_progress(completed_urls, filename='progress.json'):
    with open(filename, 'w') as f:
        json.dump(list(completed_urls), f)

def load_progress(filename='progress.json'):
    try:
        with open(filename, 'r') as f:
            return set(json.load(f))
    except (FileNotFoundError, json.JSONDecodeError):
        return set()
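
# Scrape one listing page with Selenium, then download its images in
# parallel over plain HTTP.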
def scrape_and_download_page(driver, page_url, download_dir, progress, current_page, last_page, num_threads=5):
    global should_exit
    if should_exit:
        return
    progress.update_progress(page=current_page, operation="Fetching image links")
    image_page_urls = scrape_image_links(driver, page_url)
    custom_log.info(f"{Fore.MAGENTA}Found {Fore.YELLOW}{len(image_page_urls)} image page links on page: {Fore.MAGENTA}{page_url}")
    progress.update_progress(operation="Collecting image URLs")
    image_urls = []
    for image_page_url in image_page_urls:
        if should_exit:
            return
        img_url = scrape_image_page(driver, image_page_url)
        if img_url:
            image_urls.append(img_url)
    progress.update_progress(operation="Downloading images")
    completed_urls = load_progress()
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # Map each future back to its URL so the correct entry is recorded on
        # completion (a bare loop variable would always name the last URL).
        future_to_url = {}
        for img_url in image_urls:
            if should_exit:
                return
            if img_url not in completed_urls:
                future = pool.submit(download_image, (img_url, download_dir, 3, progress))
                future_to_url[future] = img_url
        for future in as_completed(future_to_url):
            if should_exit:
                return
            try:
                future.result()
                completed_urls.add(future_to_url[future])
                save_progress(completed_urls)
            except Exception as e:
                custom_log.error(f"Error occurred during image download: {e}")
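
# Top-level orchestration: one shared headless browser walks the listing
# pages in order while each page's downloads run in a thread pool.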
def scrape_all_images(base_url, start_page, end_page, download_dir, num_threads=5):
    global driver, progress, custom_log, should_exit
    if not os.path.exists(download_dir):
        os.makedirs(download_dir)
    driver = init_driver()
    if end_page is None:
        end_page = get_last_page_number(driver, base_url)
        if end_page is None:
            custom_log.error("Failed to determine the last page number. Exiting.")
            driver.quit()
            return
    progress = ProgressManager(end_page - start_page + 1)
    custom_log = custom_logger(progress)
    try:
        # Selenium WebDriver is not thread-safe, so pages are visited
        # sequentially; the downloads within each page run in parallel.
        for page_number in range(start_page, end_page + 1):
            if should_exit:
                break
            page_url = f"{base_url}/page/{page_number}"
            scrape_and_download_page(driver, page_url, download_dir, progress, page_number, end_page, num_threads)
    except Exception as e:
        custom_log.error(f"{Fore.RED}An error occurred: {str(e)}")
    finally:
        if driver:
            driver.quit()
        if progress:
            progress.close()
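
# Example invocation (illustrative; assumes the script is saved as collector.py):
#   python collector.py --start_page 1 --end_page 5 --threads 4 --log_level INFO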
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Scrape and download images from the windows10spotlight.com website")
    parser.add_argument("--base_url", type=str, default="https://windows10spotlight.com", help="Base URL of the website")
    parser.add_argument("--start_page", type=int, default=169, help="Start page number")
    parser.add_argument("--end_page", type=int, default=None, help="End page number (defaults to the last page found on the site)")
    parser.add_argument("--download_dir", type=str, default="downloaded_images", help="Directory to save downloaded images")
    parser.add_argument("--proxy", type=str, help="Proxy to use for requests (e.g., http://10.10.1.10:3128)")
    parser.add_argument("--log_level", type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', help="Set the logging level")
    parser.add_argument("--threads", type=int, default=5, help="Number of threads for concurrent image downloads")
    args = parser.parse_args()
    proxy_url = args.proxy
    log_level = getattr(logging, args.log_level)
    # Set up signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    if platform.system() == "Windows":
        signal.signal(signal.SIGBREAK, signal_handler)
    else:
        signal.signal(signal.SIGTSTP, signal_handler)
    try:
        scrape_all_images(args.base_url, args.start_page, args.end_page, args.download_dir, args.threads)
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}Keyboard interrupt received. Exiting...")
        should_exit = True
    finally:
        if driver:
            driver.quit()
        if progress:
            progress.close()
    if should_exit:
        sys.exit(0)