@Bungeetaco
Last active August 2, 2024 01:09
windows10spotlight.com Image Collector
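
A Python script that walks the windows10spotlight.com gallery with headless Chrome, collects the full-resolution image link from each post, and downloads the images concurrently with resume support. Third-party dependencies, inferred from the imports below (PyPI package names assumed):

pip install selenium webdriver-manager requests colorama ratelimit pillow tqdm
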
import os
import time
import requests
import logging
import argparse
import json
import signal
import sys
import platform
import shutil
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from webdriver_manager.chrome import ChromeDriverManager
from colorama import init, Fore
from ratelimit import limits, sleep_and_retry
from PIL import Image
from tqdm import tqdm
init(autoreset=True)
should_exit = False
driver = None
progress = None
executor = None
# Placeholder logger so errors raised before custom_logger() is installed
# (e.g. from get_last_page_number) do not trigger a NameError.
custom_log = logging.getLogger('custom_logger')

class ProgressManager:
    def __init__(self, total_pages):
        self.total_pages = total_pages
        self.current_page = 0
        self.current_operation = "Initializing"
        self.pbar = tqdm(total=total_pages, position=0, leave=True, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}")
        self.status_bar = tqdm(total=0, position=1, bar_format="{desc}", leave=True)

    def update_progress(self, page=None, operation=None):
        if page is not None and page > self.current_page:
            self.current_page = page
            self.pbar.update(1)
        if operation is not None:
            self.current_operation = operation
        self._update_status()

    def _update_status(self):
        status = f"Page {self.current_page}/{self.total_pages} | {self.current_operation}"
        self.status_bar.set_description_str(status)

    def clear_progress(self):
        self.pbar.clear()
        self.status_bar.clear()

    def close(self):
        self.pbar.close()
        self.status_bar.close()

def custom_logger(progress_manager):
    class ProgressBarHandler(logging.Handler):
        def emit(self, record):
            progress_manager.clear_progress()
            print(self.format(record))
            progress_manager.pbar.refresh()
            progress_manager.status_bar.refresh()

    logger = logging.getLogger('custom_logger')
    logger.setLevel(logging.INFO)
    handler = ProgressBarHandler()
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

def signal_handler(signum, frame):
    global should_exit, driver, progress, executor
    print(f"\n{Fore.YELLOW}Received interrupt signal. Cleaning up and exiting gracefully...")
    should_exit = True
    if executor:
        executor.shutdown(wait=False, cancel_futures=True)
    if progress:
        progress.close()
    if driver:
        driver.quit()
    sys.exit(0)

def init_driver():
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--log-level=3")
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    return driver

def get_last_page_number(driver, base_url):
    driver.get(base_url)
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '.page-numbers'))
        )
        # The second-to-last '.page-numbers' element holds the highest page
        # number (the last one is presumably the "Next" link).
        last_page_element = driver.find_elements(By.CSS_SELECTOR, '.page-numbers')[-2]
        last_page_number = int(last_page_element.get_attribute('href').split('/')[-1])
        return last_page_number
    except (NoSuchElementException, IndexError, TimeoutException):
        custom_log.error("Failed to find the last page number.")
        return None

def retry_on_stale_element(func):
    def wrapper(*args, **kwargs):
        retries = 3
        while retries > 0:
            try:
                return func(*args, **kwargs)
            except StaleElementReferenceException:
                retries -= 1
                custom_log.warning(f"StaleElementReferenceException encountered. Retrying... ({retries} retries left)")
                time.sleep(2)
        raise StaleElementReferenceException(f"Max retries exceeded for function {func.__name__}")
    return wrapper

@retry_on_stale_element
def locate_element(driver, xpath):
    return driver.find_element(By.XPATH, xpath)

def scrape_image_page(driver, image_page_url, retries=3):
    global should_exit
    xpath_options = [
        '//*[@id="content"]/article/div/div[2]/p[1]/a',
        '/html/body/div[1]/div/main/article/div/div[2]/a',
        '/html/body/div[1]/div/main/article/div/div[2]/figure/a'
    ]
    for attempt in range(retries):
        if should_exit:
            return None
        try:
            driver.get(image_page_url)
            for xpath in xpath_options:
                try:
                    WebDriverWait(driver, 5).until(
                        EC.presence_of_element_located((By.XPATH, xpath))
                    )
                    img_element = locate_element(driver, xpath)
                    img_url = img_element.get_attribute('href')
                    return img_url
                except (NoSuchElementException, TimeoutException, StaleElementReferenceException):
                    continue
            else:
                custom_log.error(f"{Fore.RED}Failed to find image element on page: {image_page_url}")
        except Exception as e:
            custom_log.error(f"{Fore.RED}Error accessing page {image_page_url}: {str(e)}")
        if attempt < retries - 1:
            time.sleep(2)
    return None

def scrape_image_links(driver, page_url):
    global should_exit
    if should_exit:
        return []
    driver.get(page_url)
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, '.entry-box a'))
    )
    links = driver.find_elements(By.CSS_SELECTOR, '.entry-box a')
    image_page_urls = [link.get_attribute('href') for link in links]
    return image_page_urls

def create_session(proxy=None):
    session = requests.Session()
    retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    session.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=10))
    session.mount('https://', HTTPAdapter(max_retries=retries, pool_maxsize=10))
    if proxy:
        session.proxies = {'http': proxy, 'https': proxy}
    return session

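# The decorators on rate_limited_request() cap outbound requests at 5 per
# second; sleep_and_retry blocks the calling thread until the window frees up.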
@sleep_and_retry
@limits(calls=5, period=1)
def rate_limited_request(session, url, headers):
    return session.get(url, headers=headers)

def verify_image(file_path):
    try:
        with Image.open(file_path) as img:
            img.verify()
            return True, img.size
    except Exception:
        return False, None

def get_resolution_folder(download_dir, width, height):
    if width == 1920 and height == 1080:
        return download_dir
    else:
        folder_name = f"other_resolutions_{width}x{height}"
        folder_path = os.path.join(download_dir, folder_name)
        os.makedirs(folder_path, exist_ok=True)
        return folder_path

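# For example, a 3840x2160 image would land in
# downloaded_images/other_resolutions_3840x2160/ (path shown for the default
# --download_dir; illustrative only).
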
def download_image(image_info):
    global should_exit
    image_url, download_dir, retries, progress = image_info
    if should_exit:
        return
    session = create_session()
    for attempt in range(retries + 1):
        if should_exit:
            return
        try:
            custom_log.info(f"{Fore.GREEN}Downloading image from: {Fore.YELLOW}{image_url}")
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = rate_limited_request(session, image_url, headers)
            response.raise_for_status()
            file_name = image_url.split('/')[-1]
            final_file_path = os.path.join(download_dir, file_name)
            if not os.path.exists(final_file_path):
                with open(final_file_path, 'wb') as img_file:
                    progress.update_progress(operation=f"Downloading: {file_name}")
                    for data in response.iter_content(1024):
                        if should_exit:
                            img_file.close()
                            os.remove(final_file_path)
                            return
                        img_file.write(data)
                progress.update_progress()
                is_valid, dimensions = verify_image(final_file_path)
                if is_valid:
                    width, height = dimensions
                    custom_log.info(f"{Fore.BLUE}Downloaded and verified: {Fore.CYAN}{file_name} ({width}x{height})")
                    # Keep 1920x1080 images in the main folder; sort everything
                    # else into a per-resolution subfolder.
                    if (width, height) != (1920, 1080):
                        resolution_folder = get_resolution_folder(download_dir, width, height)
                        final_resolution_path = os.path.join(resolution_folder, file_name)
                        shutil.move(final_file_path, final_resolution_path)
                        custom_log.info(f"{Fore.BLUE}Moved {Fore.CYAN}{file_name} to {resolution_folder}")
                else:
                    custom_log.error(f"{Fore.RED}Downloaded file is not a valid image: {Fore.YELLOW}{file_name}")
                    os.remove(final_file_path)
            else:
                custom_log.info(f"{Fore.YELLOW}File already exists: {Fore.CYAN}{file_name}")
            return
        except Exception as e:
            custom_log.error(f"{Fore.RED}Failed to download image from {Fore.YELLOW}{image_url}: {e}")
            if attempt < retries:
                wait_time = 2 ** attempt
                custom_log.info(f"{Fore.YELLOW}Retrying download for {Fore.YELLOW}{image_url} in {wait_time} seconds ({Fore.RED}{retries - attempt} retries left)...")
                time.sleep(wait_time)
            else:
                custom_log.error(f"{Fore.RED}Max retries reached for {Fore.YELLOW}{image_url}")

def save_progress(completed_urls, filename='progress.json'):
    with open(filename, 'w') as f:
        json.dump(list(completed_urls), f)

def load_progress(filename='progress.json'):
    try:
        with open(filename, 'r') as f:
            return set(json.load(f))
    except FileNotFoundError:
        return set()

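# progress.json is a flat JSON array of already-downloaded image URLs, e.g.
# (illustrative values): ["https://example.com/a.jpg", "https://example.com/b.jpg"]
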
def scrape_and_download_page(driver, page_url, download_dir, progress, current_page, last_page):
    global should_exit
    if should_exit:
        return
    progress.update_progress(page=current_page, operation="Fetching image links")
    image_page_urls = scrape_image_links(driver, page_url)
    custom_log.info(f"{Fore.MAGENTA}Found {Fore.YELLOW}{len(image_page_urls)} image page links on page: {Fore.MAGENTA}{page_url}")
    progress.update_progress(operation="Collecting image URLs")
    image_urls = []
    for image_page_url in image_page_urls:
        if should_exit:
            return
        img_url = scrape_image_page(driver, image_page_url)
        if img_url:
            image_urls.append(img_url)
    progress.update_progress(operation="Downloading images")
    completed_urls = load_progress()
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Map each future back to its URL so the correct entry is recorded on
        # completion (re-using the submit loop's variable would always record
        # the last URL).
        future_to_url = {}
        for img_url in image_urls:
            if should_exit:
                return
            if img_url not in completed_urls:
                future = executor.submit(download_image, (img_url, download_dir, 3, progress))
                future_to_url[future] = img_url
        for future in as_completed(future_to_url):
            if should_exit:
                return
            try:
                future.result()
                completed_urls.add(future_to_url[future])
                save_progress(completed_urls)
            except Exception as e:
                custom_log.error(f"Error occurred during image download: {e}")

def scrape_all_images(base_url, start_page, end_page, download_dir, num_threads=5):
    global driver, progress, custom_log, should_exit, executor
    if not os.path.exists(download_dir):
        os.makedirs(download_dir)
    driver = init_driver()
    if end_page is None:
        end_page = get_last_page_number(driver, base_url)
        if end_page is None:
            custom_log.error("Failed to determine the last page number. Exiting.")
            driver.quit()
            return
    progress = ProgressManager(end_page - start_page + 1)
    custom_log = custom_logger(progress)
    try:
        # Note: all page tasks share the single WebDriver instance, and
        # Selenium drivers are not thread-safe, so num_threads values above 1
        # may cause pages to interleave unpredictably.
        executor = ThreadPoolExecutor(max_workers=num_threads)
        futures = []
        for page_number in range(start_page, end_page + 1):
            if should_exit:
                break
            page_url = f"{base_url}/page/{page_number}"
            futures.append(executor.submit(scrape_and_download_page, driver, page_url, download_dir, progress, page_number, end_page))
        wait(futures)  # Wait for all page tasks to complete
    except Exception as e:
        custom_log.error(f"{Fore.RED}An error occurred: {str(e)}")
    finally:
        if executor:
            executor.shutdown(wait=False, cancel_futures=True)
        if driver:
            driver.quit()
        if progress:
            progress.close()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Scrape and download images from the Windows Spotlight website")
    parser.add_argument("--base_url", type=str, default="https://windows10spotlight.com", help="Base URL of the website")
    parser.add_argument("--start_page", type=int, default=169, help="Start page number")
    parser.add_argument("--end_page", type=int, default=None, help="End page number")
    parser.add_argument("--download_dir", type=str, default="downloaded_images", help="Directory to save downloaded images")
    parser.add_argument("--proxy", type=str, help="Proxy to use for requests (e.g., http://10.10.1.10:3128)")
    parser.add_argument("--log_level", type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', help="Set the logging level")
    parser.add_argument("--threads", type=int, default=5, help="Number of threads for concurrent scraping")
    args = parser.parse_args()
    # Note: --proxy and --log_level are parsed but not currently wired into
    # create_session() or the logger.

    # Set up signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    if platform.system() == "Windows":
        signal.signal(signal.SIGBREAK, signal_handler)
    else:
        signal.signal(signal.SIGTSTP, signal_handler)

    try:
        scrape_all_images(args.base_url, args.start_page, args.end_page, args.download_dir, args.threads)
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}Keyboard interrupt received. Exiting...")
        should_exit = True
    finally:
        if executor:
            executor.shutdown(wait=False, cancel_futures=True)
        if driver:
            driver.quit()
        if progress:
            progress.close()
        if should_exit:
            sys.exit(0)
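
Example invocation (the script filename is illustrative; save the gist under any name):

python spotlight_collector.py --start_page 1 --end_page 25 --download_dir wallpapers --threads 4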