Skip to content

Instantly share code, notes, and snippets.

@ehzawad
Created February 20, 2025 05:46
Show Gist options
  • Save ehzawad/b75d14c733b23c5e86b60e31c317c91b to your computer and use it in GitHub Desktop.
Save ehzawad/b75d14c733b23c5e86b60e31c317c91b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys
import os
import subprocess
from datetime import datetime
import re
import shutil
import logging
# Configure the root logger once at import time: INFO and above, with a
# timestamped "<time> - <level> - <message>" layout.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class DataProcessingError(Exception):
    """Custom exception for data processing errors.

    Raised by the download/zip helpers when a step cannot be completed
    (directory creation, stale archive removal, download, archiving).
    """
def parse_date(date_input):
    """
    Parse various date formats and return zero-padded MM_DD format.

    Handles formats like:
    - Feb 20, February 20 (any run of spaces, underscores or hyphens may
      separate the month name from the day)
    - Feb_20, feb_20, Feb20
    - 02_20, 2_20, 02-20

    Only the *shape* of the input is checked here; whether the month/day
    pair is a real calendar date is not verified by this function.

    Args:
        date_input: The raw date text, e.g. the joined command-line arguments.

    Returns:
        The date as an "MM_DD" string, or None (after logging an error)
        when the input is not a string or matches no supported format.
    """
    # Month names and common abbreviations mapped to zero-padded numbers
    month_dict = {
        'jan': '01', 'january': '01',
        'feb': '02', 'february': '02',
        'mar': '03', 'march': '03',
        'apr': '04', 'april': '04',
        'may': '05',
        'jun': '06', 'june': '06',
        'jul': '07', 'july': '07',
        'aug': '08', 'august': '08',
        'sep': '09', 'sept': '09', 'september': '09',
        'oct': '10', 'october': '10',
        'nov': '11', 'november': '11',
        'dec': '12', 'december': '12',
    }

    if not isinstance(date_input, str):
        logging.error(
            f"Error parsing date: expected a string, got {type(date_input).__name__}"
        )
        return None

    # Remove surrounding whitespace and convert to lowercase
    date_input = date_input.strip().lower()

    # Case 1: numeric month and day (MM_DD or MM-DD). ASCII digits only,
    # because the result is later used to build folder names.
    numeric = re.fullmatch(r'([0-9]{1,2})[_-]([0-9]{1,2})', date_input)
    if numeric:
        month, day = numeric.groups()
        return f"{month.zfill(2)}_{day.zfill(2)}"

    # Case 2: month name followed by the day, with zero or more separator
    # characters in between (Feb20, Feb_20, Feb 20, "Feb  20", "Feb - 20").
    named = re.fullmatch(r'([a-z]+)[_\s-]*([0-9]{1,2})', date_input)
    if named and named.group(1) in month_dict:
        return f"{month_dict[named.group(1)]}_{named.group(2).zfill(2)}"

    logging.error(f"Error parsing date: Unable to parse date format: {date_input}")
    return None
def validate_date(date_str, year=2025):
    """Validate that an MM_DD string is a real calendar date.

    The month/day pair is combined with ``year`` and parsed, so
    leap-day handling follows that year (02_29 is rejected for the default
    2025 but accepted for 2024).

    Args:
        date_str: Date in MM_DD form.
        year: Four-digit year the date is checked against. Defaults to 2025.

    Returns:
        True when the date exists in ``year``; False (after logging an
        error) otherwise.
    """
    try:
        datetime.strptime(f"{year}_{date_str}", "%Y_%m_%d")
    except ValueError as e:
        logging.error(f"Invalid date format: {str(e)}")
        return False
    return True
def get_month_name(month_num):
    """Convert a month number to its abbreviated lowercase month name.

    Args:
        month_num: Month as a string or int; zero-padding is optional
            ('02', '2' and 2 all map to 'feb').

    Returns:
        The three-letter month abbreviation, or 'unknown' when the value is
        not a month between 1 and 12.
    """
    month_names = {
        '01': 'jan', '02': 'feb', '03': 'mar', '04': 'apr',
        '05': 'may', '06': 'jun', '07': 'jul', '08': 'aug',
        '09': 'sep', '10': 'oct', '11': 'nov', '12': 'dec',
    }
    # Normalise to the two-character key form so unpadded input still matches
    key = str(month_num).strip().zfill(2)
    return month_names.get(key, 'unknown')
def ensure_directory_exists(directory):
    """Ensure a directory exists, creating it (and any parents) if necessary.

    Args:
        directory: Path of the directory to create.

    Returns:
        True when the directory exists on return; False (after logging an
        error) when it could not be created, including when the path is
        already taken by a regular file.
    """
    # Only used to decide whether to log a "Created" message
    already_present = os.path.isdir(directory)
    try:
        # exist_ok avoids the check-then-create race and still raises
        # FileExistsError when the path exists but is not a directory
        os.makedirs(directory, exist_ok=True)
    except OSError as e:
        logging.error(f"Error creating directory {directory}: {str(e)}")
        return False
    if not already_present:
        logging.info(f"Created directory: {directory}")
    return True
def clean_existing_files(zip_path):
    """Remove an existing zip file if it exists.

    A missing file is not an error; the function simply returns.

    Args:
        zip_path: Path of the archive to delete.

    Raises:
        DataProcessingError: If the file exists but cannot be removed. The
            underlying OSError is attached as the cause.
    """
    try:
        # Attempt the removal directly instead of checking first, so a file
        # that disappears in between cannot cause a spurious failure
        os.remove(zip_path)
    except FileNotFoundError:
        return
    except OSError as e:
        logging.error(f"Error removing existing file {zip_path}: {str(e)}")
        raise DataProcessingError(f"Could not remove existing file: {str(e)}") from e
    logging.info(f"Removed existing zip file: {zip_path}")
def process_and_zip_folders(date_str):
    """Zip the downloaded folders with new names and organize them.

    For the given date, ``vlog/2025_<date>`` is archived to
    ``zipped_data/audio_<mon>_<day>.zip`` and ``transcript/2025_<date>`` to
    ``zipped_data/transcript_<mon>_<day>.zip``. An existing archive with the
    same name is removed first. A missing source folder only produces a
    warning.

    Args:
        date_str: Date in MM_DD form.

    Returns:
        True when processing finished (even if a source folder was missing).

    Raises:
        DataProcessingError: If the output directory cannot be created, a
            stale archive cannot be removed, or archiving fails.
    """
    try:
        month, day = date_str.split('_')
        month_name = get_month_name(month)

        # Ensure zipped_data directory exists
        if not ensure_directory_exists("zipped_data"):
            raise DataProcessingError("Failed to create zipped_data directory")

        # (label for log messages, source folder, archive path without ".zip")
        jobs = (
            ("Vlog", f"vlog/2025_{date_str}",
             f"zipped_data/audio_{month_name}_{day}"),
            ("Transcript", f"transcript/2025_{date_str}",
             f"zipped_data/transcript_{month_name}_{day}"),
        )

        for label, source_folder, archive_base in jobs:
            if not os.path.exists(source_folder):
                logging.warning(f"{label} folder not found: {source_folder}")
                continue
            clean_existing_files(f"{archive_base}.zip")
            shutil.make_archive(archive_base, 'zip', source_folder)
            logging.info(f"Created {archive_base}.zip")

        return True
    except DataProcessingError as e:
        # Already a domain error: log it and let it through unchanged rather
        # than wrapping it in a second DataProcessingError
        logging.error(f"Error in process_and_zip_folders: {str(e)}")
        raise
    except Exception as e:
        logging.error(f"Error in process_and_zip_folders: {str(e)}")
        raise DataProcessingError(f"Failed to process and zip folders: {str(e)}") from e
def download_folders(date_str):
    """Download vlog and transcript folders for given date, then zip them.

    Both folders are copied with ``scp -r`` from the remote host into the
    local ``vlog/`` and ``transcript/`` directories. The second download is
    attempted even if the first one fails, so both errors get logged. When
    both succeed the folders are handed to ``process_and_zip_folders``.

    Args:
        date_str: Date in MM_DD form.

    Returns:
        The result of ``process_and_zip_folders`` on success; False (after
        logging the error) if anything fails.
    """
    try:
        # Remote server
        remote_host = "mtb"

        # Base paths and folder name all derive from the same year
        year = "2025"
        vlog_base = f"/usr/local/ccpro/AA/vlog/{year}"
        transcript_base = f"/usr/local/ccpro/AA/transcript/{year}"
        folder_name = f"{year}_{date_str}"

        # Ensure local directories exist
        for directory in ["vlog", "transcript"]:
            if not ensure_directory_exists(directory):
                raise DataProcessingError(f"Failed to create {directory} directory")

        # (name for progress log, label for error log, remote source, local target)
        downloads = (
            ("vlog", "Vlog",
             f"{remote_host}:{vlog_base}/{folder_name}", "./vlog/"),
            ("transcript", "Transcript",
             f"{remote_host}:{transcript_base}/{folder_name}", "./transcript/"),
        )

        all_succeeded = True
        for kind, label, remote_path, local_dir in downloads:
            logging.info(f"Downloading {kind} folder for date: {date_str}")
            # Argument list (no shell) so nothing in the path is interpreted
            # by a local shell
            result = subprocess.run(
                ["scp", "-r", remote_path, local_dir],
                stderr=subprocess.PIPE,
            )
            if result.returncode != 0:
                # errors='replace' keeps odd bytes in scp output from raising
                error_text = result.stderr.decode(errors='replace')
                logging.error(f"{label} download error: {error_text}")
                all_succeeded = False

        if not all_succeeded:
            raise DataProcessingError("Failed to download one or both folders")

        logging.info("Successfully downloaded both folders")
        # Process and zip the downloaded folders
        return process_and_zip_folders(date_str)
    except Exception as e:
        logging.error(f"Error in download_folders: {str(e)}")
        return False
def main():
    """Command-line entry point: parse the date argument and run the download.

    All arguments after the script name are joined with spaces, so an
    unquoted date such as ``Feb 20`` works. Exits with status 1 on missing
    arguments, an unparseable/invalid date, a failed download, or any
    unexpected error.
    """
    # Shared by the usage text and the invalid-date message
    supported_formats = (
        " - MM_DD (e.g., 02_19)",
        " - Month DD (e.g., Feb 20, February 20)",
        " - Month_DD (e.g., Feb_20, feb_20)",
    )
    try:
        if len(sys.argv) < 2:
            print("Usage: ./download_logs.py DATE")
            print("Supported date formats:")
            for line in supported_formats:
                print(line)
            sys.exit(1)

        # Join all arguments after the script name to handle unquoted dates with spaces
        date_input = ' '.join(sys.argv[1:])
        date_str = parse_date(date_input)

        if not date_str or not validate_date(date_str):
            print("Invalid date format! Please use one of the following formats:")
            for line in supported_formats:
                print(line)
            sys.exit(1)

        if not download_folders(date_str):
            logging.error("Script execution failed")
            sys.exit(1)
    except Exception as e:
        # sys.exit raises SystemExit, which is not an Exception subclass, so
        # the exits above pass straight through this handler
        logging.error(f"Unexpected error: {str(e)}")
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported
if __name__ == "__main__":
    main()
@ehzawad
Copy link
Author

ehzawad commented Feb 20, 2025

#!/usr/bin/env python3
import sys
import os
import subprocess
from datetime import datetime
import re
import shutil
import logging

# Set up logging: INFO and above on the root logger, with a timestamped
# "<time> - <level> - <message>" layout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

class DataProcessingError(Exception):
    """Custom exception for data processing errors.

    Raised by the download/zip helpers when a step cannot be completed
    (directory creation, stale archive removal, download, archiving).
    """

def parse_date(date_input):
    """
    Parse various date formats and return zero-padded MM_DD format.

    Handles formats like:
    - Feb 20, February 20 (any run of spaces, underscores or hyphens may
      separate the month name from the day)
    - Feb_20, feb_20, Feb20
    - 02_20, 2_20, 02-20

    Only the *shape* of the input is checked here; whether the month/day
    pair is a real calendar date is not verified by this function.

    Args:
        date_input: The raw date text, e.g. the joined command-line arguments.

    Returns:
        The date as an "MM_DD" string, or None (after logging an error)
        when the input is not a string or matches no supported format.
    """
    # Month names and common abbreviations mapped to zero-padded numbers
    month_dict = {
        'jan': '01', 'january': '01',
        'feb': '02', 'february': '02',
        'mar': '03', 'march': '03',
        'apr': '04', 'april': '04',
        'may': '05',
        'jun': '06', 'june': '06',
        'jul': '07', 'july': '07',
        'aug': '08', 'august': '08',
        'sep': '09', 'sept': '09', 'september': '09',
        'oct': '10', 'october': '10',
        'nov': '11', 'november': '11',
        'dec': '12', 'december': '12',
    }

    if not isinstance(date_input, str):
        logging.error(
            f"Error parsing date: expected a string, got {type(date_input).__name__}"
        )
        return None

    # Remove surrounding whitespace and convert to lowercase
    date_input = date_input.strip().lower()

    # Case 1: numeric month and day (MM_DD or MM-DD). ASCII digits only,
    # because the result is later used to build folder names.
    numeric = re.fullmatch(r'([0-9]{1,2})[_-]([0-9]{1,2})', date_input)
    if numeric:
        month, day = numeric.groups()
        return f"{month.zfill(2)}_{day.zfill(2)}"

    # Case 2: month name followed by the day, with zero or more separator
    # characters in between (Feb20, Feb_20, Feb 20, "Feb  20", "Feb - 20").
    named = re.fullmatch(r'([a-z]+)[_\s-]*([0-9]{1,2})', date_input)
    if named and named.group(1) in month_dict:
        return f"{month_dict[named.group(1)]}_{named.group(2).zfill(2)}"

    logging.error(f"Error parsing date: Unable to parse date format: {date_input}")
    return None

def validate_date(date_str, year=2025):
    """Validate that an MM_DD string is a real calendar date.

    The month/day pair is combined with ``year`` and parsed, so
    leap-day handling follows that year (02_29 is rejected for the default
    2025 but accepted for 2024).

    Args:
        date_str: Date in MM_DD form.
        year: Four-digit year the date is checked against. Defaults to 2025.

    Returns:
        True when the date exists in ``year``; False (after logging an
        error) otherwise.
    """
    try:
        datetime.strptime(f"{year}_{date_str}", "%Y_%m_%d")
    except ValueError as e:
        logging.error(f"Invalid date format: {str(e)}")
        return False
    return True

def get_month_name(month_num):
    """Convert a month number to its abbreviated lowercase month name.

    Args:
        month_num: Month as a string or int; zero-padding is optional
            ('02', '2' and 2 all map to 'feb').

    Returns:
        The three-letter month abbreviation, or 'unknown' when the value is
        not a month between 1 and 12.
    """
    month_names = {
        '01': 'jan', '02': 'feb', '03': 'mar', '04': 'apr',
        '05': 'may', '06': 'jun', '07': 'jul', '08': 'aug',
        '09': 'sep', '10': 'oct', '11': 'nov', '12': 'dec',
    }
    # Normalise to the two-character key form so unpadded input still matches
    key = str(month_num).strip().zfill(2)
    return month_names.get(key, 'unknown')

def ensure_directory_exists(directory):
    """Ensure a directory exists, creating it (and any parents) if necessary.

    Args:
        directory: Path of the directory to create.

    Returns:
        True when the directory exists on return; False (after logging an
        error) when it could not be created, including when the path is
        already taken by a regular file.
    """
    # Only used to decide whether to log a "Created" message
    already_present = os.path.isdir(directory)
    try:
        # exist_ok avoids the check-then-create race and still raises
        # FileExistsError when the path exists but is not a directory
        os.makedirs(directory, exist_ok=True)
    except OSError as e:
        logging.error(f"Error creating directory {directory}: {str(e)}")
        return False
    if not already_present:
        logging.info(f"Created directory: {directory}")
    return True

def clean_existing_files(zip_path):
    """Remove an existing zip file if it exists.

    A missing file is not an error; the function simply returns.

    Args:
        zip_path: Path of the archive to delete.

    Raises:
        DataProcessingError: If the file exists but cannot be removed. The
            underlying OSError is attached as the cause.
    """
    try:
        # Attempt the removal directly instead of checking first, so a file
        # that disappears in between cannot cause a spurious failure
        os.remove(zip_path)
    except FileNotFoundError:
        return
    except OSError as e:
        logging.error(f"Error removing existing file {zip_path}: {str(e)}")
        raise DataProcessingError(f"Could not remove existing file: {str(e)}") from e
    logging.info(f"Removed existing zip file: {zip_path}")

def process_and_zip_folders(date_str):
    """Zip the downloaded folders with new names and organize them.

    For the given date, ``vlog/2025_<date>`` is archived to
    ``zipped_data/audio_<mon>_<day>.zip`` and ``transcript/2025_<date>`` to
    ``zipped_data/transcript_<mon>_<day>.zip``. An existing archive with the
    same name is removed first. A missing source folder only produces a
    warning.

    Args:
        date_str: Date in MM_DD form.

    Returns:
        True when processing finished (even if a source folder was missing).

    Raises:
        DataProcessingError: If the output directory cannot be created, a
            stale archive cannot be removed, or archiving fails.
    """
    try:
        month, day = date_str.split('_')
        month_name = get_month_name(month)

        # Ensure zipped_data directory exists
        if not ensure_directory_exists("zipped_data"):
            raise DataProcessingError("Failed to create zipped_data directory")

        # (label for log messages, source folder, archive path without ".zip")
        jobs = (
            ("Vlog", f"vlog/2025_{date_str}",
             f"zipped_data/audio_{month_name}_{day}"),
            ("Transcript", f"transcript/2025_{date_str}",
             f"zipped_data/transcript_{month_name}_{day}"),
        )

        for label, source_folder, archive_base in jobs:
            if not os.path.exists(source_folder):
                logging.warning(f"{label} folder not found: {source_folder}")
                continue
            clean_existing_files(f"{archive_base}.zip")
            shutil.make_archive(archive_base, 'zip', source_folder)
            logging.info(f"Created {archive_base}.zip")

        return True
    except DataProcessingError as e:
        # Already a domain error: log it and let it through unchanged rather
        # than wrapping it in a second DataProcessingError
        logging.error(f"Error in process_and_zip_folders: {str(e)}")
        raise
    except Exception as e:
        logging.error(f"Error in process_and_zip_folders: {str(e)}")
        raise DataProcessingError(f"Failed to process and zip folders: {str(e)}") from e

def download_folders(date_str):
    """Download vlog and transcript folders for given date, then zip them.

    Both folders are copied with ``scp -r`` from the remote host into the
    local ``vlog/`` and ``transcript/`` directories. The second download is
    attempted even if the first one fails, so both errors get logged. When
    both succeed the folders are handed to ``process_and_zip_folders``.

    Args:
        date_str: Date in MM_DD form.

    Returns:
        The result of ``process_and_zip_folders`` on success; False (after
        logging the error) if anything fails.
    """
    try:
        # Remote server
        remote_host = "mtb"

        # Base paths and folder name all derive from the same year
        year = "2025"
        vlog_base = f"/usr/local/ccpro/AA/vlog/{year}"
        transcript_base = f"/usr/local/ccpro/AA/transcript/{year}"
        folder_name = f"{year}_{date_str}"

        # Ensure local directories exist
        for directory in ["vlog", "transcript"]:
            if not ensure_directory_exists(directory):
                raise DataProcessingError(f"Failed to create {directory} directory")

        # (name for progress log, label for error log, remote source, local target)
        downloads = (
            ("vlog", "Vlog",
             f"{remote_host}:{vlog_base}/{folder_name}", "./vlog/"),
            ("transcript", "Transcript",
             f"{remote_host}:{transcript_base}/{folder_name}", "./transcript/"),
        )

        all_succeeded = True
        for kind, label, remote_path, local_dir in downloads:
            logging.info(f"Downloading {kind} folder for date: {date_str}")
            # Argument list (no shell) so nothing in the path is interpreted
            # by a local shell
            result = subprocess.run(
                ["scp", "-r", remote_path, local_dir],
                stderr=subprocess.PIPE,
            )
            if result.returncode != 0:
                # errors='replace' keeps odd bytes in scp output from raising
                error_text = result.stderr.decode(errors='replace')
                logging.error(f"{label} download error: {error_text}")
                all_succeeded = False

        if not all_succeeded:
            raise DataProcessingError("Failed to download one or both folders")

        logging.info("Successfully downloaded both folders")
        # Process and zip the downloaded folders
        return process_and_zip_folders(date_str)
    except Exception as e:
        logging.error(f"Error in download_folders: {str(e)}")
        return False

def clean_directories():
    """Remove all created directories (transcript, vlog, zipped_data).

    Paths are relative to the current working directory. Every directory is
    attempted even if an earlier one fails, and a directory that does not
    exist is skipped silently.

    Returns:
        True when every existing directory was removed (a success message
        is printed); False when at least one removal failed (each failure
        is logged).
    """
    all_removed = True
    for directory in ('transcript', 'vlog', 'zipped_data'):
        try:
            shutil.rmtree(directory)
        except FileNotFoundError:
            # Nothing to clean for this one
            continue
        except OSError as e:
            logging.error(f"Error cleaning directories: {str(e)}")
            all_removed = False
        else:
            logging.info(f"Removed directory: {directory}")

    if all_removed:
        print("Successfully cleaned all directories!")
    return all_removed

def main():
    """Command-line entry point: run the ``clean`` command or a dated download.

    When the first argument is ``clean`` (case-insensitive), the created
    directories are removed and the process exits with 0 on success or 1 on
    failure. Otherwise all arguments after the script name are joined with
    spaces (so an unquoted ``Feb 20`` works), parsed as a date and
    downloaded. Exits with status 1 on missing arguments, an
    unparseable/invalid date, a failed download, or any unexpected error.
    """
    try:
        if len(sys.argv) < 2:
            print("Usage: ./download_logs.py [DATE|clean]")
            print("Options:")
            print(" DATE - Download and process logs for the given date")
            print(" clean - Remove all created directories")
            print("\nSupported date formats:")
            print(" - MM_DD (e.g., 02_19)")
            print(" - Month DD (e.g., Feb 20, February 20)")
            print(" - Month_DD (e.g., Feb_20, feb_20)")
            sys.exit(1)

        # Check if the command is 'clean'
        if sys.argv[1].lower() == 'clean':
            sys.exit(0 if clean_directories() else 1)

        # Join all arguments after the script name to handle unquoted dates with spaces
        date_input = ' '.join(sys.argv[1:])
        date_str = parse_date(date_input)

        if not date_str or not validate_date(date_str):
            print("Invalid date format! Please use one of the following formats:")
            print("  - MM_DD (e.g., 02_19)")
            print("  - Month DD (e.g., Feb 20, February 20)")
            print("  - Month_DD (e.g., Feb_20, feb_20)")
            sys.exit(1)

        if not download_folders(date_str):
            logging.error("Script execution failed")
            sys.exit(1)
    except Exception as e:
        # sys.exit raises SystemExit, which is not an Exception subclass, so
        # the exits above pass straight through this handler
        logging.error(f"Unexpected error: {str(e)}")
        sys.exit(1)

# Run the CLI only when executed as a script, not when imported
if __name__ == "__main__":
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment