@thuanpham582002 · Created November 27, 2025 13:23
#!/usr/bin/env python3
"""
GPU Monitoring & Netchat Notification Service

Standalone Python script to:
  1. Query GPU metrics from Prometheus
  2. Send formatted notifications to the Netchat API
  3. Run on scheduled intervals using cron

Requirements:
    pip install requests schedule python-dotenv
"""
import os
import requests
import json
import logging
import time
import schedule
import math
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, List, Any, Tuple
from dataclasses import dataclass
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('gpu_monitor.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Namespace Performance Constants
TOTAL_MINUTES_PER_DAY = 24 * 60
# Query Functions for Namespace Performance
def get_yesterday_range() -> Tuple[int, int]:
    """Get yesterday's timestamp range."""
    now = datetime.now(timezone.utc).astimezone()
    start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_of_yesterday = start_of_today - timedelta(days=1)
    end_of_yesterday = start_of_today - timedelta(milliseconds=1)
    return int(start_of_yesterday.timestamp()), int(end_of_yesterday.timestamp())
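
# Example: if "now" is 2025-11-27 10:00 local time, the returned pair covers
# 2025-11-26 00:00:00 through 2025-11-26 23:59:59, both as epoch seconds in the local timezone.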

def query_namespace_low_performance(prometheus_url: str, namespace: Optional[str], timestamp: int) -> Dict[str, float]:
    """Query namespace low performance metrics."""
    if namespace:
        query = (
            f'count_over_time(('
            f'(avg by (namespace) (process_gpu_sm_utilization_Percentage{{namespace="{namespace}"}}) >= 0 '
            f'and avg by (namespace) (process_gpu_sm_utilization_Percentage{{namespace="{namespace}"}}) < 15)'
            f' and (avg by (namespace) (process_gpu_memory_utilization_Percentage{{namespace="{namespace}"}}) >= 0 '
            f'and avg by (namespace) (process_gpu_memory_utilization_Percentage{{namespace="{namespace}"}}) < 15)'
            f')[24h:1m])'
        )
    else:
        query = """
        count_over_time(
          (
            (avg by (namespace) (process_gpu_sm_utilization_Percentage) >= 0 and avg by (namespace) (process_gpu_sm_utilization_Percentage) < 15)
            and (avg by (namespace) (process_gpu_memory_utilization_Percentage) >= 0 and avg by (namespace) (process_gpu_memory_utilization_Percentage) < 15)
          )[24h:1m]
        )
        """.strip()
    session = requests.Session()
    session.trust_env = False  # Ignore proxy environment variables
    resp = session.get(f'{prometheus_url}/api/v1/query', params={'query': query, 'time': timestamp}, timeout=30)
    resp.raise_for_status()
    data = resp.json()
    if data.get('status') != 'success':
        raise RuntimeError('Prometheus query failed')
    namespace_map = {}
    for item in data['data']['result']:
        ns = item.get('metric', {}).get('namespace') or 'unknown'
        value = float(item.get('value', [0, '0'])[1])
        namespace_map[ns] = value
    return namespace_map

def query_namespace_high_performance(prometheus_url: str, namespace: Optional[str], timestamp: int) -> Dict[str, float]:
    """Query namespace high performance metrics."""
    if namespace:
        query = (
            f'count_over_time(('
            f'(avg by (namespace) (process_gpu_sm_utilization_Percentage{{namespace="{namespace}"}}) >= 90 '
            f'and avg by (namespace) (process_gpu_sm_utilization_Percentage{{namespace="{namespace}"}}) < 100)'
            f' and (avg by (namespace) (process_gpu_memory_utilization_Percentage{{namespace="{namespace}"}}) >= 90 '
            f'and avg by (namespace) (process_gpu_memory_utilization_Percentage{{namespace="{namespace}"}}) < 100)'
            f')[24h:1m])'
        )
    else:
        query = """
        count_over_time(
          (
            (avg by (namespace) (process_gpu_sm_utilization_Percentage) >= 90 and avg by (namespace) (process_gpu_sm_utilization_Percentage) < 100)
            and (avg by (namespace) (process_gpu_memory_utilization_Percentage) >= 90 and avg by (namespace) (process_gpu_memory_utilization_Percentage) < 100)
          )[24h:1m]
        )
        """.strip()
    session = requests.Session()
    session.trust_env = False  # Ignore proxy environment variables
    resp = session.get(f'{prometheus_url}/api/v1/query', params={'query': query, 'time': timestamp}, timeout=30)
    resp.raise_for_status()
    data = resp.json()
    if data.get('status') != 'success':
        raise RuntimeError('Prometheus query failed')
    namespace_map = {}
    for item in data['data']['result']:
        ns = item.get('metric', {}).get('namespace') or 'unknown'
        value = float(item.get('value', [0, '0'])[1])
        namespace_map[ns] = value
    return namespace_map
# Formatting Functions
def format_hours(hours: float) -> str:
    """Convert decimal hours to 'Xh Ym' format."""
    total_hours = math.floor(hours)
    minutes = round((hours - total_hours) * 60)
    if total_hours and minutes:
        return f'{total_hours}h {minutes}m'
    if total_hours:
        return f'{total_hours}h'
    return f'{minutes}m'
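
# Examples: format_hours(1.5) -> '1h 30m', format_hours(2.0) -> '2h', format_hours(0.25) -> '15m'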

def format_single_namespace_message(
    namespace: str,
    low_minutes: float,
    high_minutes: float,
) -> str:
    """Format single namespace performance message."""
    medium_minutes = TOTAL_MINUTES_PER_DAY - low_minutes - high_minutes
    start_ts, _ = get_yesterday_range()
    yesterday = datetime.fromtimestamp(start_ts).astimezone().strftime('%d/%m/%Y')
    today = datetime.now().astimezone().strftime('%d/%m/%Y')
    low_percent = (low_minutes / TOTAL_MINUTES_PER_DAY) * 100
    high_percent = (high_minutes / TOTAL_MINUTES_PER_DAY) * 100
    medium_percent = (medium_minutes / TOTAL_MINUTES_PER_DAY) * 100
    message = f'### 📊 Báo Cáo Hiệu Quả Namespace - {today}\n\n'
    message += f'**Namespace:** `{namespace}`\n'
    message += f'**Thời gian đánh giá:** {yesterday} \n\n'
    message += '---\n\n'
    message += f'#### 🏷️ Namespace: `{namespace}`\n\n'
    message += '| Mức độ | Thời gian | % |\n'
    message += '|--------|-----------|---|\n'
    message += f'| 🟡 Hiệu suất thấp | {format_hours(low_minutes / 60)} | {low_percent:.1f}% |\n'
    message += f'| 🟢 Trung bình | {format_hours(medium_minutes / 60)} | {medium_percent:.1f}% |\n'
    message += f'| 🔴 Cao | {format_hours(high_minutes / 60)} | {high_percent:.1f}% |\n\n'
    message += '---\n\n'
    if low_percent > max(high_percent, medium_percent):
        dominant = '🟡 Thấp'
    elif high_percent > max(low_percent, medium_percent):
        dominant = '🔴 Cao'
    else:
        dominant = '🟢 Trung bình'
    message += f'**Đánh giá:** Namespace này chủ yếu hoạt động ở mức **{dominant}** trong ngày.\n\n'
    return message

def format_multiple_namespace_message(
    namespaces: List[str],
    namespace_metrics: Dict[str, Tuple[float, float]],
) -> str:
    """Format multiple namespace performance message."""
    start_ts, _ = get_yesterday_range()
    yesterday = datetime.fromtimestamp(start_ts).astimezone().strftime('%d/%m/%Y')
    today = datetime.now().astimezone().strftime('%d/%m/%Y')
    message = f'### 📊 Báo Cáo Hiệu Quả Namespace - {today}\n\n'
    message += f'**Thời gian đánh giá:** {yesterday} \n'
    message += f'**Số Namespace:** {len(namespaces)}\n'
    message += '**Danh sách Namespace:** ' + ', '.join(f'`{ns}`' for ns in namespaces) + '\n\n'
    message += '---\n\n'
    for namespace in namespaces:
        low_minutes, high_minutes = namespace_metrics.get(namespace, (0.0, 0.0))
        message += format_single_namespace_message(namespace, low_minutes, high_minutes)
    total_low = sum(namespace_metrics.get(ns, (0.0, 0.0))[0] for ns in namespaces) / 60
    total_high = sum(namespace_metrics.get(ns, (0.0, 0.0))[1] for ns in namespaces) / 60
    total_medium = len(namespaces) * 24 - total_low - total_high
    avg_low_percent = (
        sum((namespace_metrics.get(ns, (0.0, 0.0))[0] / TOTAL_MINUTES_PER_DAY) * 100 for ns in namespaces) / len(namespaces)
    ) if namespaces else 0.0
    avg_high_percent = (
        sum((namespace_metrics.get(ns, (0.0, 0.0))[1] / TOTAL_MINUTES_PER_DAY) * 100 for ns in namespaces) / len(namespaces)
    ) if namespaces else 0.0
    avg_medium_percent = 100 - avg_low_percent - avg_high_percent
    message += '### 📈 Tổng Hợp\n\n'
    message += '| Mức độ | Tổng thời gian | Trung bình % |\n'
    message += '|--------|----------------|--------------|\n'
    message += f'| 🟡 Thấp | {format_hours(total_low)} | {avg_low_percent:.1f}% |\n'
    message += f'| 🟢 Trung bình | {format_hours(total_medium)} | {avg_medium_percent:.1f}% |\n'
    message += f'| 🔴 Cao | {format_hours(total_high)} | {avg_high_percent:.1f}% |\n\n'
    return message
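
# Usage sketch (illustrative; these namespace-report helpers are not wired into the
# GPUMonitor service below). Assuming prometheus_url points at the same Prometheus
# instance used elsewhere in this script, yesterday's report could be built like this:
#
#   _, end_ts = get_yesterday_range()
#   low = query_namespace_low_performance(prometheus_url, None, end_ts)
#   high = query_namespace_high_performance(prometheus_url, None, end_ts)
#   metrics = {ns: (low.get(ns, 0.0), high.get(ns, 0.0)) for ns in sorted(set(low) | set(high))}
#   report = format_multiple_namespace_message(list(metrics), metrics)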

@dataclass
class Config:
    """Configuration class"""
    prometheus_url: str = os.getenv('PROMETHEUS_URL', 'http://10.24.10.14:30005')
    netchat_url: str = os.getenv('NETCHAT_URL', 'http://10.255.62.213/netchat-netmind/send-message')
    netchat_api_key: str = os.getenv('NETCHAT_API_KEY', 'NOmTvIZOVsLIAfdlYkAKfMoyQEyjxDao')
    netchat_channel_id: str = os.getenv('NETCHAT_CHANNEL_ID', 'i6shkznnstnjfmh9ga9s4cf75o')
    netchat_service_name: str = os.getenv('NETCHAT_SERVICE_NAME', 'GPU-Monitoring')
    netchat_sender_name: str = os.getenv('NETCHAT_SENDER_NAME', 'system')

    # Scheduling
    schedule_time: str = os.getenv('SCHEDULE_TIME', '08:00')  # Default 8:00 AM

    # Retry settings
    max_retries: int = int(os.getenv('MAX_RETRIES', '3'))
    retry_delay: int = int(os.getenv('RETRY_DELAY', '5'))  # seconds

    # Debug mode - single option for immediate debugging
    debug_immediate: bool = os.getenv('DEBUG_IMMEDIATE', 'false').lower() == 'true'

    # Namespace filter
    namespace_filter: Optional[str] = os.getenv('NAMESPACE_FILTER')  # If None, get all namespaces

class PrometheusClient:
    """Prometheus API client for GPU metrics"""

    def __init__(self, base_url: str, namespace_filter: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        # requests.Session has no default timeout attribute, so keep the value here
        # and pass it explicitly on every request below.
        self.timeout = 30
        self.namespace_filter = namespace_filter

    def query_gpu_quota_and_utilization(self) -> List[Dict[str, Any]]:
        """Query GPU quota and utilization metrics from Prometheus"""
        # 1. Get GPU quota metrics
        quota_query = 'process_nvidia_gpu_requests_Count'
        # Add namespace filter to query if specified
        if self.namespace_filter:
            quota_query += f'{{pod_namespace="{self.namespace_filter}"}}'
            logger.info(f"Filtering by namespace: {self.namespace_filter}")
        quota_url = f"{self.base_url}/api/v1/query?query={quota_query}"
        try:
            logger.info(f"Querying GPU quota from: {quota_url}")
            quota_response = self.session.get(quota_url, timeout=self.timeout)
            quota_response.raise_for_status()
            quota_metrics = quota_response.json().get('data', {}).get('result', [])
            if not quota_metrics:
                logger.warning("No GPU quota metrics found")
                return []
            logger.info(f"Found {len(quota_metrics)} GPU quota metrics")
            # 2. Group by Pod and collect hostnames
            pod_map = {}
            unique_hostnames = set()
            for metric in quota_metrics:
                pod_name = metric.get('metric', {}).get('pod_name', 'unknown')
                namespace = metric.get('metric', {}).get('pod_namespace', 'unknown')
                hostname = metric.get('metric', {}).get('hostname', 'unknown')
                quota = float(metric.get('value', [0, '0'])[1])
                # Skip if namespace filter is set and doesn't match
                if self.namespace_filter and namespace != self.namespace_filter:
                    continue
                key = f"{namespace}/{pod_name}"
                if key not in pod_map:
                    pod_map[key] = {
                        'pod_name': pod_name,
                        'namespace': namespace,
                        'gpu_quota': 0,
                        'hostname': hostname
                    }
                unique_hostnames.add(hostname)
                pod_map[key]['gpu_quota'] += quota
            # 3. Get utilization metrics for all hostnames
            hostname_list = list(unique_hostnames)
            if not hostname_list:
                logger.warning("No hostnames found for utilization query")
                return list(pod_map.values())
            utilization_data = self._query_utilization_by_hostnames(hostname_list)
            # 4. Map utilization back to pods
            result = []
            for pod_info in pod_map.values():
                hostname = pod_info['hostname']
                util_data = utilization_data.get(hostname, {
                    'gpu_utilization': 0,
                    'gpu_vram': 0
                })
                result.append({
                    'pod_name': pod_info['pod_name'],
                    'namespace': pod_info['namespace'],
                    'gpu_quota': pod_info['gpu_quota'],
                    'gpu_utilization': util_data['gpu_utilization'],
                    'gpu_vram': util_data['gpu_vram'],
                    'hostname': hostname
                })
            return result
        except Exception as e:
            logger.error(f"Error querying GPU metrics: {str(e)}")
            raise
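
    # How the queries below aggregate over time (standard PromQL semantics):
    # quantile_over_time(0.95, <metric>[1h]) takes the 95th percentile within each
    # 1-hour window, the [1d:] subquery evaluates that over the past day,
    # max_over_time() keeps the highest hourly value, and "avg ... by (hostname)"
    # averages the resulting series per host.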

    def _query_utilization_by_hostnames(self, hostnames: List[str]) -> Dict[str, Dict[str, Any]]:
        """Query GPU utilization and VRAM for multiple hostnames"""
        hostname_regex = '|'.join(hostnames)
        # 95th-percentile time-aggregation queries (see the note above this method)
        vram_query = f"avg(max_over_time(quantile_over_time(0.95, gpu_memory_percent_Percentage{{hostname=~\"{hostname_regex}\"}}[1h])[1d:])) by (hostname)"
        utilization_query = f"avg(max_over_time(quantile_over_time(0.95, gpu_utilization_Percentage{{hostname=~\"{hostname_regex}\"}}[1h])[1d:])) by (hostname)"
        result = {}
        # Query VRAM metrics
        vram_url = f"{self.base_url}/api/v1/query?query={vram_query}"
        try:
            vram_response = self.session.get(vram_url, timeout=self.timeout)
            vram_response.raise_for_status()
            vram_metrics = vram_response.json().get('data', {}).get('result', [])
            for metric in vram_metrics:
                hostname = metric.get('metric', {}).get('hostname')
                vram_value = float(metric.get('value', [0, '0'])[1])
                if hostname not in result:
                    result[hostname] = {'gpu_utilization': 0, 'gpu_vram': 0}
                result[hostname]['gpu_vram'] = vram_value
        except Exception as e:
            logger.error(f"Error querying VRAM metrics: {str(e)}")
        # Query utilization metrics
        utilization_url = f"{self.base_url}/api/v1/query?query={utilization_query}"
        try:
            util_response = self.session.get(utilization_url, timeout=self.timeout)
            util_response.raise_for_status()
            util_metrics = util_response.json().get('data', {}).get('result', [])
            for metric in util_metrics:
                hostname = metric.get('metric', {}).get('hostname')
                util_value = float(metric.get('value', [0, '0'])[1])
                if hostname not in result:
                    result[hostname] = {'gpu_utilization': 0, 'gpu_vram': 0}
                result[hostname]['gpu_utilization'] = util_value
        except Exception as e:
            logger.error(f"Error querying utilization metrics: {str(e)}")
        return result

class NetchatNotifier:
    """Netchat API client for sending notifications"""

    def __init__(self, config: Config):
        self.config = config
        self.session = requests.Session()
        # requests.Session has no default timeout attribute, so pass it per request.
        self.timeout = 30
        self.session.trust_env = False  # Ignore proxy environment variables
        self.session.headers.update({
            'Cache-Control': 'no-cache',
            'Content-Type': 'application/json',
            'x-api-key': config.netchat_api_key
        })

    def send_notification(self, message: str) -> bool:
        """Send notification to Netchat with retry logic"""
        payload = {
            'msg': message,
            'channel_id': self.config.netchat_channel_id,
            'service_name': self.config.netchat_service_name,
            'sender_name': self.config.netchat_sender_name
        }
        logger.info(f"Sending notification to Netchat: {self.config.netchat_url}")
        logger.debug(f"Message preview: {message[:100]}...")
        last_error = None
        for attempt in range(1, self.config.max_retries + 1):
            try:
                response = self.session.post(self.config.netchat_url, json=payload, timeout=self.timeout)
                response.raise_for_status()
                logger.info(f"✅ Notification sent successfully on attempt {attempt}")
                return True
            except Exception as e:
                last_error = str(e)
                logger.warning(f"❌ Attempt {attempt}/{self.config.max_retries} failed: {last_error}")
                if attempt < self.config.max_retries:
                    logger.info(f"Retrying in {self.config.retry_delay} seconds...")
                    time.sleep(self.config.retry_delay)
        logger.error(f"❌ Failed to send notification after {self.config.max_retries} attempts")
        return False
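
# One-off test of the Netchat endpoint (illustrative sketch; the payload keys and the
# x-api-key header mirror what NetchatNotifier builds above, and all values are placeholders):
#
#   import requests
#   requests.post(
#       'http://netchat.example.local/netchat-netmind/send-message',
#       headers={'Content-Type': 'application/json', 'x-api-key': 'changeme'},
#       json={'msg': 'test', 'channel_id': 'changeme',
#             'service_name': 'GPU-Monitoring', 'sender_name': 'system'},
#       timeout=30,
#   )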

class GPUMessageFormatter:
    """Format GPU metrics into readable messages"""

    @staticmethod
    def format_gpu_report(pod_data: List[Dict[str, Any]]) -> str:
        """Format GPU data into a readable message"""
        if not pod_data:
            return "⚠️ **Không có dữ liệu GPU**\n\nKhông tìm thấy GPU metrics từ Prometheus."
        # Group by namespace
        namespace_groups = {}
        for pod in pod_data:
            namespace = pod['namespace']
            if namespace not in namespace_groups:
                namespace_groups[namespace] = []
            namespace_groups[namespace].append(pod)
        message = f"### 🎮 GPU Usage Report\n\n"
        message += f"> *Dữ liệu được tổng hợp trong 24h với 95th percentile aggregation*\n"
        message += f"> *Thời gian: {datetime.now().strftime('%H:%M:%S %d/%m/%Y')}*\n"
        # Add namespace filter info if specified
        if pod_data and len(set(pod['namespace'] for pod in pod_data)) == 1:
            single_namespace = pod_data[0]['namespace']
            message += f"> *Namespace Filter: `{single_namespace}`*\n\n"
        else:
            message += "\n"
        # Format each namespace
        for namespace, pods in namespace_groups.items():
            message += f"#### 🏷️ Namespace: `{namespace}`\n\n"
            for i, pod in enumerate(pods, 1):
                utilization = pod['gpu_utilization']
                vram = pod['gpu_vram']
                quota = pod['gpu_quota']
                # Status emoji
                if utilization >= 80:
                    emoji = '🔴'  # High
                elif utilization >= 50:
                    emoji = '🟡'  # Medium
                elif utilization >= 20:
                    emoji = '🟢'  # Low
                else:
                    emoji = '⚪'  # Idle
                message += f"{emoji} **{i}. `{pod['pod_name']}`**\n"
                message += f" - GPU Utilization: **{utilization:.1f}%**\n"
                message += f" - VRAM: **{vram:.1f}%**\n"
                message += f" - GPU Quota: **{quota}** GPU(s)\n\n"
            message += "---\n\n"
        # Summary statistics
        total_quota = sum(pod['gpu_quota'] for pod in pod_data)
        avg_utilization = sum(pod['gpu_utilization'] for pod in pod_data) / len(pod_data)
        max_utilization = max(pod['gpu_utilization'] for pod in pod_data)
        min_utilization = min(pod['gpu_utilization'] for pod in pod_data)
        avg_vram = sum(pod['gpu_vram'] for pod in pod_data) / len(pod_data)
        max_vram = max(pod['gpu_vram'] for pod in pod_data)
        min_vram = min(pod['gpu_vram'] for pod in pod_data)
        message += "### 📈 Tổng Hợp\n\n"
        message += "**GPU Quota:**\n"
        message += f"- 💾 Tổng GPU Quota: **{total_quota}** GPU(s)\n\n"
        message += "**GPU Utilization:**\n"
        message += f"- 📊 Trung bình: **{avg_utilization:.1f}%**\n"
        message += f"- 🔴 Cao nhất: **{max_utilization:.1f}%**\n"
        message += f"- 🟢 Thấp nhất: **{min_utilization:.1f}%**\n\n"
        message += "**GPU VRAM:**\n"
        message += f"- 📊 Trung bình: **{avg_vram:.1f}%**\n"
        message += f"- 🔴 Cao nhất: **{max_vram:.1f}%**\n"
        message += f"- 🟢 Thấp nhất: **{min_vram:.1f}%**\n\n"
        message += "**Thông tin khác:**\n"
        message += f"- 🎯 Tổng số Namespace: **{len(namespace_groups)}**\n"
        message += f"- 📦 Tổng số Pod: **{len(pod_data)}**\n"
        message += f"- ⏱️ Time Range: **95th percentile over 1h windows, averaged over 24h**\n"
        return message

class GPUMonitor:
    """Main GPU monitoring service"""

    def __init__(self):
        self.config = Config()
        self.prometheus_client = PrometheusClient(self.config.prometheus_url, self.config.namespace_filter)
        self.netchat_notifier = NetchatNotifier(self.config)
        self.formatter = GPUMessageFormatter()
        # Validate configuration
        self._validate_config()

    def _validate_config(self):
        """Validate required configuration"""
        if not self.config.prometheus_url:
            raise ValueError("PROMETHEUS_URL is required")
        if not self.config.netchat_url:
            raise ValueError("NETCHAT_URL is required")
        if not self.config.netchat_api_key:
            raise ValueError("NETCHAT_API_KEY is required")
        if not self.config.netchat_channel_id:
            raise ValueError("NETCHAT_CHANNEL_ID is required")
        logger.info("✅ Configuration validation passed")

    def run_monitoring(self) -> bool:
        """Run the complete monitoring cycle"""
        try:
            logger.info("🚀 Starting GPU monitoring cycle...")
            # 1. Query GPU metrics
            logger.info("📊 Querying GPU metrics from Prometheus...")
            pod_data = self.prometheus_client.query_gpu_quota_and_utilization()
            if not pod_data:
                logger.warning("⚠️ No GPU data found, sending warning notification")
                warning_msg = "⚠️ **Không có dữ liệu GPU Quota**\n\nKhông tìm thấy metrics process_nvidia_gpu_requests_Count từ Prometheus."
                return self.netchat_notifier.send_notification(warning_msg)
            logger.info(f"✅ Retrieved data for {len(pod_data)} GPU pods")
            # 2. Format message
            logger.info("📝 Formatting GPU report message...")
            message = self.formatter.format_gpu_report(pod_data)
            # 3. Send notification
            logger.info("📤 Sending notification to Netchat...")
            success = self.netchat_notifier.send_notification(message)
            if success:
                logger.info("✅ GPU monitoring cycle completed successfully")
                return True
            else:
                logger.error("❌ GPU monitoring cycle failed")
                return False
        except Exception as e:
            logger.error(f"❌ Error in monitoring cycle: {str(e)}")
            # Send error notification
            try:
                error_msg = f"❌ **Lỗi GPU Monitor Cronjob**\n\nLỗi: {str(e)}\n\nThời gian: {datetime.now().strftime('%H:%M:%S %d/%m/%Y')}"
                return self.netchat_notifier.send_notification(error_msg)
            except Exception as e2:
                logger.error(f"❌ Failed to send error notification: {str(e2)}")
                return False

def scheduled_job():
    """Function to be called by scheduler"""
    logger.info("🕐 [Cronjob] Starting scheduled GPU monitoring...")
    monitor = GPUMonitor()
    success = monitor.run_monitoring()
    if success:
        logger.info("✅ [Cronjob] Scheduled job completed successfully")
    else:
        logger.error("❌ [Cronjob] Scheduled job failed")

def main():
    """Main function"""
    try:
        # Load environment variables
        load_dotenv()
        # Print configuration
        config = Config()
        logger.info("🎮 GPU Monitor Service Started")
        logger.info(f"📡 Prometheus URL: {config.prometheus_url}")
        logger.info(f"💬 Netchat URL: {config.netchat_url}")
        logger.info(f"⏰ Schedule time: {config.schedule_time}")
        logger.info(f"🔄 Max retries: {config.max_retries}")
        logger.info(f"⏱️ Retry delay: {config.retry_delay}s")
        # Debug mode handling
        if config.debug_immediate:
            logger.info("🐛 DEBUG IMMEDIATE MODE - Send notification now, skip scheduling")
            logger.info("-" * 50)
            # Set logging level to DEBUG for verbose output
            logging.getLogger().setLevel(logging.DEBUG)
            # Run monitoring immediately
            monitor = GPUMonitor()
            monitor.run_monitoring()
            logger.info("🚀 Debug mode: Monitor running in foreground (Ctrl+C to stop)")
            try:
                while True:
                    time.sleep(300)  # Check every 5 minutes
            except KeyboardInterrupt:
                pass
        else:
            logger.info("-" * 50)
            logger.info(f"⏰ Setting up daily schedule at {config.schedule_time}")
            # Setup normal scheduling
            schedule.every().day.at(config.schedule_time).do(scheduled_job)
            logger.info("✅ Scheduler configured. Waiting for next scheduled run...")
            logger.info("Press Ctrl+C to stop")
            # Keep the script running
            while True:
                schedule.run_pending()
                time.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        logger.info("👋 Stopping GPU Monitor Service")
    except Exception as e:
        logger.error(f"❌ Fatal error: {str(e)}")
        return 1
    return 0

if __name__ == "__main__":
    exit(main())
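
# Typical invocation (assuming this file is saved as gpu_monitor.py with a .env file beside it):
#
#   pip install requests schedule python-dotenv
#   python3 gpu_monitor.py                        # waits for SCHEDULE_TIME each day
#   DEBUG_IMMEDIATE=true python3 gpu_monitor.py   # sends one report immediately, then idles in the foreground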