-
-
Save nate-double-u/b078fb23bf47a909897879008efadf4e to your computer and use it in GitHub Desktop.
This script fetches all GitHub notifications, checks if their associated issues or pull requests are closed/merged, and marks them as done by deleting them via the GitHub API.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
# The personal access token is read from the environment so it never has to
# live in the source; the script refuses to start without it.
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
if not GITHUB_TOKEN:
    raise ValueError("Missing GITHUB_TOKEN environment variable")

# Root of the GitHub REST API; every endpoint URL is built from it.
GITHUB_API_URL = 'https://api.github.com'

# Headers sent with every API request: bearer authentication, the GitHub JSON
# media type, and an explicitly pinned REST API version.
HEADERS = {
    'Authorization': f'Bearer {GITHUB_TOKEN}',
    'Accept': 'application/vnd.github+json',
    'X-GitHub-Api-Version': '2022-11-28',
}
def get_all_notifications(*, timeout=30):
    """
    Fetch every notification for the authenticated user, following pagination.

    The query uses ``all=true`` so that notifications already marked as read
    are returned as well, and asks for 100 items per page.

    Args:
        timeout: Seconds to wait on each HTTP request before giving up.
            Without a timeout a stalled connection would hang the script.

    Returns:
        A list containing the decoded JSON notification objects of all pages.

    Raises:
        requests.HTTPError: If a page request comes back with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    notifications = []
    url = f'{GITHUB_API_URL}/notifications?all=true&per_page=100'  # first page

    while url:
        response = requests.get(url, headers=HEADERS, timeout=timeout)
        response.raise_for_status()
        notifications.extend(response.json())

        # requests parses the Link header into ``response.links``; when there
        # is no rel="next" entry this yields None and the loop ends.
        url = response.links.get('next', {}).get('url')

    return notifications
def is_notification_resolved(notification, *, timeout=30):
    """
    Decide whether the subject of a notification is "resolved".

    The subject's API URL is fetched and the verdict depends on its type:

    * ``PullRequest`` -- resolved when it has been merged or is closed.
    * ``Issue`` -- resolved when it is closed.
    * ``Release`` -- always resolved once the release could be fetched.
    * ``Workflow`` / ``CheckRun`` / ``CheckSuite`` -- resolved when the status
      is ``completed`` and the conclusion is ``failure``.
    * any other type -- never resolved.

    Args:
        notification: A notification dict with an optional ``subject`` entry
            holding ``type`` and ``url``.
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        True if the subject counts as resolved. False otherwise, including
        when the subject has no URL, the request fails or times out, the
        response status is not 200, or the body is not a JSON object.
    """
    # ``or {}`` also covers an explicit ``"subject": null`` in the payload.
    subject = notification.get("subject") or {}
    subject_type = subject.get("type")
    api_url = subject.get("url")  # API URL for the related resource

    # Without a URL there is nothing to check.
    if not api_url:
        return False

    # If the details can't be fetched (network error, 404, 403, ...), the
    # notification can't be considered resolved.
    try:
        response = requests.get(api_url, headers=HEADERS, timeout=timeout)
    except requests.RequestException:
        return False
    if response.status_code != 200:
        return False

    try:
        data = response.json()
    except ValueError:
        # A 200 response whose body is not valid JSON.
        return False
    if not isinstance(data, dict):
        return False

    if subject_type == "PullRequest":
        return data.get("merged_at") is not None or data.get("state") == "closed"
    if subject_type == "Issue":
        return data.get("state") == "closed"
    if subject_type == "Release":
        print("[DEBUG] -> Marking release as resolved (always).")
        return True
    if subject_type in ("Workflow", "CheckRun", "CheckSuite"):
        # For GitHub Actions events, consider it resolved if it's completed
        # and the conclusion is failure.
        return data.get("status") == "completed" and data.get("conclusion") == "failure"

    # Subject types that are not explicitly handled are left alone.
    return False
def delete_notification(thread_id, *, timeout=30):
    """
    Mark a notification thread as done (i.e. "delete" it) via the GitHub API.

    A one-line success or failure message is printed either way.

    Args:
        thread_id: Identifier of the notification thread to delete.
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        True if the API answered with status 200 or 204. False on any other
        status, or when the request itself failed or timed out.
    """
    # NOTE(review): the leading "β" in the messages below looks like a
    # mis-encoded emoji in this copy of the file -- confirm against the original.
    url = f'{GITHUB_API_URL}/notifications/threads/{thread_id}'
    try:
        response = requests.delete(url, headers=HEADERS, timeout=timeout)
    except requests.RequestException as exc:
        # Report a network failure the same way as an HTTP failure instead of
        # letting the exception escape to the caller.
        print(f"β Failed to delete {thread_id} (request error) - {exc}")
        return False

    if response.status_code in (200, 204):
        print(f"β Marked notification {thread_id} as done (deleted).")
        return True

    print(f"β Failed to delete {thread_id} (HTTP {response.status_code}) - {response.text[:300]}...")
    return False
def process_notification(notification):
    """
    Handle one notification: announce it, then delete it if it is resolved.

    Returns the result of ``delete_notification`` when the notification is
    resolved according to ``is_notification_resolved``, and False otherwise.
    """
    # NOTE(review): the leading "π" in the message below looks like a
    # mis-encoded emoji in this copy of the file -- confirm against the original.
    ident, subject_title = notification['id'], notification['subject']['title']
    print(f"π Checking notification {ident}: {subject_title}")

    if not is_notification_resolved(notification):
        return False
    return delete_notification(ident)
def main():
    """
    Fetch all notifications and mark the resolved ones as done.

    Notifications are processed concurrently on a small thread pool. A failure
    while processing one notification is reported and skipped, so it does not
    abort the rest of the run. A summary count is printed at the end.
    """
    # NOTE(review): the leading "π©" / "β" glyphs in the messages below look
    # like mis-encoded emoji in this copy of the file -- confirm against the
    # original.
    notifications = get_all_notifications()
    print(f"π© Found {len(notifications)} notifications in total.")

    processed_count = 0
    # For clearer (sequential) logs while debugging, set max_workers = 1.
    max_workers = 5

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_notification = {
            executor.submit(process_notification, notif): notif
            for notif in notifications
        }
        for future in as_completed(future_to_notification):
            notif = future_to_notification[future]
            try:
                marked_done = future.result()
            except Exception as exc:
                # Top-level boundary: future.result() re-raises whatever the
                # worker raised; report it and keep going with the others.
                print(f"Error while processing notification {notif.get('id')}: {exc}")
                continue
            if marked_done:
                processed_count += 1

    print(f"β Finished processing. {processed_count} notifications marked as done.")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment