Created
February 20, 2025 05:46
-
-
Save ehzawad/b75d14c733b23c5e86b60e31c317c91b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import os | |
import subprocess | |
from datetime import datetime | |
import re | |
import shutil | |
import logging | |
# Configure the root logger once at import time: INFO and above, with a
# "YYYY-MM-DD HH:MM:SS - LEVEL - message" line layout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
class DataProcessingError(Exception):
    """Custom exception for data processing errors."""
def parse_date(date_input):
    """Normalize a user-supplied date to zero-padded ``MM_DD``.

    Matching is case-insensitive and ignores surrounding whitespace.
    Accepted spellings:

    - numeric: ``02_20``, ``2_20``, ``02-20``
    - month name plus day: ``Feb 20``, ``February 20``, ``Feb_20``,
      ``Feb-20``, ``Feb20`` (any run of spaces, ``_`` or ``-`` may separate
      the name from the day)

    Only the *shape* of the input is checked here; ``13_45`` is returned
    unchanged, so callers must still range-check the result.

    Args:
        date_input: Raw date text.

    Returns:
        The date as ``"MM_DD"``, or ``None`` (after logging an error) when
        the input is not a string or matches no supported format.
    """
    month_dict = {
        'jan': '01', 'january': '01',
        'feb': '02', 'february': '02',
        'mar': '03', 'march': '03',
        'apr': '04', 'april': '04',
        'may': '05',
        'jun': '06', 'june': '06',
        'jul': '07', 'july': '07',
        'aug': '08', 'august': '08',
        'sep': '09', 'sept': '09', 'september': '09',
        'oct': '10', 'october': '10',
        'nov': '11', 'november': '11',
        'dec': '12', 'december': '12',
    }

    if not isinstance(date_input, str):
        logging.error(
            f"Error parsing date: expected text, got {type(date_input).__name__}"
        )
        return None

    text = date_input.strip().lower()

    # re.ASCII keeps \d to 0-9, so the result is always safe to embed in the
    # folder names built from it.
    # Case 1: already numeric (M_D, MM_DD, MM-DD, ...).
    numeric = re.fullmatch(r'(\d{1,2})[_-](\d{1,2})', text, re.ASCII)
    if numeric:
        month, day = numeric.groups()
        return f"{month.zfill(2)}_{day.zfill(2)}"

    # Case 2: month name, optional separator run, then the day.
    named = re.fullmatch(r'([a-z]+)[_\s-]*(\d{1,2})', text, re.ASCII)
    if named:
        month_str, day = named.groups()
        if month_str in month_dict:
            return f"{month_dict[month_str]}_{day.zfill(2)}"

    logging.error(f"Error parsing date: Unable to parse date format: {text}")
    return None
def validate_date(date_str):
    """Check that *date_str* is a real calendar day written as ``MM_DD``.

    Both fields must be exactly two ASCII digits. The day is checked against
    the year 2025, which is not a leap year, so ``02_29`` is rejected.

    Args:
        date_str: Candidate date in ``MM_DD`` form.

    Returns:
        ``True`` when the date is well formed and exists in 2025, otherwise
        ``False`` (an error is logged).
    """
    candidate = f"2025_{date_str}"

    # strptime alone accepts unpadded fields such as "2_5"; enforce the
    # zero-padded shape first.
    if not re.fullmatch(r"2025_[0-9]{2}_[0-9]{2}", candidate):
        logging.error(f"Invalid date format: expected MM_DD, got {date_str!r}")
        return False

    try:
        # A dummy year turns MM_DD into a complete, checkable date.
        datetime.strptime(candidate, "%Y_%m_%d")
    except ValueError as e:
        logging.error(f"Invalid date format: {str(e)}")
        return False
    return True
def get_month_name(month_num):
    """Convert a two-digit month number to its lowercase abbreviation.

    Args:
        month_num: Month as a zero-padded string, ``'01'`` through ``'12'``.

    Returns:
        The three-letter abbreviation (``'jan'`` ... ``'dec'``), or
        ``'unknown'`` for any other value, including unpadded ``'2'``.
    """
    month_names = {
        '01': 'jan', '02': 'feb', '03': 'mar', '04': 'apr',
        '05': 'may', '06': 'jun', '07': 'jul', '08': 'aug',
        '09': 'sep', '10': 'oct', '11': 'nov', '12': 'dec',
    }
    return month_names.get(month_num, 'unknown')
def ensure_directory_exists(directory):
    """Make sure *directory* exists as a directory, creating it if needed.

    Missing parent directories are created too.

    Args:
        directory: Path of the directory to ensure.

    Returns:
        ``True`` when the directory exists afterwards. ``False`` (with an
        error logged) when it could not be created, including when the path
        is already taken by a regular file.
    """
    try:
        already_there = os.path.isdir(directory)
        # exist_ok avoids the check-then-create race and still raises
        # FileExistsError if the path is occupied by a non-directory.
        os.makedirs(directory, exist_ok=True)
    except OSError as e:
        logging.error(f"Error creating directory {directory}: {str(e)}")
        return False

    if not already_there:
        logging.info(f"Created directory: {directory}")
    return True
def clean_existing_files(zip_path):
    """Delete a stale zip file so a fresh archive can take its place.

    A missing file is not an error; the function simply returns.

    Args:
        zip_path: Path of the zip file to remove.

    Raises:
        DataProcessingError: If the file exists but cannot be removed. The
            underlying ``OSError`` is attached as the cause.
    """
    try:
        # Attempt the removal directly instead of checking first, so there is
        # no gap between the check and the delete.
        os.remove(zip_path)
    except FileNotFoundError:
        return
    except OSError as e:
        logging.error(f"Error removing existing file {zip_path}: {str(e)}")
        raise DataProcessingError(f"Could not remove existing file: {str(e)}") from e

    logging.info(f"Removed existing zip file: {zip_path}")
def process_and_zip_folders(date_str):
    """Zip the downloaded folders for *date_str* into ``zipped_data/``.

    ``vlog/2025_<date_str>`` becomes ``zipped_data/audio_<mon>_<DD>.zip`` and
    ``transcript/2025_<date_str>`` becomes
    ``zipped_data/transcript_<mon>_<DD>.zip``. Any existing archive with the
    same name is replaced. A missing source folder is logged as a warning and
    skipped; it does not fail the call.

    Args:
        date_str: Date in ``MM_DD`` form.

    Returns:
        ``True`` once every source folder that exists has been archived.

    Raises:
        DataProcessingError: If ``date_str`` is malformed, the output
            directory cannot be created, or archiving fails.
    """
    try:
        month, day = date_str.split('_')
        month_name = get_month_name(month)

        if not ensure_directory_exists("zipped_data"):
            raise DataProcessingError("Failed to create zipped_data directory")

        # (log label, source folder, archive path without the .zip suffix)
        jobs = (
            ("Vlog", f"vlog/2025_{date_str}",
             f"zipped_data/audio_{month_name}_{day}"),
            ("Transcript", f"transcript/2025_{date_str}",
             f"zipped_data/transcript_{month_name}_{day}"),
        )
        for label, source_folder, archive_base in jobs:
            if not os.path.exists(source_folder):
                logging.warning(f"{label} folder not found: {source_folder}")
                continue
            clean_existing_files(f"{archive_base}.zip")
            # make_archive appends ".zip" to the base name itself.
            shutil.make_archive(archive_base, 'zip', source_folder)
            logging.info(f"Created {archive_base}.zip")

        return True
    except DataProcessingError as e:
        # Already a domain error: log and pass it on without re-wrapping.
        logging.error(f"Error in process_and_zip_folders: {str(e)}")
        raise
    except Exception as e:
        logging.error(f"Error in process_and_zip_folders: {str(e)}")
        raise DataProcessingError(f"Failed to process and zip folders: {str(e)}") from e
def download_folders(date_str):
    """Download the vlog and transcript folders for *date_str*, then zip them.

    Each folder ``2025_<date_str>`` is copied with ``scp -r`` from host
    ``mtb`` into the local ``vlog/`` and ``transcript/`` directories. Both
    downloads are always attempted so that every failure is reported. The
    folders are zipped only if both downloads succeed.

    Args:
        date_str: Date in ``MM_DD`` form.

    Returns:
        The result of ``process_and_zip_folders`` on success, or ``False``
        (with the error logged) if any step fails.
    """
    try:
        remote_host = "mtb"
        folder_name = f"2025_{date_str}"

        for directory in ["vlog", "transcript"]:
            if not ensure_directory_exists(directory):
                raise DataProcessingError(f"Failed to create {directory} directory")

        # (local directory, log label, remote base path)
        downloads = (
            ("vlog", "Vlog", "/usr/local/ccpro/AA/vlog/2025"),
            ("transcript", "Transcript", "/usr/local/ccpro/AA/transcript/2025"),
        )

        all_ok = True
        for local_dir, label, remote_base in downloads:
            logging.info(f"Downloading {local_dir} folder for date: {date_str}")
            # Argument list, no shell: date_str is never parsed by a local
            # shell, so metacharacters in it cannot run commands here.
            result = subprocess.run(
                ["scp", "-r",
                 f"{remote_host}:{remote_base}/{folder_name}",
                 f"./{local_dir}/"],
                stderr=subprocess.PIPE,
            )
            if result.returncode != 0:
                # errors="replace" keeps undecodable scp output from masking
                # the real failure with a UnicodeDecodeError.
                message = result.stderr.decode(errors="replace")
                logging.error(f"{label} download error: {message}")
                all_ok = False

        if not all_ok:
            raise DataProcessingError("Failed to download one or both folders")

        logging.info("Successfully downloaded both folders")
        return process_and_zip_folders(date_str)
    except Exception as e:
        # Top-level boundary for this step: report and signal failure.
        logging.error(f"Error in download_folders: {str(e)}")
        return False
def _print_supported_formats():
    """Print the list of date spellings the script accepts."""
    print(" - MM_DD (e.g., 02_19)")
    print(" - Month DD (e.g., Feb 20, February 20)")
    print(" - Month_DD (e.g., Feb_20, feb_20)")


def main():
    """Command-line entry point: parse the date argument and run the download.

    All arguments after the script name are joined with spaces, so an
    unquoted ``Feb 20`` is treated as one date. The process exits with
    status 1 when no date is given, the date is invalid, the download fails,
    or an unexpected error occurs.
    """
    try:
        if len(sys.argv) < 2:
            print("Usage: ./download_logs.py DATE")
            print("Supported date formats:")
            _print_supported_formats()
            sys.exit(1)

        # Join every argument so unquoted dates containing spaces still work.
        date_input = ' '.join(sys.argv[1:])
        date_str = parse_date(date_input)

        if not date_str or not validate_date(date_str):
            print("Invalid date format! Please use one of the following formats:")
            _print_supported_formats()
            sys.exit(1)

        success = download_folders(date_str)
        if not success:
            logging.error("Script execution failed")
            sys.exit(1)
    except Exception as e:
        # sys.exit raises SystemExit, which is not an Exception subclass, so
        # the deliberate exits above pass through untouched.
        logging.error(f"Unexpected error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
#!/usr/bin/env python3
import sys
import os
import subprocess
from datetime import datetime
import re
import shutil
import logging
# Set up logging: INFO and above, with a
# "YYYY-MM-DD HH:MM:SS - LEVEL - message" line layout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
class DataProcessingError(Exception):
    """Custom exception for data processing errors."""
def parse_date(date_input):
"""
Parse various date formats and return MM_DD format
Handles formats like:
- Feb 20, February 20
- Feb_20, feb_20
- 02_20, 2_20
"""
try:
# Remove any extra spaces and convert to lowercase
date_input = date_input.strip().lower()
def validate_date(date_str):
    """Check that *date_str* is a real calendar day written as ``MM_DD``.

    Both fields must be exactly two ASCII digits. The day is checked against
    the year 2025, which is not a leap year, so ``02_29`` is rejected.

    Args:
        date_str: Candidate date in ``MM_DD`` form.

    Returns:
        ``True`` when the date is well formed and exists in 2025, otherwise
        ``False`` (an error is logged).
    """
    candidate = f"2025_{date_str}"

    # strptime alone accepts unpadded fields such as "2_5"; enforce the
    # zero-padded shape first.
    if not re.fullmatch(r"2025_[0-9]{2}_[0-9]{2}", candidate):
        logging.error(f"Invalid date format: expected MM_DD, got {date_str!r}")
        return False

    try:
        # A dummy year turns MM_DD into a complete, checkable date.
        datetime.strptime(candidate, "%Y_%m_%d")
    except ValueError as e:
        logging.error(f"Invalid date format: {str(e)}")
        return False
    return True
def get_month_name(month_num):
    """Convert a two-digit month number to its lowercase abbreviation.

    Args:
        month_num: Month as a zero-padded string, ``'01'`` through ``'12'``.

    Returns:
        The three-letter abbreviation (``'jan'`` ... ``'dec'``), or
        ``'unknown'`` for any other value, including unpadded ``'2'``.
    """
    month_names = {
        '01': 'jan', '02': 'feb', '03': 'mar', '04': 'apr',
        '05': 'may', '06': 'jun', '07': 'jul', '08': 'aug',
        '09': 'sep', '10': 'oct', '11': 'nov', '12': 'dec',
    }
    return month_names.get(month_num, 'unknown')
def ensure_directory_exists(directory):
    """Make sure *directory* exists as a directory, creating it if needed.

    Missing parent directories are created too.

    Args:
        directory: Path of the directory to ensure.

    Returns:
        ``True`` when the directory exists afterwards. ``False`` (with an
        error logged) when it could not be created, including when the path
        is already taken by a regular file.
    """
    try:
        already_there = os.path.isdir(directory)
        # exist_ok avoids the check-then-create race and still raises
        # FileExistsError if the path is occupied by a non-directory.
        os.makedirs(directory, exist_ok=True)
    except OSError as e:
        logging.error(f"Error creating directory {directory}: {str(e)}")
        return False

    if not already_there:
        logging.info(f"Created directory: {directory}")
    return True
def clean_existing_files(zip_path):
    """Delete a stale zip file so a fresh archive can take its place.

    A missing file is not an error; the function simply returns.

    Args:
        zip_path: Path of the zip file to remove.

    Raises:
        DataProcessingError: If the file exists but cannot be removed. The
            underlying ``OSError`` is attached as the cause.
    """
    try:
        # Attempt the removal directly instead of checking first, so there is
        # no gap between the check and the delete.
        os.remove(zip_path)
    except FileNotFoundError:
        return
    except OSError as e:
        logging.error(f"Error removing existing file {zip_path}: {str(e)}")
        raise DataProcessingError(f"Could not remove existing file: {str(e)}") from e

    logging.info(f"Removed existing zip file: {zip_path}")
def process_and_zip_folders(date_str):
"""Zip the downloaded folders with new names and organize them"""
try:
month, day = date_str.split('_')
month_name = get_month_name(month)
def download_folders(date_str):
"""Download vlog and transcript folders for given date"""
try:
# Remote server
remote_host = "mtb"
def clean_directories():
    """Delete the working directories this script creates.

    Removes ``transcript``, ``vlog`` and ``zipped_data`` (relative to the
    current working directory) together with everything inside them.
    Directories that do not exist are skipped.

    Returns:
        ``True`` when every existing directory was removed, ``False`` (with
        the error logged) as soon as one removal fails.
    """
    try:
        for directory in ('transcript', 'vlog', 'zipped_data'):
            if not os.path.exists(directory):
                continue
            shutil.rmtree(directory)
            logging.info(f"Removed directory: {directory}")
        print("Successfully cleaned all directories!")
        return True
    except Exception as e:
        logging.error(f"Error cleaning directories: {str(e)}")
        return False
def main():
try:
if len(sys.argv) < 2:
print("Usage: ./download_logs.py [DATE|clean]")
print("Options:")
print(" DATE - Download and process logs for the given date")
print(" clean - Remove all created directories")
print("\nSupported date formats:")
print(" - MM_DD (e.g., 02_19)")
print(" - Month DD (e.g., Feb 20, February 20)")
print(" - Month_DD (e.g., Feb_20, feb_20)")
sys.exit(1)
# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()