Skip to content

Instantly share code, notes, and snippets.

@UBarney
Created June 19, 2025 13:16
Show Gist options
  • Save UBarney/12c122f30789d74d569da0b951ca4692 to your computer and use it in GitHub Desktop.
Save UBarney/12c122f30789d74d569da0b951ca4692 to your computer and use it in GitHub Desktop.
mem_usage.py
#!/usr/bin/env python3
import subprocess
import sys
import os
import time
import threading
from typing import List, Tuple
def get_peak_memory_usage(cmd: List[str]) -> Tuple[float, int]:
    """
    Execute a command and measure its peak memory usage using /usr/bin/time.

    The command is run as ``/usr/bin/time -f %M <cmd>``. time reports the
    maximum resident set size (in KB) on stderr once the command has
    finished, so that value is the last integer-only line of stderr.

    Args:
        cmd: Command and arguments to execute.

    Returns:
        (peak memory usage in MB, return code). If /usr/bin/time cannot be
        found, the result of get_memory_usage_fallback(cmd) is returned.
        If anything else goes wrong while running the command, (0.0, -1)
        is returned.
    """
    try:
        # -f '%M' gives maximum resident set size in KB
        time_cmd = ['/usr/bin/time', '-f', '%M'] + cmd
        result = subprocess.run(
            time_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        if result.returncode != 0:
            print(f"Warning: Command failed with return code {result.returncode}")
            print(f"Command: {' '.join(cmd)}")
            print(f"Error: {result.stderr}")

        # The measured command shares stderr with time, and its output comes
        # first. Scan from the end so that a bare number printed by the
        # command itself is never mistaken for the memory figure.
        memory_kb = 0
        for line in reversed(result.stderr.strip().split('\n')):
            try:
                memory_kb = int(line.strip())
                break
            except ValueError:
                continue

        # Convert KB to MB
        return memory_kb / 1024.0, result.returncode
    except FileNotFoundError:
        print("Error: /usr/bin/time not found. Falling back to basic measurement.")
        return get_memory_usage_fallback(cmd)
    except Exception as e:
        print(f"Error executing command {' '.join(cmd)}: {e}")
        return 0.0, -1
def get_memory_usage_fallback(cmd: List[str]) -> Tuple[float, int]:
    """
    Fallback method using /proc/pid/status when /usr/bin/time is not available.

    A background thread polls /proc/<pid>/status every 10ms while the
    command runs. It prefers the VmHWM field (the kernel-tracked peak
    resident set size) and uses VmRSS (the current resident set size) only
    when VmHWM is absent, so peaks that occur between two polls are not lost.

    Args:
        cmd: Command and arguments to execute.

    Returns:
        (peak memory usage in MB, return code). The memory value is 0.0 when
        no sample could be taken (for example, the process exited before the
        first poll or /proc is unavailable). Returns (0.0, -1) if the command
        could not be executed.
    """
    peak_memory = 0.0
    process = None

    def monitor_memory():
        nonlocal peak_memory
        while process is not None and process.poll() is None:
            try:
                sampled = {}
                with open(f'/proc/{process.pid}/status', 'r') as f:
                    for line in f:
                        if line.startswith(('VmHWM:', 'VmRSS:')):
                            key, value = line.split(':', 1)
                            # Values look like "  123456 kB"
                            sampled[key] = int(value.split()[0])
                memory_kb = sampled.get('VmHWM', sampled.get('VmRSS'))
                if memory_kb is not None:
                    peak_memory = max(peak_memory, memory_kb / 1024.0)
            except (OSError, ValueError, IndexError):
                # The process may disappear between poll() and open(), or
                # the file may be mid-teardown; just skip this sample.
                pass
            time.sleep(0.01)  # Check every 10ms

    try:
        # Start the process
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        # Start memory monitoring in a separate thread
        monitor_thread = threading.Thread(target=monitor_memory)
        monitor_thread.daemon = True
        monitor_thread.start()

        # Wait for process to complete; only stderr is reported
        _, stderr = process.communicate()

        # Wait a bit for the monitor thread to finish
        monitor_thread.join(timeout=0.1)

        if process.returncode != 0:
            print(f"Warning: Command failed with return code {process.returncode}")
            print(f"Command: {' '.join(cmd)}")
            print(f"Error: {stderr}")
        return peak_memory, process.returncode
    except Exception as e:
        print(f"Error executing command {' '.join(cmd)}: {e}")
        return 0.0, -1
def read_sql_file(filepath: str) -> List[str]:
    """
    Load the queries stored in a SQL file, one query per line.

    Each line is stripped of surrounding whitespace and lines that end up
    empty are dropped. If the file cannot be read, an error message is
    printed and the program exits with status 1.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            raw_lines = handle.read().split('\n')
    except Exception as e:
        print(f"Error reading SQL file {filepath}: {e}")
        sys.exit(1)

    # Strip once, then keep only the lines that still have content
    return [query for query in map(str.strip, raw_lines) if query]
def format_memory(memory_mb: float) -> str:
    """
    Render a memory amount given in MB using the most natural unit.

    Values below 1 MB are shown in KB, values below 1024 MB in MB, and
    everything else in GB.
    """
    if memory_mb < 1:
        kilobytes = memory_mb * 1024
        return f"{kilobytes:.1f} KB"
    if memory_mb < 1024:
        return f"{memory_mb:.1f} MB"
    gigabytes = memory_mb / 1024
    return f"{gigabytes:.2f} GB"
def format_memory_with_status(memory_mb: float, return_code: int) -> str:
    """
    Render a memory amount and flag it when the process did not succeed.

    A zero return code yields just the formatted memory; any other code
    appends a failure marker showing the exit code.
    """
    formatted = format_memory(memory_mb)
    if return_code == 0:
        return formatted
    return f"{formatted} ❌ (exit code: {return_code})"
def calculate_improvement(base_memory: float, opt_memory: float, base_code: int, opt_code: int) -> str:
    """
    Describe the relative memory change from the baseline to the optimized run.

    Returns "N/A (process failed)" if either exit code is non-zero and "N/A"
    if the baseline memory is zero. Otherwise returns the percentage change
    relative to the baseline: "↓" marks a reduction, "↑" an increase, and
    "0%" no change.
    """
    both_succeeded = base_code == 0 and opt_code == 0
    if not both_succeeded:
        return "N/A (process failed)"
    if base_memory == 0:
        # Avoid dividing by zero when the baseline was not measured
        return "N/A"

    percent = ((base_memory - opt_memory) / base_memory) * 100
    if percent > 0:
        return f"↓ {percent:.1f}%"
    if percent < 0:
        return f"↑ {abs(percent):.1f}%"
    return "0%"
def check_time_command() -> bool:
    """
    Check if /usr/bin/time is available and working.

    Runs ``/usr/bin/time --version`` and reports success only when it exits
    with status 0.

    Returns:
        True if the probe ran and exited with 0; False if the binary is
        missing, cannot be executed, or exits with a non-zero status.
    """
    try:
        # Output is captured only to keep it off the terminal; it is never
        # inspected, so it is left undecoded.
        result = subprocess.run(['/usr/bin/time', '--version'],
                                capture_output=True)
    except OSError:
        # Covers a missing binary (FileNotFoundError) as well as one that
        # exists but cannot be executed (e.g. PermissionError).
        return False
    return result.returncode == 0
def main():
    """
    Compare the peak memory usage of two binaries over a list of SQL queries.

    Expects three command-line arguments: the baseline binary, the optimized
    binary, and a SQL file with one query per line. Each query is run through
    both binaries as ``<bin> --maxrows 1 -c <sql>``. Per-query progress is
    printed, followed by a markdown table and summary statistics.

    Exits with status 1 on wrong usage, when an input file is missing, or
    when the SQL file contains no queries.
    """
    if len(sys.argv) != 4:
        print("Usage: ./mem_usage.py base_bin opt_bin bench_sql.sql")
        print(" base_bin: Path to baseline binary")
        print(" opt_bin: Path to optimized binary")
        print(" bench_sql.sql: SQL file with queries separated by newlines")
        sys.exit(1)

    base_bin = sys.argv[1]
    opt_bin = sys.argv[2]
    sql_file = sys.argv[3]

    # Validate input files
    if not os.path.isfile(base_bin):
        print(f"Error: Base binary '{base_bin}' not found")
        sys.exit(1)
    if not os.path.isfile(opt_bin):
        print(f"Error: Optimized binary '{opt_bin}' not found")
        sys.exit(1)
    if not os.path.isfile(sql_file):
        print(f"Error: SQL file '{sql_file}' not found")
        sys.exit(1)

    # Make binaries executable if needed
    try:
        os.chmod(base_bin, 0o755)
        os.chmod(opt_bin, 0o755)
    except OSError:
        pass  # Ignore if we can't change permissions

    # Pick the measurement method once. When /usr/bin/time is missing or does
    # not pass the probe, go straight to the /proc-based fallback instead of
    # invoking a time binary that is known not to work.
    has_time_cmd = check_time_command()
    if has_time_cmd:
        measure = get_peak_memory_usage
    else:
        measure = get_memory_usage_fallback
        print("Warning: /usr/bin/time not available, using fallback method")
        print("Note: Fallback method may be less accurate\n")

    # Read SQL queries
    sqls = read_sql_file(sql_file)
    if not sqls:
        print("Error: No SQL queries found in the file")
        sys.exit(1)

    print(f"Found {len(sqls)} SQL queries to benchmark")
    print("Starting memory usage comparison...\n")

    results = []
    for i, sql in enumerate(sqls, 1):
        print(f"Processing query {i}/{len(sqls)}: {sql[:50]}{'...' if len(sql) > 50 else ''}")

        # Prepare commands
        base_cmd = [base_bin, '--maxrows', '1', '-c', sql]
        opt_cmd = [opt_bin, '--maxrows', '1', '-c', sql]

        # Measure memory usage for base binary
        print(" Running base binary...")
        base_memory, base_code = measure(base_cmd)

        # Measure memory usage for optimized binary
        print(" Running optimized binary...")
        opt_memory, opt_code = measure(opt_cmd)

        # Calculate improvement
        improvement = calculate_improvement(base_memory, opt_memory, base_code, opt_code)
        results.append({
            'sql': sql,
            'base_memory': base_memory,
            'opt_memory': opt_memory,
            'base_code': base_code,
            'opt_code': opt_code,
            'improvement': improvement
        })

        base_status = "✓" if base_code == 0 else f"❌ (exit {base_code})"
        opt_status = "✓" if opt_code == 0 else f"❌ (exit {opt_code})"
        print(f" Base: {format_memory(base_memory)} {base_status}")
        print(f" Opt: {format_memory(opt_memory)} {opt_status}")
        print(f" Change: {improvement}\n")

    # Generate markdown table
    base_bin_name = os.path.basename(base_bin)
    opt_bin_name = os.path.basename(opt_bin)
    print("# Memory Usage Comparison Results\n")
    print(f"| SQL Query | {base_bin_name} Memory | {opt_bin_name} Memory | Improvement |")
    print("|-----------|" + "-" * (len(base_bin_name) + 7) + "|" + "-" * (len(opt_bin_name) + 7) + "|-------------|")
    for result in results:
        # Display full SQL query, escape pipe characters so they do not
        # break the table cell
        sql_display = result['sql'].replace('|', '\\|')
        base_mem_str = format_memory_with_status(result['base_memory'], result['base_code'])
        opt_mem_str = format_memory_with_status(result['opt_memory'], result['opt_code'])
        print(f"| {sql_display} | {base_mem_str} | {opt_mem_str} | {result['improvement']} |")

    # Summary statistics: only queries where both binaries succeeded count
    # towards the totals
    successful_results = [r for r in results if r['base_code'] == 0 and r['opt_code'] == 0]
    failed_count = len(results) - len(successful_results)
    if successful_results:
        total_base = sum(r['base_memory'] for r in successful_results)
        total_opt = sum(r['opt_memory'] for r in successful_results)
        overall_improvement = calculate_improvement(total_base, total_opt, 0, 0)
    else:
        total_base = total_opt = 0
        overall_improvement = "N/A (no successful runs)"

    print("\n## Summary")
    print(f"- Baseline binary: {base_bin_name}")
    print(f"- Optimized binary: {opt_bin_name}")
    print(f"- Total queries: {len(results)}")
    print(f"- Successful runs: {len(successful_results)}")
    if failed_count > 0:
        print(f"- Failed runs: {failed_count}")
    if successful_results:
        print(f"- Total base memory (successful): {format_memory(total_base)}")
        print(f"- Total optimized memory (successful): {format_memory(total_opt)}")
        print(f"- Overall improvement: {overall_improvement}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment