Last active
August 2, 2025 22:17
-
-
Save captainGeech42/71fde7756bb4144c22e797cbcc50aa24 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import time | |
import traceback | |
import requests | |
from dateutil.parser import parse as dt_parse | |
# --- Deployment configuration (placeholder values; fill in before running) ---

# Discord webhook that receives blood announcements and error reports.
WEBHOOK_URL = "https://discord.com/api/webhooks/xxxxxx"
# Base URL of the CTFd instance; "/api/v1/..." paths are appended to it as-is.
CTFD_URL = "https://xxxxxxxxxxx"
# CTFd API access token, sent in the Authorization header of the session below.
ACCESS_TOKEN = "ctfd_xxxxxxxxxxxxxx"
# Seconds to sleep between polling iterations.
SLEEP_TIME = 60
# Name of the environment variable that may override the notified-IDs file path.
BLOODS_FILE_ENVVAR = "BLOODS_FILE"

# Timestamped log lines at INFO level, written through a stream handler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y/%m/%d %H:%M:%S",
    handlers=[logging.StreamHandler()],
)

# Shared HTTP session used for the CTFd API calls.
s = requests.Session()
# NOTE: plain assignment replaces the session's default header set entirely.
s.headers = {"Authorization": f"Token {ACCESS_TOKEN}"}
def get_solved_chals() -> list[tuple[str, int]]:
    """Fetch the challenge list from CTFd and keep the solved ones.

    Returns:
        A list of ``(name, id)`` tuples, one per challenge whose ``solves``
        count is greater than zero, in the order the API returned them.

    Raises:
        requests.exceptions.HTTPError: if CTFd answers with an error status.
    """
    resp = s.get(CTFD_URL + "/api/v1/challenges")
    resp.raise_for_status()

    solved = []
    for chal in resp.json()["data"]:
        if chal["solves"] > 0:
            solved.append((chal["name"], chal["id"]))
    return solved
def get_bloods_fn() -> str:
    """Return the path of the file that stores already-notified challenge IDs.

    The path comes from the environment variable named by
    ``BLOODS_FILE_ENVVAR``; when that variable is unset, ``"bloods.txt"``
    is used.
    """
    return os.getenv(BLOODS_FILE_ENVVAR, "bloods.txt")
def get_already_notified_chals(fn: "str | None" = None) -> list[int]:
    """Return the IDs of the challenges that have already been announced.

    The file is expected to hold one decimal challenge ID per line; blank
    lines are ignored.

    Args:
        fn: Path of the IDs file. When omitted, ``get_bloods_fn()`` is used.

    Returns:
        The recorded IDs in file order, or an empty list when the file does
        not exist yet.

    Raises:
        ValueError: if a non-blank line is not a valid integer.
    """
    if fn is None:
        fn = get_bloods_fn()
    try:
        with open(fn, "r") as f:
            # Iterate the file line by line. Iterating ``f.read()`` walks the
            # text one character at a time, which turned an ID such as "12"
            # into the two IDs 1 and 2.
            return [int(line) for line in f if line.strip()]
    except FileNotFoundError:
        return []
def get_pwning_team(id: int) -> str:
    """Return the name of the team with the earliest solve of a challenge.

    Args:
        id: CTFd challenge ID whose solves are queried.

    Returns:
        The ``name`` of the solve entry with the earliest ``date``.

    Raises:
        requests.exceptions.HTTPError: if CTFd answers with an error status.
        IndexError: if the API returns no solves for the challenge.
    """
    resp = s.get(CTFD_URL + f"/api/v1/challenges/{id}/solves")
    resp.raise_for_status()

    solves = [
        (entry["name"], dt_parse(entry["date"]))
        for entry in resp.json()["data"]
    ]
    solves.sort(key=lambda solve: solve[1])  # oldest solve first
    return solves[0][0]
def notify_solve(name, team):
    """Announce a first blood on the Discord webhook.

    Args:
        name: Name of the solved challenge.
        team: Name of the team that solved it first.

    Raises:
        requests.exceptions.HTTPError: if Discord answers with an error status.
    """
    payload = {
        "content": f"New challenge blood! {name} - Pwned by {team}",
        # Team and challenge names are user-controlled text. Disable mention
        # parsing so a team called "@everyone" cannot ping the whole server.
        "allowed_mentions": {"parse": []},
    }
    r = requests.post(WEBHOOK_URL, json=payload)
    r.raise_for_status()
def notify_error(exc):
    """Post an error report to the Discord webhook.

    The text is sent inside a Markdown code block. If Discord answers that
    post with an HTTP error status, the failure is logged and a short generic
    message is posted instead.

    Args:
        exc: Error text to report (``main`` passes a formatted traceback).
    """
    report = {"content": f"Bot error encountered\n```{exc}```"}
    resp = requests.post(WEBHOOK_URL, json=report)
    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        logging.exception("failed to post full error: %s", resp.text)
        fallback = {"content": "Bot error encountered, check server"}
        requests.post(WEBHOOK_URL, json=fallback)
def record_notification(id: int):
    """Append a challenge ID to the notified-IDs file, one ID per line.

    Args:
        id: CTFd challenge ID that has just been announced.
    """
    bloods_path = get_bloods_fn()
    with open(bloods_path, "a+") as out:
        out.write(f"{id}\n")
def loop():
    """Run one polling pass and announce every blood not yet recorded.

    For each solved challenge whose ID is not in the notified-IDs file, the
    first-solving team is looked up, the blood is posted to the webhook, and
    the ID is appended to the file. Exceptions from any step propagate to the
    caller.
    """
    solved = get_solved_chals()
    seen_ids = get_already_notified_chals()
    pending = [(name, chal_id) for (name, chal_id) in solved if chal_id not in seen_ids]

    for name, chal_id in pending:
        team = get_pwning_team(chal_id)
        logging.info("newly solved chal, reporting - %s (ID: %d); pwned by %s", name, chal_id, team)
        notify_solve(name, team)
        record_notification(chal_id)
def main():
    """Poll CTFd forever, announcing new bloods every ``SLEEP_TIME`` seconds.

    A failing iteration is logged and reported to the webhook, after which
    polling carries on. ``KeyboardInterrupt`` and ``SystemExit`` are not
    intercepted, so Ctrl-C stops the bot at any point.
    """
    logging.info("using blood ids at %s", get_bloods_fn())
    while True:
        logging.info("checking for new solves")
        try:
            loop()
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
            # SystemExit still terminate the process instead of being
            # reported as bot errors.
            logging.exception("loop iteration failure, reporting webhook")
            try:
                notify_error(traceback.format_exc())
            except Exception:
                # The reporter can fail too (e.g. webhook unreachable); that
                # must not take the polling loop down with it.
                logging.exception("failed to report error to webhook")
        logging.info("done, sleeping for %d secs", SLEEP_TIME)
        time.sleep(SLEEP_TIME)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment