#!/usr/bin/env python3
"""
GPU Monitoring & Netchat Notification Service

Standalone Python script to:
1. Query GPU metrics from Prometheus
2. Send formatted notifications to Netchat API
3. Run on scheduled intervals using cron

Requirements:
    pip install requests schedule python-dotenv
"""
import os
import requests
import json
import logging
import time
import schedule
import math
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, List, Any, Tuple
from dataclasses import dataclass
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('gpu_monitor.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Namespace Performance Constants
TOTAL_MINUTES_PER_DAY = 24 * 60


# Query Functions for Namespace Performance
def get_yesterday_range() -> Tuple[int, int]:
    """Get yesterday's timestamp range."""
    now = datetime.now(timezone.utc).astimezone()
    start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_of_yesterday = start_of_today - timedelta(days=1)
    end_of_yesterday = start_of_today - timedelta(milliseconds=1)
    return int(start_of_yesterday.timestamp()), int(end_of_yesterday.timestamp())
def query_namespace_low_performance(prometheus_url: str, namespace: Optional[str], timestamp: int) -> Dict[str, float]:
    """Query namespace low performance metrics."""
    if namespace:
        query = (
            f'count_over_time(('
            f'(avg by (namespace) (process_gpu_sm_utilization_Percentage{{namespace="{namespace}"}}) >= 0 '
            f'and avg by (namespace) (process_gpu_sm_utilization_Percentage{{namespace="{namespace}"}}) < 15)'
            f' and (avg by (namespace) (process_gpu_memory_utilization_Percentage{{namespace="{namespace}"}}) >= 0 '
            f'and avg by (namespace) (process_gpu_memory_utilization_Percentage{{namespace="{namespace}"}}) < 15)'
            f')[24h:1m])'
        )
    else:
        query = """
        count_over_time(
          (
            (avg by (namespace) (process_gpu_sm_utilization_Percentage) >= 0 and avg by (namespace) (process_gpu_sm_utilization_Percentage) < 15)
            and (avg by (namespace) (process_gpu_memory_utilization_Percentage) >= 0 and avg by (namespace) (process_gpu_memory_utilization_Percentage) < 15)
          )[24h:1m]
        )
        """.strip()

    session = requests.Session()
    session.trust_env = False  # Ignore proxy environment variables
    resp = session.get(f'{prometheus_url}/api/v1/query', params={'query': query, 'time': timestamp}, timeout=30)
    resp.raise_for_status()
    data = resp.json()
    if data.get('status') != 'success':
        raise RuntimeError('Prometheus query failed')

    namespace_map = {}
    for item in data['data']['result']:
        ns = item.get('metric', {}).get('namespace') or 'unknown'
        value = float(item.get('value', [0, '0'])[1])
        namespace_map[ns] = value
    return namespace_map
def query_namespace_high_performance(prometheus_url: str, namespace: Optional[str], timestamp: int) -> Dict[str, float]:
    """Query namespace high performance metrics."""
    if namespace:
        query = (
            f'count_over_time(('
            f'(avg by (namespace) (process_gpu_sm_utilization_Percentage{{namespace="{namespace}"}}) >= 90 '
            f'and avg by (namespace) (process_gpu_sm_utilization_Percentage{{namespace="{namespace}"}}) < 100)'
            f' and (avg by (namespace) (process_gpu_memory_utilization_Percentage{{namespace="{namespace}"}}) >= 90 '
            f'and avg by (namespace) (process_gpu_memory_utilization_Percentage{{namespace="{namespace}"}}) < 100)'
            f')[24h:1m])'
        )
    else:
        query = """
        count_over_time(
          (
            (avg by (namespace) (process_gpu_sm_utilization_Percentage) >= 90 and avg by (namespace) (process_gpu_sm_utilization_Percentage) < 100)
            and (avg by (namespace) (process_gpu_memory_utilization_Percentage) >= 90 and avg by (namespace) (process_gpu_memory_utilization_Percentage) < 100)
          )[24h:1m]
        )
        """.strip()

    session = requests.Session()
    session.trust_env = False  # Ignore proxy environment variables
    resp = session.get(f'{prometheus_url}/api/v1/query', params={'query': query, 'time': timestamp}, timeout=30)
    resp.raise_for_status()
    data = resp.json()
    if data.get('status') != 'success':
        raise RuntimeError('Prometheus query failed')

    namespace_map = {}
    for item in data['data']['result']:
        ns = item.get('metric', {}).get('namespace') or 'unknown'
        value = float(item.get('value', [0, '0'])[1])
        namespace_map[ns] = value
    return namespace_map
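# The two helpers above rely on a PromQL subquery: the inner boolean expression is
# evaluated once per minute over the trailing 24 hours ([24h:1m]), so count_over_time
# returns roughly "minutes spent in that utilization band" per namespace. A minimal
# sketch of how they could be combined (the Prometheus URL is an assumption used only
# for illustration):
#
#   _, end_ts = get_yesterday_range()
#   low = query_namespace_low_performance('http://prometheus.example:9090', None, end_ts)
#   high = query_namespace_high_performance('http://prometheus.example:9090', None, end_ts)
#   metrics = {ns: (low.get(ns, 0.0), high.get(ns, 0.0)) for ns in set(low) | set(high)}
#   print(format_multiple_namespace_message(sorted(metrics), metrics))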
# Formatting Functions
def format_hours(hours: float) -> str:
    """Convert decimal hours to 'Xh Ym' format."""
    total_hours = math.floor(hours)
    minutes = round((hours - total_hours) * 60)
    # Guard against rounding up to a full hour (e.g. 1.999h should be '2h', not '1h 60m')
    if minutes == 60:
        total_hours += 1
        minutes = 0
    if total_hours and minutes:
        return f'{total_hours}h {minutes}m'
    if total_hours:
        return f'{total_hours}h'
    return f'{minutes}m'
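# Worked examples of the conversion above:
#   format_hours(1.5) -> '1h 30m', format_hours(0.25) -> '15m', format_hours(2.0) -> '2h'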
def format_single_namespace_message(
    namespace: str,
    low_minutes: float,
    high_minutes: float,
) -> str:
    """Format single namespace performance message."""
    medium_minutes = TOTAL_MINUTES_PER_DAY - low_minutes - high_minutes
    start_ts, _ = get_yesterday_range()
    yesterday = datetime.fromtimestamp(start_ts).astimezone().strftime('%d/%m/%Y')
    today = datetime.now().astimezone().strftime('%d/%m/%Y')

    low_percent = (low_minutes / TOTAL_MINUTES_PER_DAY) * 100
    high_percent = (high_minutes / TOTAL_MINUTES_PER_DAY) * 100
    medium_percent = (medium_minutes / TOTAL_MINUTES_PER_DAY) * 100

    message = f'### 📊 Namespace Performance Report - {today}\n\n'
    message += f'**Namespace:** `{namespace}`\n'
    message += f'**Evaluation period:** {yesterday} \n\n'
    message += '---\n\n'
    message += f'#### 🏷️ Namespace: `{namespace}`\n\n'
    message += '| Level | Duration | % |\n'
    message += '|-------|----------|---|\n'
    message += f'| 🟡 Low performance | {format_hours(low_minutes / 60)} | {low_percent:.1f}% |\n'
    message += f'| 🟢 Medium | {format_hours(medium_minutes / 60)} | {medium_percent:.1f}% |\n'
    message += f'| 🔴 High | {format_hours(high_minutes / 60)} | {high_percent:.1f}% |\n\n'
    message += '---\n\n'

    if low_percent > max(high_percent, medium_percent):
        dominant = '🟡 Low'
    elif high_percent > max(low_percent, medium_percent):
        dominant = '🔴 High'
    else:
        dominant = '🟢 Medium'
    message += f'**Assessment:** This namespace mainly operated at the **{dominant}** level during the day.\n\n'
    return message
def format_multiple_namespace_message(
    namespaces: List[str],
    namespace_metrics: Dict[str, Tuple[float, float]],
) -> str:
    """Format multiple namespace performance message."""
    start_ts, _ = get_yesterday_range()
    yesterday = datetime.fromtimestamp(start_ts).astimezone().strftime('%d/%m/%Y')
    today = datetime.now().astimezone().strftime('%d/%m/%Y')

    message = f'### 📊 Namespace Performance Report - {today}\n\n'
    message += f'**Evaluation period:** {yesterday} \n'
    message += f'**Number of namespaces:** {len(namespaces)}\n'
    message += '**Namespace list:** ' + ', '.join(f'`{ns}`' for ns in namespaces) + '\n\n'
    message += '---\n\n'

    for namespace in namespaces:
        low_minutes, high_minutes = namespace_metrics.get(namespace, (0.0, 0.0))
        message += format_single_namespace_message(namespace, low_minutes, high_minutes)

    total_low = sum(namespace_metrics.get(ns, (0.0, 0.0))[0] for ns in namespaces) / 60
    total_high = sum(namespace_metrics.get(ns, (0.0, 0.0))[1] for ns in namespaces) / 60
    total_medium = len(namespaces) * 24 - total_low - total_high
    avg_low_percent = (
        sum((namespace_metrics.get(ns, (0.0, 0.0))[0] / TOTAL_MINUTES_PER_DAY) * 100 for ns in namespaces) / len(namespaces)
    ) if namespaces else 0.0
    avg_high_percent = (
        sum((namespace_metrics.get(ns, (0.0, 0.0))[1] / TOTAL_MINUTES_PER_DAY) * 100 for ns in namespaces) / len(namespaces)
    ) if namespaces else 0.0
    avg_medium_percent = 100 - avg_low_percent - avg_high_percent

    message += '### 📈 Summary\n\n'
    message += '| Level | Total duration | Average % |\n'
    message += '|-------|----------------|-----------|\n'
    message += f'| 🟡 Low | {format_hours(total_low)} | {avg_low_percent:.1f}% |\n'
    message += f'| 🟢 Medium | {format_hours(total_medium)} | {avg_medium_percent:.1f}% |\n'
    message += f'| 🔴 High | {format_hours(total_high)} | {avg_high_percent:.1f}% |\n\n'
    return message
@dataclass
class Config:
    """Configuration class"""
    prometheus_url: str = os.getenv('PROMETHEUS_URL', 'http://10.24.10.14:30005')
    netchat_url: str = os.getenv('NETCHAT_URL', 'http://10.255.62.213/netchat-netmind/send-message')
    netchat_api_key: str = os.getenv('NETCHAT_API_KEY', 'NOmTvIZOVsLIAfdlYkAKfMoyQEyjxDao')
    netchat_channel_id: str = os.getenv('NETCHAT_CHANNEL_ID', 'i6shkznnstnjfmh9ga9s4cf75o')
    netchat_service_name: str = os.getenv('NETCHAT_SERVICE_NAME', 'GPU-Monitoring')
    netchat_sender_name: str = os.getenv('NETCHAT_SENDER_NAME', 'system')

    # Scheduling
    schedule_time: str = os.getenv('SCHEDULE_TIME', '08:00')  # Default 8:00 AM

    # Retry settings
    max_retries: int = int(os.getenv('MAX_RETRIES', '3'))
    retry_delay: int = int(os.getenv('RETRY_DELAY', '5'))  # seconds

    # Debug mode - single option for immediate debugging
    debug_immediate: bool = os.getenv('DEBUG_IMMEDIATE', 'false').lower() == 'true'

    # Namespace filter
    namespace_filter: Optional[str] = os.getenv('NAMESPACE_FILTER')  # If None, get all namespaces
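# A sample .env consumed by load_dotenv() might look like the following; every value here
# is an illustrative assumption and should be replaced with your own endpoints and keys:
#
#   PROMETHEUS_URL=http://prometheus.example:9090
#   NETCHAT_URL=http://netchat.example/send-message
#   NETCHAT_API_KEY=replace-me
#   NETCHAT_CHANNEL_ID=replace-me
#   SCHEDULE_TIME=08:00
#   MAX_RETRIES=3
#   RETRY_DELAY=5
#   DEBUG_IMMEDIATE=false
#   NAMESPACE_FILTER=team-a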
class PrometheusClient:
    """Prometheus API client for GPU metrics"""

    def __init__(self, base_url: str, namespace_filter: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        # requests.Session has no global timeout setting, so keep the value on the
        # client and pass it explicitly to each request.
        self.timeout = 30
        self.namespace_filter = namespace_filter

    def query_gpu_quota_and_utilization(self) -> List[Dict[str, Any]]:
        """Query GPU quota and utilization metrics from Prometheus"""
        # 1. Get GPU quota metrics
        quota_query = 'process_nvidia_gpu_requests_Count'

        # Add namespace filter to query if specified
        if self.namespace_filter:
            quota_query += f'{{pod_namespace="{self.namespace_filter}"}}'
            logger.info(f"Filtering by namespace: {self.namespace_filter}")

        quota_url = f"{self.base_url}/api/v1/query"
        try:
            logger.info(f"Querying GPU quota from: {quota_url} (query: {quota_query})")
            # Let requests URL-encode the PromQL expression instead of building the URL by hand
            quota_response = self.session.get(quota_url, params={'query': quota_query}, timeout=self.timeout)
            quota_response.raise_for_status()
            quota_metrics = quota_response.json().get('data', {}).get('result', [])

            if not quota_metrics:
                logger.warning("No GPU quota metrics found")
                return []
            logger.info(f"Found {len(quota_metrics)} GPU quota metrics")

            # 2. Group by Pod and collect hostnames
            pod_map = {}
            unique_hostnames = set()
            for metric in quota_metrics:
                pod_name = metric.get('metric', {}).get('pod_name', 'unknown')
                namespace = metric.get('metric', {}).get('pod_namespace', 'unknown')
                hostname = metric.get('metric', {}).get('hostname', 'unknown')
                quota = float(metric.get('value', [0, '0'])[1])

                # Skip if namespace filter is set and doesn't match
                if self.namespace_filter and namespace != self.namespace_filter:
                    continue

                key = f"{namespace}/{pod_name}"
                if key not in pod_map:
                    pod_map[key] = {
                        'pod_name': pod_name,
                        'namespace': namespace,
                        'gpu_quota': 0,
                        'hostname': hostname
                    }
                    unique_hostnames.add(hostname)
                pod_map[key]['gpu_quota'] += quota

            # 3. Get utilization metrics for all hostnames
            hostname_list = list(unique_hostnames)
            if not hostname_list:
                logger.warning("No hostnames found for utilization query")
                return list(pod_map.values())
            utilization_data = self._query_utilization_by_hostnames(hostname_list)

            # 4. Map utilization back to pods
            result = []
            for pod_info in pod_map.values():
                hostname = pod_info['hostname']
                util_data = utilization_data.get(hostname, {
                    'gpu_utilization': 0,
                    'gpu_vram': 0
                })
                result.append({
                    'pod_name': pod_info['pod_name'],
                    'namespace': pod_info['namespace'],
                    'gpu_quota': pod_info['gpu_quota'],
                    'gpu_utilization': util_data['gpu_utilization'],
                    'gpu_vram': util_data['gpu_vram'],
                    'hostname': hostname
                })
            return result
        except Exception as e:
            logger.error(f"Error querying GPU metrics: {str(e)}")
            raise
    def _query_utilization_by_hostnames(self, hostnames: List[str]) -> Dict[str, Dict[str, Any]]:
        """Query GPU utilization and VRAM for multiple hostnames"""
        hostname_regex = '|'.join(hostnames)

        # Time-aggregation queries: per-hour 95th percentile, maximum over the last day, averaged per hostname
        vram_query = f"avg(max_over_time(quantile_over_time(0.95, gpu_memory_percent_Percentage{{hostname=~\"{hostname_regex}\"}}[1h])[1d:])) by (hostname)"
        utilization_query = f"avg(max_over_time(quantile_over_time(0.95, gpu_utilization_Percentage{{hostname=~\"{hostname_regex}\"}}[1h])[1d:])) by (hostname)"
        result = {}
        query_url = f"{self.base_url}/api/v1/query"

        # Query VRAM metrics
        try:
            vram_response = self.session.get(query_url, params={'query': vram_query}, timeout=self.timeout)
            vram_response.raise_for_status()
            vram_metrics = vram_response.json().get('data', {}).get('result', [])
            for metric in vram_metrics:
                hostname = metric.get('metric', {}).get('hostname')
                vram_value = float(metric.get('value', [0, '0'])[1])
                if hostname not in result:
                    result[hostname] = {'gpu_utilization': 0, 'gpu_vram': 0}
                result[hostname]['gpu_vram'] = vram_value
        except Exception as e:
            logger.error(f"Error querying VRAM metrics: {str(e)}")

        # Query utilization metrics
        try:
            util_response = self.session.get(query_url, params={'query': utilization_query}, timeout=self.timeout)
            util_response.raise_for_status()
            util_metrics = util_response.json().get('data', {}).get('result', [])
            for metric in util_metrics:
                hostname = metric.get('metric', {}).get('hostname')
                util_value = float(metric.get('value', [0, '0'])[1])
                if hostname not in result:
                    result[hostname] = {'gpu_utilization': 0, 'gpu_vram': 0}
                result[hostname]['gpu_utilization'] = util_value
        except Exception as e:
            logger.error(f"Error querying utilization metrics: {str(e)}")
        return result
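# Shape of the mapping returned by _query_utilization_by_hostnames (hostnames and
# numbers below are illustrative only, not values from a real cluster):
#
#   {
#       'gpu-node-01': {'gpu_utilization': 73.2, 'gpu_vram': 55.0},
#       'gpu-node-02': {'gpu_utilization': 12.4, 'gpu_vram': 30.1},
#   }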
class NetchatNotifier:
    """Netchat API client for sending notifications"""

    def __init__(self, config: Config):
        self.config = config
        self.session = requests.Session()
        # requests.Session has no global timeout setting, so pass it per request.
        self.timeout = 30
        self.session.trust_env = False  # Ignore proxy environment variables
        self.session.headers.update({
            'Cache-Control': 'no-cache',
            'Content-Type': 'application/json',
            'x-api-key': config.netchat_api_key
        })

    def send_notification(self, message: str) -> bool:
        """Send notification to Netchat with retry logic"""
        payload = {
            'msg': message,
            'channel_id': self.config.netchat_channel_id,
            'service_name': self.config.netchat_service_name,
            'sender_name': self.config.netchat_sender_name
        }
        logger.info(f"Sending notification to Netchat: {self.config.netchat_url}")
        logger.debug(f"Message preview: {message[:100]}...")

        last_error = None
        for attempt in range(1, self.config.max_retries + 1):
            try:
                response = self.session.post(self.config.netchat_url, json=payload, timeout=self.timeout)
                response.raise_for_status()
                logger.info(f"✅ Notification sent successfully on attempt {attempt}")
                return True
            except Exception as e:
                last_error = str(e)
                logger.warning(f"❌ Attempt {attempt}/{self.config.max_retries} failed: {last_error}")
                if attempt < self.config.max_retries:
                    logger.info(f"Retrying in {self.config.retry_delay} seconds...")
                    time.sleep(self.config.retry_delay)
        logger.error(f"❌ Failed to send notification after {self.config.max_retries} attempts: {last_error}")
        return False
class GPUMessageFormatter:
    """Format GPU metrics into readable messages"""

    @staticmethod
    def format_gpu_report(pod_data: List[Dict[str, Any]]) -> str:
        """Format GPU data into a readable message"""
        if not pod_data:
            return "⚠️ **No GPU data**\n\nNo GPU metrics were found in Prometheus."

        # Group by namespace
        namespace_groups = {}
        for pod in pod_data:
            namespace = pod['namespace']
            if namespace not in namespace_groups:
                namespace_groups[namespace] = []
            namespace_groups[namespace].append(pod)

        message = "### 🎮 GPU Usage Report\n\n"
        message += "> *Data aggregated over the last 24h using 95th percentile aggregation*\n"
        message += f"> *Time: {datetime.now().strftime('%H:%M:%S %d/%m/%Y')}*\n"

        # Add namespace filter info if specified
        if pod_data and len(set(pod['namespace'] for pod in pod_data)) == 1:
            single_namespace = pod_data[0]['namespace']
            message += f"> *Namespace Filter: `{single_namespace}`*\n\n"
        else:
            message += "\n"

        # Format each namespace
        for namespace, pods in namespace_groups.items():
            message += f"#### 🏷️ Namespace: `{namespace}`\n\n"
            for i, pod in enumerate(pods, 1):
                utilization = pod['gpu_utilization']
                vram = pod['gpu_vram']
                quota = pod['gpu_quota']

                # Status emoji
                if utilization >= 80:
                    emoji = '🔴'  # High
                elif utilization >= 50:
                    emoji = '🟡'  # Medium
                elif utilization >= 20:
                    emoji = '🟢'  # Low
                else:
                    emoji = '⚪'  # Idle

                message += f"{emoji} **{i}. `{pod['pod_name']}`**\n"
                message += f" - GPU Utilization: **{utilization:.1f}%**\n"
                message += f" - VRAM: **{vram:.1f}%**\n"
                message += f" - GPU Quota: **{quota}** GPU(s)\n\n"
            message += "---\n\n"

        # Summary statistics
        total_quota = sum(pod['gpu_quota'] for pod in pod_data)
        avg_utilization = sum(pod['gpu_utilization'] for pod in pod_data) / len(pod_data)
        max_utilization = max(pod['gpu_utilization'] for pod in pod_data)
        min_utilization = min(pod['gpu_utilization'] for pod in pod_data)
        avg_vram = sum(pod['gpu_vram'] for pod in pod_data) / len(pod_data)
        max_vram = max(pod['gpu_vram'] for pod in pod_data)
        min_vram = min(pod['gpu_vram'] for pod in pod_data)

        message += "### 📈 Summary\n\n"
        message += "**GPU Quota:**\n"
        message += f"- 💾 Total GPU quota: **{total_quota}** GPU(s)\n\n"
        message += "**GPU Utilization:**\n"
        message += f"- 📊 Average: **{avg_utilization:.1f}%**\n"
        message += f"- 🔴 Highest: **{max_utilization:.1f}%**\n"
        message += f"- 🟢 Lowest: **{min_utilization:.1f}%**\n\n"
        message += "**GPU VRAM:**\n"
        message += f"- 📊 Average: **{avg_vram:.1f}%**\n"
        message += f"- 🔴 Highest: **{max_vram:.1f}%**\n"
        message += f"- 🟢 Lowest: **{min_vram:.1f}%**\n\n"
        message += "**Other information:**\n"
        message += f"- 🎯 Total namespaces: **{len(namespace_groups)}**\n"
        message += f"- 📦 Total pods: **{len(pod_data)}**\n"
        message += "- ⏱️ Time Range: **95th percentile over 1h windows, averaged over 24h**\n"
        return message
class GPUMonitor:
    """Main GPU monitoring service"""

    def __init__(self):
        self.config = Config()
        self.prometheus_client = PrometheusClient(self.config.prometheus_url, self.config.namespace_filter)
        self.netchat_notifier = NetchatNotifier(self.config)
        self.formatter = GPUMessageFormatter()

        # Validate configuration
        self._validate_config()

    def _validate_config(self):
        """Validate required configuration"""
        if not self.config.prometheus_url:
            raise ValueError("PROMETHEUS_URL is required")
        if not self.config.netchat_url:
            raise ValueError("NETCHAT_URL is required")
        if not self.config.netchat_api_key:
            raise ValueError("NETCHAT_API_KEY is required")
        if not self.config.netchat_channel_id:
            raise ValueError("NETCHAT_CHANNEL_ID is required")
        logger.info("✅ Configuration validation passed")

    def run_monitoring(self) -> bool:
        """Run the complete monitoring cycle"""
        try:
            logger.info("🚀 Starting GPU monitoring cycle...")

            # 1. Query GPU metrics
            logger.info("📊 Querying GPU metrics from Prometheus...")
            pod_data = self.prometheus_client.query_gpu_quota_and_utilization()
            if not pod_data:
                logger.warning("⚠️ No GPU data found, sending warning notification")
                warning_msg = "⚠️ **No GPU quota data**\n\nNo process_nvidia_gpu_requests_Count metrics were found in Prometheus."
                return self.netchat_notifier.send_notification(warning_msg)
            logger.info(f"✅ Retrieved data for {len(pod_data)} GPU pods")

            # 2. Format message
            logger.info("📝 Formatting GPU report message...")
            message = self.formatter.format_gpu_report(pod_data)

            # 3. Send notification
            logger.info("📤 Sending notification to Netchat...")
            success = self.netchat_notifier.send_notification(message)
            if success:
                logger.info("✅ GPU monitoring cycle completed successfully")
                return True
            else:
                logger.error("❌ GPU monitoring cycle failed")
                return False
        except Exception as e:
            logger.error(f"❌ Error in monitoring cycle: {str(e)}")
            # Send error notification
            try:
                error_msg = f"❌ **GPU Monitor Cronjob Error**\n\nError: {str(e)}\n\nTime: {datetime.now().strftime('%H:%M:%S %d/%m/%Y')}"
                return self.netchat_notifier.send_notification(error_msg)
            except Exception as e2:
                logger.error(f"❌ Failed to send error notification: {str(e2)}")
                return False
def scheduled_job():
    """Function to be called by scheduler"""
    logger.info("🕐 [Cronjob] Starting scheduled GPU monitoring...")
    monitor = GPUMonitor()
    success = monitor.run_monitoring()
    if success:
        logger.info("✅ [Cronjob] Scheduled job completed successfully")
    else:
        logger.error("❌ [Cronjob] Scheduled job failed")
def main():
    """Main function"""
    try:
        # Load environment variables
        load_dotenv()

        # Print configuration
        config = Config()
        logger.info("🎮 GPU Monitor Service Started")
        logger.info(f"📡 Prometheus URL: {config.prometheus_url}")
        logger.info(f"💬 Netchat URL: {config.netchat_url}")
        logger.info(f"⏰ Schedule time: {config.schedule_time}")
        logger.info(f"🔄 Max retries: {config.max_retries}")
        logger.info(f"⏱️ Retry delay: {config.retry_delay}s")

        # Debug mode handling
        if config.debug_immediate:
            logger.info("🐛 DEBUG IMMEDIATE MODE - Send notification now, skip scheduling")
            logger.info("-" * 50)
            # Set logging level to DEBUG for verbose output
            logging.getLogger().setLevel(logging.DEBUG)

            # Run monitoring immediately
            monitor = GPUMonitor()
            monitor.run_monitoring()

            logger.info("🚀 Debug mode: Monitor running in foreground (Ctrl+C to stop)")
            try:
                while True:
                    time.sleep(300)  # Keep the process alive; nothing further is scheduled in debug mode
            except KeyboardInterrupt:
                pass
        else:
            logger.info("-" * 50)
            logger.info(f"⏰ Setting up daily schedule at {config.schedule_time}")

            # Setup normal scheduling
            schedule.every().day.at(config.schedule_time).do(scheduled_job)
            logger.info("✅ Scheduler configured. Waiting for next scheduled run...")
            logger.info("Press Ctrl+C to stop")

            # Keep the script running
            while True:
                schedule.run_pending()
                time.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        logger.info("👋 Stopping GPU Monitor Service")
    except Exception as e:
        logger.error(f"❌ Fatal error: {str(e)}")
        return 1
    return 0


if __name__ == "__main__":
    exit(main())