Lego New Product Scraper
""" | |
Description: | |
This script scrapes the LEGO website for new, pre-order, coming soon, available, out of stock, and back order sets. It sends notifications to a Discord channel when new sets are found or when the status of existing sets changes. The script uses BeautifulSoup for web scraping and requests for making HTTP requests. It also uses a JSON file to keep track of notified sets and their statuses. | |
Functions: | |
- setup_storage(): Initializes the storage file for notified sets if it doesn't exist. | |
- load_notified_sets(): Loads the notified sets from the storage file. | |
- save_notified_sets(notified_sets): Saves the notified sets to the storage file. | |
- is_set_notified(set_id, notified_sets): Checks if a set has already been notified. | |
- mark_set_as_notified(set_id, status, notified_sets): Marks a set as notified and saves the status. | |
- send_discord_notification(set_name, set_image, set_price, set_url, status): Sends a notification to a Discord channel. | |
- get_url(page_type="new", page_number=1): Constructs the URL for the LEGO website based on the page type and number. | |
- scrape_lego(page_type, page_number=None): Scrapes the LEGO website for sets and sends notifications for new or status-changed sets. | |
- main(): Main execution function that sets up storage and scrapes various pages on the LEGO website. | |
""" | |
# TODO: Garbage collect old log entries and stale notified_sets entries (e.g. after 30 days, or once a set is past the coming-soon state and no longer appears on a scraped page)
# TODO: Add non-Discord notification options (e.g. email, SMS, etc.)
# TODO: Scrape as many pages as are shown in the pagination
# TODO: Scrape as many pages as contain new sets
# TODO: Get date of availability for sets (recurse into set page)
import requests
from bs4 import BeautifulSoup
import os
import json
import logging
from dotenv import load_dotenv
from time import sleep

# Configure logging
logging.basicConfig(level=logging.INFO, filename="lego_scraper.log", filemode="a",
                    format="%(asctime)s - %(levelname)s - %(message)s")

# Load environment variables
load_dotenv()
DB_FILE = os.getenv("DB_FILE", "notified_sets.json")
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
if not DISCORD_WEBHOOK_URL:
    raise ValueError("Discord webhook URL is not set in the .env file.")
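# Example .env file (values are placeholders):
# DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/<id>/<token>
# DB_FILE=notified_sets.json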
def setup_storage():
    if not os.path.exists(DB_FILE):
        with open(DB_FILE, "w") as f:
            json.dump({}, f)

def load_notified_sets():
    with open(DB_FILE, "r") as f:
        return json.load(f)

def save_notified_sets(notified_sets):
    with open(DB_FILE, "w") as f:
        json.dump(notified_sets, f)

# Check if the set is already notified
def is_set_notified(set_id, notified_sets):
    return set_id in notified_sets

# Mark a set as notified
def mark_set_as_notified(set_id, status, notified_sets):
    notified_sets[set_id] = status
    save_notified_sets(notified_sets)
# Send Discord notification
def send_discord_notification(set_name, set_image, set_price, set_url, status, set_date):
    embed = {
        "title": set_name,
        "url": set_url,
        "color": 15608105,
        "image": {"url": set_image},
        "fields": [
            {"name": "Price", "value": set_price, "inline": True},
            {"name": "Status", "value": status, "inline": True},
            # {"name": "Pieces", "value": "Unknown", "inline": True},
        ],
        # "timestamp": "2024-12-25T06:00:00.000Z",
    }  # TODO: Combine multiple sets into one embed
    # Discord requires footer text to be a string, so only attach the footer
    # when an availability date is known (extract_date may return None)
    if set_date:
        embed["footer"] = {
            "text": set_date,
            # "icon_url": "https://assets.lego.com/logos/v4.5.0/brand-lego.svg"
        }
    payload = {"content": None, "embeds": [embed]}
    try:
        response = requests.post(DISCORD_WEBHOOK_URL, json=payload)
        response.raise_for_status()
        logging.info(f"Notification sent for {set_name}.")
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to send Discord notification: {e}")
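# Example call (illustrative values only):
# send_discord_notification(
#     "Example Castle Set",
#     "https://www.lego.com/cdn/cs/set/assets/example.jpg",
#     "$99.99",
#     "https://www.lego.com/en-us/product/example-set",
#     "Pre-Order",
#     "Available January 1, 2025",
# )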
# Build the LEGO new-products listing URL for a given page type and page number
def get_url(page_type="coming_soon", page_number=1):
    # Ensure page number is valid
    if page_number is None or page_number < 1:
        page_number = 1
    # Return the URL based on the page type
    if page_type == "pre_order":
        # Pre-order sets sorted by availability date
        return f"https://www.lego.com/en-us/categories/new-sets-and-products?page={page_number}&sort.key=AVAILABILITY_DATE&sort.direction=DESC&filters.i0.key=variants.attributes.availabilityStatus.zxx-US&filters.i0.values.i0=%22A_PRE_ORDER_FOR_DATE%22%2C%22C_PRE_ORDER%22&offset=0"
    elif page_type == "coming_soon":
        # Coming soon sets sorted by availability date
        return f"https://www.lego.com/en-us/categories/new-sets-and-products?page={page_number}&sort.key=AVAILABILITY_DATE&sort.direction=DESC&filters.i0.key=variants.attributes.availabilityStatus.zxx-US&filters.i0.values.i0=%22B_COMING_SOON_AT_DATE%22%2C%22D_COMING_SOON%22&offset=0"
    elif page_type == "available":
        # Available sets sorted by availability date
        return f"https://www.lego.com/en-us/categories/new-sets-and-products?page={page_number}&sort.key=AVAILABILITY_DATE&sort.direction=DESC&filters.i0.key=variants.attributes.availabilityStatus.zxx-US&filters.i0.values.i0=%22E_AVAILABLE%22&offset=0"
    elif page_type == "out_of_stock":
        # Out of stock / sold out sets sorted by availability date
        return f"https://www.lego.com/en-us/categories/new-sets-and-products?page={page_number}&sort.key=AVAILABILITY_DATE&sort.direction=DESC&filters.i0.key=variants.attributes.availabilityStatus.zxx-US&filters.i0.values.i0=%22H_OUT_OF_STOCK%22%2C%22K_SOLD_OUT%22%2C%22L_READ_ONLY%22"
    elif page_type == "back_order":
        # Back order sets sorted by availability date
        return f"https://www.lego.com/en-us/categories/new-sets-and-products?page={page_number}&sort.key=AVAILABILITY_DATE&sort.direction=DESC&filters.i0.key=variants.attributes.availabilityStatus.zxx-US&filters.i0.values.i0=%22F_BACKORDER_FOR_DATE%22%2C%22G_BACKORDER%22"
    else:
        raise ValueError("Invalid page type. Must be one of 'pre_order', 'coming_soon', 'available', 'out_of_stock', or 'back_order'.")
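# Example: get_url("available", 2) returns the page-2 listing of available sets:
# https://www.lego.com/en-us/categories/new-sets-and-products?page=2&sort.key=AVAILABILITY_DATE&sort.direction=DESC&filters.i0.key=variants.attributes.availabilityStatus.zxx-US&filters.i0.values.i0=%22E_AVAILABLE%22&offset=0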
# Scrape a page of the LEGO website for sets of the given page type
def scrape_lego(page_type, page_number=None):
    logging.info(f"Scraping page {page_number} of {page_type} sets.")
    sleep(0.5)  # Sleep for 0.5 seconds to avoid rate limiting
    # Fetch the website content
    try:
        url = get_url(page_type, page_number)
        response = requests.get(url)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching LEGO website: {e}")
        return
    soup = BeautifulSoup(response.text, "html.parser")
    sets = soup.select("#product-listing-grid > li > article")
    notified_sets = load_notified_sets()
    new_sets = process_sets(sets, notified_sets)
    for set_id, set_name, set_image, set_price, set_url, current_status, set_date in new_sets:
        send_discord_notification(set_name, set_image, set_price, set_url, current_status, set_date)
        sleep(0.1)  # Sleep for 0.1 seconds to avoid rate limiting
    logging.info(f"Scraped {len(new_sets)} new sets of {len(sets)} total sets from page {page_number} of {page_type} sets.")
def process_sets(sets, notified_sets):
    new_sets = []
    for set_div in sets:
        try:
            set_id, set_name, set_image, set_price, set_url, current_status = extract_set_details(set_div)
            if is_set_notified(set_id, notified_sets):
                # Only re-notify when a set leaves the "Coming Soon" state
                previous_status = notified_sets[set_id]
                if previous_status == "Coming Soon" and current_status != "Coming Soon":
                    set_date = extract_date(set_div, set_url, current_status)
                    new_sets.append((set_id, set_name, set_image, set_price, set_url, current_status, set_date))
                    mark_set_as_notified(set_id, current_status, notified_sets)
            else:
                set_date = extract_date(set_div, set_url, current_status)
                logging.debug(f"Availability date for {set_id}: {set_date}")
                new_sets.append((set_id, set_name, set_image, set_price, set_url, current_status, set_date))
                mark_set_as_notified(set_id, current_status, notified_sets)
        except Exception as e:
            logging.warning(f"Failed to parse a set: {e}")
    return new_sets
def extract_set_details(set_div, image_number: int = 2):
    """
    Extracts details of a LEGO set from a given HTML article element.

    Args:
        set_div (bs4.element.Tag): The HTML element containing the set details.
        image_number (int, optional): The position of the image element to extract the set image from.
            Defaults to 2. 1 is just the set, 2 is the set with the box.

    Returns:
        tuple: A tuple containing the following details of the LEGO set:
            - set_id (str): The unique identifier of the set.
            - set_name (str): The name of the set.
            - set_image (str): The URL of the set image.
            - set_price (str): The price of the set.
            - set_url (str): The URL to the set's page on the LEGO website.
            - current_status (str): The current status of the set.
    """
    set_id = set_div["data-test-key"]
    set_name = set_div.find("h3").text.strip()
    set_url = "https://www.lego.com" + set_div.find("a")["href"]
    source_element = set_div.select_one(f"li:nth-of-type({image_number}) picture source")
    set_image = source_element["srcset"].split()[0] if source_element and "srcset" in source_element.attrs else "Image not available"
    price_div = set_div.select_one("div:nth-of-type(2)")
    set_price = price_div.text.strip() if price_div else "Price not listed"
    coded_status = extract_status(set_div)
    current_status = map_status(coded_status)
    return set_id, set_name, set_image, set_price, set_url, current_status
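# Example return value (illustrative, not real data):
# ("12345", "Example Castle Set",
#  "https://www.lego.com/cdn/cs/set/assets/example.jpg", "$99.99",
#  "https://www.lego.com/en-us/product/example-set", "Coming Soon")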
# Read the raw availability label from a set's call-to-action button or link
def extract_status(set_div):
    status_div = set_div.select_one("div:nth-of-type(3) > div > button")
    if status_div:
        return status_div.text.strip().lower()
    status_div = set_div.select_one("div:nth-of-type(3) > a")
    if status_div:
        return status_div.text.strip().lower()
    return "Unknown"

# Map the raw label to a display status
def map_status(coded_status):
    status_mapping = {
        "backorder": "Back Order",
        "out of stock": "Out of Stock",
        "pre-order": "Pre-Order",
        "add to bag": "In Stock",
        "coming soon": "Coming Soon"
    }
    return status_mapping.get(coded_status, "Unknown")
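# Example: map_status("add to bag") returns "In Stock"; any unrecognized label
# falls through to "Unknown".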
# Fetch the availability date from a set's detail page
def extract_date(set_div, set_url, current_status):
    # Statuses other than these have no upcoming availability date to fetch
    if current_status not in ("Coming Soon", "Back Order", "Pre-Order"):
        return None
    logging.info(f"Fetching date for {set_url}")
    # Fetch the website content for the set page
    try:
        sleep(0.5)  # Sleep for 0.5 seconds to avoid rate limiting
        response = requests.get(set_url)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching Set Details website {set_url}: {e}")
        return None
    detail_soup = BeautifulSoup(response.text, "html.parser")
    date_div = detail_soup.select_one("div:nth-of-type(2) > div:nth-of-type(2) > p > span")
    if date_div:
        return date_div.text.strip()
    return None
# Main execution
def main():
    setup_storage()
    scrape_lego("pre_order", page_number=1)
    scrape_lego("out_of_stock", page_number=1)
    scrape_lego("back_order", page_number=1)
    scrape_lego("available", page_number=1)
    scrape_lego("available", page_number=2)
    scrape_lego("available", page_number=3)
    scrape_lego("coming_soon", page_number=1)
    scrape_lego("coming_soon", page_number=2)
    scrape_lego("coming_soon", page_number=3)

if __name__ == "__main__":
    main()
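# To run this on a schedule, one option (an assumption, not part of this gist)
# is a cron entry like the following, with the path adjusted to your checkout:
# */15 * * * * /usr/bin/python3 /path/to/lego_scraper.py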