Skip to content

Instantly share code, notes, and snippets.

@thuanpham582002
Last active November 23, 2025 06:18
Show Gist options
  • Select an option

  • Save thuanpham582002/50eed425bf2fc5c745bb50c73ca5297a to your computer and use it in GitHub Desktop.

Select an option

Save thuanpham582002/50eed425bf2fc5c745bb50c73ca5297a to your computer and use it in GitHub Desktop.
version: '3.8'

services:
  gpu-monitor:
    build: .
    container_name: gpu-monitor
    restart: unless-stopped
    environment:
      # Prometheus Configuration
      - PROMETHEUS_URL=http://10.24.10.14:30005
      # Netchat API Configuration
      # SECURITY: this API key is committed in plain text — rotate it and
      # supply it via an env_file or a secrets mechanism instead.
      - NETCHAT_URL=http://10.255.62.213/netchat-netmind/send-message
      - NETCHAT_API_KEY=NOmTvIZOVsLIAfdlYkAKfMoyQEyjxDao
      - NETCHAT_CHANNEL_ID=i6shkznnstnjfmh9ga9s4cf75o
      - NETCHAT_SERVICE_NAME=GPU-Monitoring
      # NOTE(review): the Python service reads NETCHAT_SENDER_NAME — confirm
      # this NETCHAT_SENDER spelling is actually honoured by the application.
      - NETCHAT_SENDER=system
      # Scheduling Configuration
      - SCHEDULE_TIME=08:00
      # Retry Configuration
      - MAX_RETRIES=3
      - RETRY_DELAY=5
      # Debug Configuration - single option for immediate debugging
      - DEBUG_IMMEDIATE=false
      # Namespace Filter - optional filter for specific namespace
      - NAMESPACE_FILTER=
    volumes:
      - ./code:/app/code
      - ./logs:/app/logs
      - ./data:/app/data
    networks:
      - monitoring-network
    healthcheck:
      test: ["CMD", "python", "/app/code/test_gpu_monitor.py"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  # Optional: Add a simple log viewer
  log-viewer:
    image: alpine:latest
    container_name: gpu-monitor-logs
    command: tail -f /app/logs/gpu_monitor.log
    volumes:
      - ./logs:/app/logs
    depends_on:
      - gpu-monitor
    networks:
      - monitoring-network

networks:
  monitoring-network:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

# NOTE(review): these named volumes are declared but the services above use
# host bind mounts (./logs, ./data) — confirm whether they are still needed.
volumes:
  logs:
    driver: local
  data:
    driver: local
#!/usr/bin/env python3
"""
GPU Monitoring & Netchat Notification Service
Standalone Python script to:
1. Query GPU metrics from Prometheus
2. Send formatted notifications to Netchat API
3. Run on scheduled intervals using cron
Requirements:
pip install requests schedule python-dotenv
"""
import os
import requests
import json
import logging
import time
import schedule
from datetime import datetime
from typing import Optional, Dict, List, Any
from dataclasses import dataclass
from dotenv import load_dotenv
# Load environment variables from a local .env file (silently no-op if absent)
load_dotenv()
# Configure logging: every record is mirrored to gpu_monitor.log (in the
# process working directory) and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('gpu_monitor.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
@dataclass
class Config:
    """Runtime configuration sourced from environment variables.

    Note: the defaults below are evaluated once, when this class is defined
    at import time — environment changes made later are not picked up.
    """
    # SECURITY: the API key / URLs below are credentials committed to source
    # control — rotate the key and provide real values via the environment.
    prometheus_url: str = os.getenv('PROMETHEUS_URL', 'http://10.24.10.14:30005')
    netchat_url: str = os.getenv('NETCHAT_URL', 'http://10.255.62.213/netchat-netmind/send-message')
    netchat_api_key: str = os.getenv('NETCHAT_API_KEY', 'NOmTvIZOVsLIAfdlYkAKfMoyQEyjxDao')
    netchat_channel_id: str = os.getenv('NETCHAT_CHANNEL_ID', 'i6shkznnstnjfmh9ga9s4cf75o')
    netchat_service_name: str = os.getenv('NETCHAT_SERVICE_NAME', 'GPU-Monitoring')
    # FIX: also accept the NETCHAT_SENDER spelling used by docker-compose.yml.
    # Previously only NETCHAT_SENDER_NAME was read, so the compose-provided
    # value was silently ignored and the default was always used.
    netchat_sender_name: str = os.getenv(
        'NETCHAT_SENDER_NAME', os.getenv('NETCHAT_SENDER', 'system'))
    # Scheduling: daily HH:MM wall-clock time for the report (default 8:00 AM)
    schedule_time: str = os.getenv('SCHEDULE_TIME', '08:00')
    # Retry settings for Netchat delivery
    max_retries: int = int(os.getenv('MAX_RETRIES', '3'))
    retry_delay: int = int(os.getenv('RETRY_DELAY', '5'))  # seconds between attempts
    # Debug mode - single option for immediate debugging (run once, skip schedule)
    debug_immediate: bool = os.getenv('DEBUG_IMMEDIATE', 'false').lower() == 'true'
    # Namespace filter - if None/empty, metrics for all namespaces are reported
    namespace_filter: Optional[str] = os.getenv('NAMESPACE_FILTER')
class PrometheusClient:
    """Prometheus API client for GPU metrics."""

    # Per-request timeout in seconds.
    # FIX: the previous code did `self.session.timeout = 30`, which is a
    # silent no-op — requests.Session has no `timeout` attribute, so every
    # HTTP call could hang indefinitely. The timeout must be passed on each
    # request instead.
    REQUEST_TIMEOUT = 30

    def __init__(self, base_url: str, namespace_filter: Optional[str] = None):
        """Store the Prometheus base URL (trailing slash stripped) and an
        optional namespace filter applied to quota queries."""
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.namespace_filter = namespace_filter

    def _query(self, promql: str) -> List[Dict[str, Any]]:
        """Run an instant query and return the raw result vector.

        FIX: the query is sent via `params=` so PromQL metacharacters
        ({}, ", |, ~) are URL-encoded correctly instead of being
        interpolated raw into the URL string.
        """
        response = self.session.get(
            f"{self.base_url}/api/v1/query",
            params={'query': promql},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json().get('data', {}).get('result', [])

    def query_gpu_quota_and_utilization(self) -> List[Dict[str, Any]]:
        """Query GPU quota and utilization metrics from Prometheus.

        Returns a list of per-pod dicts with keys: pod_name, namespace,
        gpu_quota, gpu_utilization, gpu_vram, hostname. Raises on quota
        query failure; utilization sub-queries are best-effort.
        """
        # 1. Get GPU quota metrics
        quota_query = 'process_nvidia_gpu_requests_Count'
        if self.namespace_filter:
            quota_query += f'{{pod_namespace="{self.namespace_filter}"}}'
            logger.info(f"Filtering by namespace: {self.namespace_filter}")
        try:
            logger.info(f"Querying GPU quota: {quota_query}")
            quota_metrics = self._query(quota_query)
            if not quota_metrics:
                logger.warning("No GPU quota metrics found")
                return []
            logger.info(f"Found {len(quota_metrics)} GPU quota metrics")
            # 2. Group by pod (summing quota across series) and collect the
            #    hostnames needed for the follow-up utilization query.
            pod_map: Dict[str, Dict[str, Any]] = {}
            unique_hostnames = set()
            for metric in quota_metrics:
                labels = metric.get('metric', {})
                pod_name = labels.get('pod_name', 'unknown')
                namespace = labels.get('pod_namespace', 'unknown')
                hostname = labels.get('hostname', 'unknown')
                quota = float(metric.get('value', [0, '0'])[1])
                # Defensive re-check: the PromQL selector should already have
                # applied the namespace filter, but skip mismatches anyway.
                if self.namespace_filter and namespace != self.namespace_filter:
                    continue
                key = f"{namespace}/{pod_name}"
                if key not in pod_map:
                    pod_map[key] = {
                        'pod_name': pod_name,
                        'namespace': namespace,
                        'gpu_quota': 0,
                        'hostname': hostname
                    }
                unique_hostnames.add(hostname)
                pod_map[key]['gpu_quota'] += quota
            # 3. Get utilization metrics for all hostnames
            hostname_list = list(unique_hostnames)
            if not hostname_list:
                logger.warning("No hostnames found for utilization query")
                return list(pod_map.values())
            utilization_data = self._query_utilization_by_hostnames(hostname_list)
            # 4. Map utilization back to pods (missing hosts report zeros)
            result = []
            for pod_info in pod_map.values():
                hostname = pod_info['hostname']
                util_data = utilization_data.get(
                    hostname, {'gpu_utilization': 0, 'gpu_vram': 0})
                result.append({
                    'pod_name': pod_info['pod_name'],
                    'namespace': pod_info['namespace'],
                    'gpu_quota': pod_info['gpu_quota'],
                    'gpu_utilization': util_data['gpu_utilization'],
                    'gpu_vram': util_data['gpu_vram'],
                    'hostname': hostname
                })
            return result
        except Exception as e:
            logger.error(f"Error querying GPU metrics: {str(e)}")
            raise

    def _query_utilization_by_hostnames(self, hostnames: List[str]) -> Dict[str, Dict[str, Any]]:
        """Query GPU utilization and VRAM for multiple hostnames.

        Returns {hostname: {'gpu_utilization': float, 'gpu_vram': float}}.
        Each sub-query is best-effort: a failure is logged and the affected
        field stays at 0.
        """
        hostname_regex = '|'.join(hostnames)
        # Time aggregation: 95th percentile over 1h windows, max over the
        # last day, averaged per host.
        vram_query = (
            f'avg(max_over_time(quantile_over_time(0.95, '
            f'gpu_memory_percent_Percentage{{hostname=~"{hostname_regex}"}}[1h])[1d:])) by (hostname)'
        )
        utilization_query = (
            f'avg(max_over_time(quantile_over_time(0.95, '
            f'gpu_utilization_Percentage{{hostname=~"{hostname_regex}"}}[1h])[1d:])) by (hostname)'
        )
        result: Dict[str, Dict[str, Any]] = {}

        def merge(promql: str, field: str, label: str) -> None:
            # Fold one sub-query's per-host values into `result[host][field]`.
            try:
                for metric in self._query(promql):
                    hostname = metric.get('metric', {}).get('hostname')
                    value = float(metric.get('value', [0, '0'])[1])
                    entry = result.setdefault(
                        hostname, {'gpu_utilization': 0, 'gpu_vram': 0})
                    entry[field] = value
            except Exception as e:
                logger.error(f"Error querying {label} metrics: {str(e)}")

        merge(vram_query, 'gpu_vram', 'VRAM')
        merge(utilization_query, 'gpu_utilization', 'utilization')
        return result
class NetchatNotifier:
    """Netchat API client for sending notifications."""

    # Per-request timeout in seconds.
    # FIX: `self.session.timeout = 30` (the previous approach) is a silent
    # no-op — requests.Session has no `timeout` attribute, so a hung Netchat
    # endpoint would block forever. The timeout is now passed on every post.
    REQUEST_TIMEOUT = 30

    def __init__(self, config: Config):
        """Prepare an authenticated session using the configured API key."""
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            'Cache-Control': 'no-cache',
            'Content-Type': 'application/json',
            'x-api-key': config.netchat_api_key
        })

    def send_notification(self, message: str) -> bool:
        """Send a notification to Netchat with retry logic.

        Returns True once a POST succeeds, False after max_retries failures.
        """
        payload = {
            'msg': message,
            'channel_id': self.config.netchat_channel_id,
            'service_name': self.config.netchat_service_name,
            'sender_name': self.config.netchat_sender_name
        }
        logger.info(f"Sending notification to Netchat: {self.config.netchat_url}")
        logger.debug(f"Message preview: {message[:100]}...")
        for attempt in range(1, self.config.max_retries + 1):
            try:
                response = self.session.post(
                    self.config.netchat_url,
                    json=payload,
                    timeout=self.REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                logger.info(f"✅ Notification sent successfully on attempt {attempt}")
                return True
            except Exception as e:
                logger.warning(f"❌ Attempt {attempt}/{self.config.max_retries} failed: {str(e)}")
                if attempt < self.config.max_retries:
                    logger.info(f"Retrying in {self.config.retry_delay} seconds...")
                    time.sleep(self.config.retry_delay)
        logger.error(f"❌ Failed to send notification after {self.config.max_retries} attempts")
        return False
class GPUMessageFormatter:
    """Format GPU metrics into readable messages."""

    @staticmethod
    def _status_emoji(utilization: float) -> str:
        """Map a utilization percentage to a traffic-light status emoji."""
        if utilization >= 80:
            return '🔴'  # High
        if utilization >= 50:
            return '🟡'  # Medium
        if utilization >= 20:
            return '🟢'  # Low
        return '⚪'  # Idle

    @staticmethod
    def format_gpu_report(pod_data: List[Dict[str, Any]]) -> str:
        """Render per-pod GPU metrics as a Markdown report string.

        Expects each entry to carry: pod_name, namespace, gpu_quota,
        gpu_utilization, gpu_vram. Returns a warning message for empty input.
        """
        if not pod_data:
            return "⚠️ **Không có dữ liệu GPU**\n\nKhông tìm thấy GPU metrics từ Prometheus."
        # Bucket pods by namespace, preserving first-seen order.
        by_namespace: Dict[str, List[Dict[str, Any]]] = {}
        for pod in pod_data:
            by_namespace.setdefault(pod['namespace'], []).append(pod)
        parts = [
            "### 🎮 GPU Usage Report\n\n",
            "> *Dữ liệu được tổng hợp trong 24h với 95th percentile aggregation*\n",
            f"> *Thời gian: {datetime.now().strftime('%H:%M:%S %d/%m/%Y')}*\n",
        ]
        # When exactly one namespace is present, surface it as a filter hint.
        if len(by_namespace) == 1:
            parts.append(f"> *Namespace Filter: `{pod_data[0]['namespace']}`*\n\n")
        else:
            parts.append("\n")
        # Per-namespace pod listing
        for namespace, pods in by_namespace.items():
            parts.append(f"#### 🏷️ Namespace: `{namespace}`\n\n")
            for index, pod in enumerate(pods, 1):
                emoji = GPUMessageFormatter._status_emoji(pod['gpu_utilization'])
                parts.append(f"{emoji} **{index}. `{pod['pod_name']}`**\n")
                parts.append(f" - GPU Utilization: **{pod['gpu_utilization']:.1f}%**\n")
                parts.append(f" - VRAM: **{pod['gpu_vram']:.1f}%**\n")
                parts.append(f" - GPU Quota: **{pod['gpu_quota']}** GPU(s)\n\n")
            parts.append("---\n\n")
        # Summary statistics across all pods
        utilizations = [pod['gpu_utilization'] for pod in pod_data]
        vrams = [pod['gpu_vram'] for pod in pod_data]
        total_quota = sum(pod['gpu_quota'] for pod in pod_data)
        parts.append("### 📈 Tổng Hợp\n\n")
        parts.append("**GPU Quota:**\n")
        parts.append(f"- 💾 Tổng GPU Quota: **{total_quota}** GPU(s)\n\n")
        parts.append("**GPU Utilization:**\n")
        parts.append(f"- 📊 Trung bình: **{sum(utilizations) / len(utilizations):.1f}%**\n")
        parts.append(f"- 🔴 Cao nhất: **{max(utilizations):.1f}%**\n")
        parts.append(f"- 🟢 Thấp nhất: **{min(utilizations):.1f}%**\n\n")
        parts.append("**GPU VRAM:**\n")
        parts.append(f"- 📊 Trung bình: **{sum(vrams) / len(vrams):.1f}%**\n")
        parts.append(f"- 🔴 Cao nhất: **{max(vrams):.1f}%**\n")
        parts.append(f"- 🟢 Thấp nhất: **{min(vrams):.1f}%**\n\n")
        parts.append("**Thông tin khác:**\n")
        parts.append(f"- 🎯 Tổng số Namespace: **{len(by_namespace)}**\n")
        parts.append(f"- 📦 Tổng số Pod: **{len(pod_data)}**\n")
        parts.append("- ⏱️ Time Range: **95th percentile over 1h windows, averaged over 24h**\n")
        return "".join(parts)
class GPUMonitor:
    """Main GPU monitoring service: ties together the Prometheus client,
    the message formatter and the Netchat notifier."""

    def __init__(self):
        self.config = Config()
        self.prometheus_client = PrometheusClient(
            self.config.prometheus_url, self.config.namespace_filter)
        self.netchat_notifier = NetchatNotifier(self.config)
        self.formatter = GPUMessageFormatter()
        # Fail fast on missing mandatory settings.
        self._validate_config()

    def _validate_config(self):
        """Raise ValueError if any mandatory setting is blank."""
        required = (
            ('PROMETHEUS_URL', self.config.prometheus_url),
            ('NETCHAT_URL', self.config.netchat_url),
            ('NETCHAT_API_KEY', self.config.netchat_api_key),
            ('NETCHAT_CHANNEL_ID', self.config.netchat_channel_id),
        )
        for env_name, value in required:
            if not value:
                raise ValueError(f"{env_name} is required")
        logger.info("✅ Configuration validation passed")

    def run_monitoring(self) -> bool:
        """Run one complete monitoring cycle: query, format, notify.

        Returns True when a notification (report or warning) was delivered.
        On error, a best-effort failure notification is attempted.
        """
        try:
            logger.info("🚀 Starting GPU monitoring cycle...")
            # 1. Query GPU metrics
            logger.info("📊 Querying GPU metrics from Prometheus...")
            pod_data = self.prometheus_client.query_gpu_quota_and_utilization()
            if not pod_data:
                logger.warning("⚠️ No GPU data found, sending warning notification")
                warning_msg = ("⚠️ **Không có dữ liệu GPU Quota**\n\n"
                               "Không tìm thấy metrics process_nvidia_gpu_requests_Count từ Prometheus.")
                return self.netchat_notifier.send_notification(warning_msg)
            logger.info(f"✅ Retrieved data for {len(pod_data)} GPU pods")
            # 2. Format message
            logger.info("📝 Formatting GPU report message...")
            report = self.formatter.format_gpu_report(pod_data)
            # 3. Send notification
            logger.info("📤 Sending notification to Netchat...")
            if self.netchat_notifier.send_notification(report):
                logger.info("✅ GPU monitoring cycle completed successfully")
                return True
            logger.error("❌ GPU monitoring cycle failed")
            return False
        except Exception as e:
            logger.error(f"❌ Error in monitoring cycle: {str(e)}")
            # Best-effort: report the failure itself to Netchat.
            try:
                timestamp = datetime.now().strftime('%H:%M:%S %d/%m/%Y')
                error_msg = f"❌ **Lỗi GPU Monitor Cronjob**\n\nLỗi: {str(e)}\n\nThời gian: {timestamp}"
                return self.netchat_notifier.send_notification(error_msg)
            except Exception as notify_error:
                logger.error(f"❌ Failed to send error notification: {str(notify_error)}")
                return False
def scheduled_job():
    """Entry point invoked by the scheduler: run one monitoring cycle and
    log the outcome."""
    logger.info("🕐 [Cronjob] Starting scheduled GPU monitoring...")
    if GPUMonitor().run_monitoring():
        logger.info("✅ [Cronjob] Scheduled job completed successfully")
    else:
        logger.error("❌ [Cronjob] Scheduled job failed")
def main():
    """Start the service in debug-immediate or daily-scheduled mode.

    Returns 0 on clean shutdown (including Ctrl+C), 1 on fatal error.
    """
    try:
        # Load environment variables (idempotent; also done at module import)
        load_dotenv()
        config = Config()
        # Announce the effective configuration
        for line in (
            "🎮 GPU Monitor Service Started",
            f"📡 Prometheus URL: {config.prometheus_url}",
            f"💬 Netchat URL: {config.netchat_url}",
            f"⏰ Schedule time: {config.schedule_time}",
            f"🔄 Max retries: {config.max_retries}",
            f"⏱️ Retry delay: {config.retry_delay}s",
        ):
            logger.info(line)
        if config.debug_immediate:
            # Debug mode: run one cycle right now, then idle in foreground.
            logger.info("🐛 DEBUG IMMEDIATE MODE - Send notification now, skip scheduling")
            logger.info("-" * 50)
            # Verbose output for debugging
            logging.getLogger().setLevel(logging.DEBUG)
            GPUMonitor().run_monitoring()
            logger.info("🚀 Debug mode: Monitor running in foreground (Ctrl+C to stop)")
            try:
                while True:
                    time.sleep(300)  # idle; wake every 5 minutes
            except KeyboardInterrupt:
                pass
        else:
            logger.info("-" * 50)
            logger.info(f"⏰ Setting up daily schedule at {config.schedule_time}")
            # Normal mode: one run per day at the configured wall-clock time.
            schedule.every().day.at(config.schedule_time).do(scheduled_job)
            logger.info("✅ Scheduler configured. Waiting for next scheduled run...")
            logger.info("Press Ctrl+C to stop")
            while True:
                schedule.run_pending()
                time.sleep(60)  # poll the scheduler once a minute
    except KeyboardInterrupt:
        logger.info("👋 Stopping GPU Monitor Service")
    except Exception as e:
        logger.error(f"❌ Fatal error: {str(e)}")
        return 1
    return 0
if __name__ == "__main__":
    # Propagate main()'s status code (0 = clean shutdown, 1 = fatal error)
    # to the shell.
    exit(main())
version: '3.8'

services:
  nginx-proxy:
    image: nginx:alpine
    container_name: netchat-proxy
    restart: unless-stopped
    ports:
      - "80:80"     # HTTP
      - "443:443"   # HTTPS (if needed)
    volumes:
      # nginx.conf is mounted read-only so the container cannot mutate it
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./logs:/var/log/nginx
    environment:
      - NGINX_HOST=0.0.0.0
    networks:
      - default

networks:
  default:
    driver: bridge
events {
    worker_connections 1024;
}

http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    # Log format
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log /var/log/nginx/access.log main;
    error_log /var/log/nginx/error.log warn;

    # Basic settings
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;

    # Proxy settings
    proxy_connect_timeout 30s;
    proxy_send_timeout 30s;
    proxy_read_timeout 30s;

    # Gzip compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;

    # Rate limiting (optional - prevent abuse)
    limit_req_zone $binary_remote_addr zone=netchat:10m rate=10r/s;

    server {
        listen 80;
        server_name _;

        # Health check endpoint
        location /health {
            access_log off;
            return 200 "healthy\n";
            add_header Content-Type text/plain;
        }

        # Netchat API proxy
        location /netchat-netmind/ {
            # Rate limiting
            limit_req zone=netchat burst=20 nodelay;

            # Proxy to the actual Netchat API (machine C)
            proxy_pass http://10.255.62.213/netchat-netmind/;

            # Headers forwarding (request headers sent to the upstream)
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # CORS response headers.
            # FIX: these were previously set with proxy_set_header, which only
            # adds REQUEST headers forwarded to the upstream — browsers never
            # received them. add_header emits them on the RESPONSE to the
            # client ("always" keeps them on error responses too).
            add_header Access-Control-Allow-Origin * always;
            add_header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS" always;
            add_header Access-Control-Allow-Headers "DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,x-api-key" always;

            # NOTE(review): this forces the upstream request Content-Type to
            # JSON regardless of what the client sent — confirm all callers
            # really post JSON before keeping it.
            proxy_set_header Content-Type application/json;

            # Timeout settings
            proxy_connect_timeout 10s;
            proxy_send_timeout 10s;
            proxy_read_timeout 10s;

            # Buffer settings
            proxy_buffering on;
            proxy_buffer_size 4k;
            proxy_buffers 8 4k;
        }

        # Default location
        location / {
            return 404;
        }

        # Security headers
        add_header X-Frame-Options DENY;
        add_header X-Content-Type-Options nosniff;
        add_header X-XSS-Protection "1; mode=block";
    }

    # Optional: HTTPS config (if needed)
    # server {
    #     listen 443 ssl http2;
    #     server_name _;
    #
    #     ssl_certificate /etc/nginx/ssl/cert.pem;
    #     ssl_certificate_key /etc/nginx/ssl/key.pem;
    #
    #     location /netchat-netmind/ {
    #         proxy_pass http://10.255.62.213/netchat-netmind/;
    #         # ... same proxy settings as above
    #     }
    # }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment