Created
August 29, 2025 00:16
-
-
Save sugatoray/d22a49c566b8b52576d5577baf1cdb05 to your computer and use it in GitHub Desktop.
Python Scheduler and File Watcher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ## install: | |
| ## > pip install schedule watchfiles | |
| import os | |
| import time | |
| from datetime import datetime | |
| import schedule | |
| import stat | |
| from watchfiles import watch | |
| import pathlib | |
| import logging | |
| import pwd | |
# --- Configuration (User-definable) ---
# Directory that the file watcher monitors; point this at the real location.
DIRECTORY_TO_WATCH = "./watch_test_dir"
# Suffix a file name must end with to be reported (e.g. '.txt', 'report', etc.)
FILENAME_PATTERN = ".txt"

# Emit timestamped, level-tagged log lines at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def get_file_owner(filepath):
    """
    Resolve the user name of the account that owns ``filepath``.

    The owner's UID is read with ``os.stat`` and translated to a name through
    the password database (``pwd``).

    Returns:
        The owner's user name, or 'Unknown' when the file does not exist, the
        UID has no password-database entry, or any other error occurs (the
        latter is also logged as an error).
    """
    owner = 'Unknown'
    try:
        owner = pwd.getpwuid(os.stat(filepath).st_uid).pw_name
    except (FileNotFoundError, KeyError):
        # Missing file or a UID without a passwd entry: keep the default.
        pass
    except Exception as err:
        logging.error(f"Error getting file owner for {filepath}: {err}")
    return owner
def bytes_to_human(n):
    """
    Render a byte count as a short human-readable string.

    The largest binary unit (KB = 2**10, MB = 2**20, ... YB = 2**80) that does
    not exceed ``n`` is chosen and the value is shown with two decimals,
    e.g. ``1536 -> '1.50 KB'``. Values below 1024 are reported in plain bytes,
    e.g. ``512 -> '512.00 B'``.
    """
    unit_letters = 'KMGTPEZY'
    # Walk from the largest unit down so the first hit is the best fit.
    for exponent in range(len(unit_letters), 0, -1):
        threshold = 1 << (exponent * 10)
        if n >= threshold:
            scaled = float(n) / threshold
            return f'{scaled:.2f} {unit_letters[exponent - 1]}B'
    return f'{n:.2f} B'
def watch_and_report(directory_path, filename_pattern):
    """
    Watch a directory and print a report for the first new matching file.

    Iterates over the change batches yielded by ``watchfiles.watch`` and looks
    for an 'added' event whose file name ends with ``filename_pattern``. For
    the first such file that can still be stat'ed, a report (resolved path,
    human-readable size, owner, creation time) is printed and the function
    returns so the scheduler can run again.

    Note: ``watch`` yields changes indefinitely, so this call blocks until a
    matching file appears; the caller does not regain control before then.

    Args:
        directory_path: Directory to watch.
        filename_pattern: Suffix the file name must end with (e.g. '.txt').

    Returns:
        None. A missing directory and any other ``Exception`` raised while
        watching are logged rather than propagated.
    """
    logging.info(f"Hourly job started. Watching '{directory_path}' for new files matching '{filename_pattern}'.")
    try:
        for changes in watch(directory_path):
            for change, path in changes:
                candidate = pathlib.Path(path)
                # Only newly added files whose name carries the wanted suffix.
                if change.name != 'added' or not candidate.name.endswith(filename_pattern):
                    continue
                try:
                    file_stat = candidate.stat()
                    file_path = str(candidate.resolve())
                    file_size = bytes_to_human(file_stat.st_size)
                    # On Unix st_ctime is the inode metadata-change time, not
                    # the creation time. Prefer the true birth time where the
                    # platform exposes it and fall back to st_ctime otherwise.
                    created_ts = getattr(file_stat, 'st_birthtime', None)
                    if created_ts is None:
                        created_ts = file_stat.st_ctime
                    creation_time = datetime.fromtimestamp(created_ts).strftime('%Y-%m-%d %H:%M:%S')
                    created_by = get_file_owner(file_path)
                    report_message = (
                        f"\n"
                        f"--- New File Detected ---\n"
                        f"File Path: {file_path}\n"
                        f"File Size: {file_size}\n"
                        f"Created By: {created_by}\n"
                        f"Creation Time: {creation_time}\n"
                        f"-------------------------\n"
                    )
                    print(report_message)
                    # Stop watching after the first match to allow the scheduler to run again
                    return
                except FileNotFoundError:
                    # The file vanished between the event and the stat call;
                    # keep watching for the next candidate.
                    logging.warning(f"File {path} was created but then deleted quickly.")
                except Exception as e:
                    logging.error(f"An error occurred while processing file {path}: {e}")
    except FileNotFoundError:
        logging.error(f"The directory '{directory_path}' was not found. Please create it.")
    except Exception as e:
        logging.error(f"An unexpected error occurred during the watch process: {e}")
def schedule_hourly_job():
    """
    Enable or disable the hourly file-watching job based on the day of month.

    Intended to be called once a day by the main scheduler. Any hourly job
    left over from a previous call is removed first; if today is between the
    1st and the 5th (inclusive) a fresh hourly ``watch_and_report`` job is
    registered.

    Only jobs carrying the hourly tag are cleared. Calling ``schedule.clear()``
    without a tag would delete every job, including the daily check that
    invokes this function, and re-adding the hourly job without clearing the
    old one would accumulate duplicate jobs on consecutive days.
    """
    hourly_tag = "hourly-file-watch"
    current_day = datetime.now().day

    # English ordinal suffix: 1st, 2nd, 3rd, 4th ... 11th, 12th, 13th ... 21st.
    if 11 <= current_day % 100 <= 13:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(current_day % 10, "th")

    # Drop only the previously scheduled hourly job(s); other jobs survive.
    schedule.clear(hourly_tag)

    if 1 <= current_day <= 5:
        logging.info(f"Today is the {current_day}{suffix}. Scheduling the file watching job to run every hour.")
        # Schedule the watch_and_report function to run every hour
        schedule.every(1).hour.do(
            watch_and_report,
            directory_path=DIRECTORY_TO_WATCH,
            filename_pattern=FILENAME_PATTERN,
        ).tag(hourly_tag)
    else:
        logging.info(f"Today is the {current_day}{suffix}. No hourly job will be scheduled.")
# --- Main Execution Loop ---
if __name__ == "__main__":
    logging.info("Scheduler program started.")
    logging.info(f"Watching directory: {DIRECTORY_TO_WATCH}")
    logging.info(f"Matching file pattern: '{FILENAME_PATTERN}'")

    # Create the directory if it doesn't exist for demonstration purposes.
    # exist_ok=True closes the race where the directory appears between the
    # existence check and the creation call.
    if not os.path.exists(DIRECTORY_TO_WATCH):
        os.makedirs(DIRECTORY_TO_WATCH, exist_ok=True)
        logging.info(f"Created a test directory: {DIRECTORY_TO_WATCH}")

    # Run the daily check immediately to set up the hourly job for the current
    # day. This happens BEFORE the daily job is registered because
    # schedule_hourly_job() may clear scheduled jobs; registering afterwards
    # guarantees the daily check is never removed by this startup call.
    schedule_hourly_job()

    # Schedule the daily check. This job runs once a day to decide if the
    # hourly job should be active.
    schedule.every().day.at("00:00").do(schedule_hourly_job)

    try:
        # The main loop that keeps the scheduler running
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("Scheduler stopped by user.")
    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception(f"An unexpected error occurred in the main loop: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment