@h4x3rotab · Created August 9, 2025
Containerd Unpack Benchmark - Python Version with CPU/IO Monitoring
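The script below pulls an image into a dedicated containerd namespace, then repeatedly mounts it with `ctr image mount` after wiping the namespace's snapshots, so each mount forces the snapshotter to unpack the layers from the content store. While each iteration runs, a background thread samples the CPU and disk I/O of containerd-related processes (containerd, runc, ctr, containerd-shim) roughly every 50ms and writes the raw samples to `performance_iter_*.json`.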
#!/usr/bin/env python3
"""
Containerd unpack benchmark script with CPU and I/O monitoring.
Requirements: containerd (ctr), taskset, /usr/bin/time, procps (ps, top, pgrep), Python 3.6+
Usage:
sudo python3 unpack_bench.py --image docker.io/library/ubuntu:22.04 --cpus 2 --iters 5
"""
import argparse
import subprocess
import time
import json
import re
import os
import sys
import threading
from typing import List, Dict, Optional, Tuple
class PerformanceMonitor:
def __init__(self, namespace: str, iteration: int):
self.namespace = namespace
self.iteration = iteration
self.monitoring = False
self.data = []
self.ctr_pid = None
def find_ctr_process(self) -> Optional[int]:
"""Find the PID of the ctr process."""
try:
# Try exact pattern first
result = subprocess.run(
["pgrep", "-f", f"ctr -n {self.namespace} image mount"],
capture_output=True, text=True, timeout=5
)
if result.stdout.strip():
pid = int(result.stdout.strip().split('\n')[0])
return pid
# Fallback: try broader pattern
result = subprocess.run(
["pgrep", "-f", f"ctr.*{self.namespace}.*image.*mount"],
capture_output=True, text=True, timeout=5
)
if result.stdout.strip():
pid = int(result.stdout.strip().split('\n')[0])
return pid
except (subprocess.TimeoutExpired, ValueError, IndexError):
pass
return None
    def get_instantaneous_cpu_usage(self, pid: int) -> float:
        """Get instantaneous CPU usage for a process, htop-style."""
        # A single read of /proc/<pid>/stat only yields cumulative CPU time;
        # a true instantaneous percentage needs two samples over an interval.
        # `top` performs that sampling internally, so delegate to it.
        if not pid:
            return 0.0
        return self.get_top_style_cpu(pid)
def get_top_style_cpu(self, pid: int) -> float:
"""Get CPU usage using top command (like htop)."""
try:
# Use top in batch mode for one iteration
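            # Caveat: with a single batch frame, some procps versions report
            # %CPU averaged over the process lifetime rather than a true
            # instantaneous sample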
result = subprocess.run(
["top", "-b", "-n1", "-p", str(pid)],
capture_output=True, text=True, timeout=3
)
if result.returncode == 0 and result.stdout:
lines = result.stdout.strip().split('\n')
for line in lines:
if str(pid) in line and not line.startswith('PID'):
# Parse top output
fields = line.split()
if len(fields) >= 9:
try:
cpu_percent = float(fields[8]) # %CPU column
return cpu_percent
except (ValueError, IndexError):
continue
return 0.0
except (subprocess.TimeoutExpired, FileNotFoundError):
return 0.0
def get_fast_cpu_usage(self, pid: int) -> float:
"""Get CPU usage using fast ps command."""
try:
# Use ps which is much faster than top
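            # Note: pcpu is the CPU average over the whole process lifetime,
            # not an instantaneous htop-style sample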
result = subprocess.run(
["ps", "-p", str(pid), "-o", "pcpu="],
capture_output=True, text=True, timeout=1
)
if result.returncode == 0 and result.stdout.strip():
return float(result.stdout.strip())
return 0.0
except (subprocess.TimeoutExpired, FileNotFoundError, ValueError):
return 0.0
def get_all_containerd_processes_cpu(self) -> float:
"""Get CPU usage of all containerd-related processes during unpack."""
try:
# Find all processes that might be involved in containerd operations
patterns = [
"containerd",
"runc",
"ctr",
"containerd-shim"
]
            total_cpu = 0.0
            seen_pids = set()  # "ctr" as a pattern also matches "containerd", so dedupe PIDs
            for pattern in patterns:
                try:
                    result = subprocess.run(
                        ["pgrep", "-f", pattern],
                        capture_output=True, text=True, timeout=3
                    )
                    if result.stdout.strip():
                        pids = [int(pid.strip()) for pid in result.stdout.strip().split('\n') if pid.strip()]
                        for pid in pids:
                            if pid in seen_pids:  # Only count each PID once
                                continue
                            seen_pids.add(pid)
                            cpu = self.get_instantaneous_cpu_usage(pid)
                            if cpu > 0.1:  # Only count processes with meaningful CPU usage
                                total_cpu += cpu
                except (subprocess.TimeoutExpired, ValueError):
                    continue
            return total_cpu
except Exception:
return 0.0
def get_process_tree_pids(self, root_pid: int) -> List[int]:
"""Get all PIDs in the process tree rooted at root_pid."""
pids = [root_pid]
try:
# Find all child processes recursively
result = subprocess.run(
["pgrep", "-P", str(root_pid)],
capture_output=True, text=True, timeout=2
)
if result.stdout.strip():
child_pids = [int(pid.strip()) for pid in result.stdout.strip().split('\n')]
for child_pid in child_pids:
pids.extend(self.get_process_tree_pids(child_pid))
except (subprocess.TimeoutExpired, ValueError):
pass
return pids
def get_process_tree_cpu(self, root_pid: int) -> float:
"""Get CPU usage for entire process tree using pstree + ps approach."""
if not root_pid:
return 0.0
try:
# Get all PIDs in the process tree using pstree
result = subprocess.run(
["pstree", "-p", str(root_pid)],
capture_output=True, text=True, timeout=3
)
if result.returncode != 0:
return 0.0
            # Extract PIDs from pstree output, e.g. "ctr(1234)---tar(1235)" -> [1234, 1235]
            pids = [int(pid) for pid in re.findall(r'\((\d+)\)', result.stdout)]
# Get CPU usage for all PIDs in the tree
total_cpu = 0.0
for pid in pids:
try:
ps_result = subprocess.run(
["ps", "-p", str(pid), "-o", "%cpu="],
capture_output=True, text=True, timeout=2
)
if ps_result.returncode == 0 and ps_result.stdout.strip():
cpu = float(ps_result.stdout.strip())
total_cpu += cpu
except (subprocess.TimeoutExpired, ValueError):
continue
return total_cpu
        except (subprocess.TimeoutExpired, FileNotFoundError):
            # pstree unavailable or timed out; fall back to single-process monitoring
            return self.get_single_process_cpu(root_pid)
def get_single_process_cpu(self, pid: int) -> float:
"""Fallback: Get CPU usage for single process only."""
try:
result = subprocess.run(
["ps", "-p", str(pid), "-o", "%cpu="],
capture_output=True, text=True, timeout=2
)
return float(result.stdout.strip()) if result.stdout.strip() else 0.0
except (subprocess.TimeoutExpired, ValueError):
return 0.0
def get_system_cpu_usage(self) -> float:
"""Get total system CPU usage from all processes."""
try:
result = subprocess.run(
["ps", "aux"], capture_output=True, text=True, timeout=5
)
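            # ps reports each process's lifetime-average %CPU, so this sum is a
            # smoothed load figure rather than an instantaneous reading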
total_cpu = 0.0
for line in result.stdout.split('\n')[1:]: # Skip header
fields = line.split()
if len(fields) >= 11: # Ensure we have enough fields
try:
total_cpu += float(fields[2]) # %CPU is field 3 (0-indexed: 2)
except ValueError:
continue
return total_cpu
except (subprocess.TimeoutExpired, ValueError):
return 0.0
def get_process_io(self, pid: int) -> Tuple[int, int]:
"""Get I/O stats for a process from /proc/PID/io."""
try:
with open(f"/proc/{pid}/io", "r") as f:
content = f.read()
read_bytes = write_bytes = 0
for line in content.split('\n'):
if line.startswith('read_bytes:'):
read_bytes = int(line.split()[1])
elif line.startswith('write_bytes:'):
write_bytes = int(line.split()[1])
return read_bytes, write_bytes
except (FileNotFoundError, ValueError, PermissionError):
return 0, 0
def get_all_containerd_processes_io(self) -> Tuple[int, int]:
"""Get I/O stats for all containerd processes."""
try:
# Find all processes that might be involved in containerd operations
patterns = [
"containerd",
"runc",
"ctr",
"containerd-shim"
]
total_read = total_write = 0
seen_pids = set() # Avoid duplicate counting
for pattern in patterns:
try:
result = subprocess.run(
["pgrep", "-f", pattern],
capture_output=True, text=True, timeout=3
)
if result.stdout.strip():
pids = [int(pid.strip()) for pid in result.stdout.strip().split('\n') if pid.strip()]
for pid in pids:
if pid not in seen_pids: # Only count each PID once
seen_pids.add(pid)
read_bytes, write_bytes = self.get_process_io(pid)
total_read += read_bytes
total_write += write_bytes
except (subprocess.TimeoutExpired, ValueError):
continue
return total_read, total_write
except Exception:
return 0, 0
def get_system_io(self) -> Tuple[int, int]:
"""Get system I/O stats from /proc/diskstats."""
try:
with open("/proc/diskstats", "r") as f:
content = f.read()
total_read_sectors = total_write_sectors = 0
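            # Every diskstats row is summed, so partitions and their parent
            # devices are both counted; treat these totals as a relative signal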
for line in content.split('\n'):
if line.strip():
fields = line.split()
if len(fields) >= 14: # Ensure complete diskstats entry
try:
total_read_sectors += int(fields[5]) # sectors read
total_write_sectors += int(fields[9]) # sectors written
except ValueError:
continue
            # /proc/diskstats always counts in 512-byte sectors regardless of
            # the device's physical sector size; convert to KB
            return (total_read_sectors * 512 // 1024,
                    total_write_sectors * 512 // 1024)
except (FileNotFoundError, ValueError):
return 0, 0
def monitor_performance(self):
"""Monitor performance in a separate thread."""
print(f" Monitor thread started for namespace {self.namespace}")
        # Wait for the ctr process to start: up to 10 seconds, checking every 50ms
        for i in range(200):
            self.ctr_pid = self.find_ctr_process()
            if self.ctr_pid:
                print(f" Found ctr PID {self.ctr_pid} after {i*0.05:.2f}s")
                break
            if not self.monitoring:
                break  # Benchmark already finished; stop waiting
            time.sleep(0.05)
        if not self.ctr_pid:
            print(" WARNING: Never found ctr process")
sample_count = 0
print(f" Starting monitoring loop, monitoring={self.monitoring}")
while self.monitoring:
try:
timestamp = time.time()
# Get CPU stats - monitor all containerd-related processes
ctr_cpu = self.get_all_containerd_processes_cpu()
sys_cpu = self.get_system_cpu_usage()
# Get I/O stats - monitor all containerd processes
ctr_read, ctr_write = self.get_all_containerd_processes_io()
sys_read_kb, sys_write_kb = self.get_system_io()
# Store data
self.data.append({
'timestamp': timestamp,
'ctr_pid': self.ctr_pid,
'ctr_cpu_percent': ctr_cpu,
'system_cpu_percent': sys_cpu,
'ctr_read_bytes': ctr_read,
'ctr_write_bytes': ctr_write,
'system_read_kb': sys_read_kb,
'system_write_kb': sys_write_kb
})
sample_count += 1
                time.sleep(0.05)  # 50ms between samples (actual interval is longer due to ps/top overhead)
except Exception as e:
print(f" Error in monitoring loop: {e}")
break
print(f" Monitoring stopped after {sample_count} samples")
def start_monitoring(self):
"""Start performance monitoring in background."""
self.monitoring = True
self.monitor_thread = threading.Thread(target=self.monitor_performance)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def stop_monitoring(self):
"""Stop performance monitoring."""
print(f" stop_monitoring() called, monitoring={self.monitoring}")
self.monitoring = False
if hasattr(self, 'monitor_thread'):
self.monitor_thread.join(timeout=1)
def save_data(self, filename: str):
"""Save monitoring data to JSON file."""
with open(filename, 'w') as f:
json.dump(self.data, f, indent=2)
def get_stats(self) -> Dict:
"""Calculate statistics from collected data."""
if not self.data:
return {"error": "No data collected"}
ctr_cpu_values = [d['ctr_cpu_percent'] for d in self.data]
sys_cpu_values = [d['system_cpu_percent'] for d in self.data]
# Calculate I/O rates (bytes/second and KB/s)
io_rates = []
for i in range(1, len(self.data)):
prev = self.data[i-1]
curr = self.data[i]
time_diff = curr['timestamp'] - prev['timestamp']
if time_diff > 0:
ctr_read_rate = (curr['ctr_read_bytes'] - prev['ctr_read_bytes']) / time_diff / 1024
ctr_write_rate = (curr['ctr_write_bytes'] - prev['ctr_write_bytes']) / time_diff / 1024
sys_read_rate = (curr['system_read_kb'] - prev['system_read_kb']) / time_diff
sys_write_rate = (curr['system_write_kb'] - prev['system_write_kb']) / time_diff
io_rates.append({
'ctr_read_kb_per_s': max(0, ctr_read_rate),
'ctr_write_kb_per_s': max(0, ctr_write_rate),
'sys_read_kb_per_s': max(0, sys_read_rate),
'sys_write_kb_per_s': max(0, sys_write_rate)
})
stats = {
'pid': self.ctr_pid,
'samples': len(self.data),
'duration_seconds': self.data[-1]['timestamp'] - self.data[0]['timestamp'] if self.data else 0,
'cpu_stats': {
'ctr_process': {
'min': min(ctr_cpu_values),
'max': max(ctr_cpu_values),
'avg': sum(ctr_cpu_values) / len(ctr_cpu_values)
},
'system_total': {
'min': min(sys_cpu_values),
'max': max(sys_cpu_values),
'avg': sum(sys_cpu_values) / len(sys_cpu_values)
}
}
}
# Add I/O statistics - if we have rates, show rates; otherwise show raw values
if io_rates:
ctr_read_rates = [r['ctr_read_kb_per_s'] for r in io_rates]
ctr_write_rates = [r['ctr_write_kb_per_s'] for r in io_rates]
sys_read_rates = [r['sys_read_kb_per_s'] for r in io_rates]
sys_write_rates = [r['sys_write_kb_per_s'] for r in io_rates]
stats['io_stats'] = {
'ctr_process': {
'read_kb_per_s': {
'min': min(ctr_read_rates),
'max': max(ctr_read_rates),
'avg': sum(ctr_read_rates) / len(ctr_read_rates)
},
'write_kb_per_s': {
'min': min(ctr_write_rates),
'max': max(ctr_write_rates),
'avg': sum(ctr_write_rates) / len(ctr_write_rates)
}
},
'system_total': {
'read_kb_per_s': {
'min': min(sys_read_rates),
'max': max(sys_read_rates),
'avg': sum(sys_read_rates) / len(sys_read_rates)
},
'write_kb_per_s': {
'min': min(sys_write_rates),
'max': max(sys_write_rates),
'avg': sum(sys_write_rates) / len(sys_write_rates)
}
}
}
else:
# With only 1 sample, show raw I/O values instead of rates
if self.data:
last_sample = self.data[-1]
stats['io_stats'] = {
'ctr_process': {
'total_read_mb': last_sample['ctr_read_bytes'] / (1024 * 1024),
'total_write_mb': last_sample['ctr_write_bytes'] / (1024 * 1024)
},
'system_total': {
'total_read_kb': last_sample['system_read_kb'],
'total_write_kb': last_sample['system_write_kb']
}
}
return stats
class ContainerdBenchmark:
def __init__(self):
self.results = []
def check_dependencies(self):
"""Check if required tools are available."""
deps = ['ctr', 'taskset']
missing = []
for dep in deps:
result = subprocess.run(['which', dep], capture_output=True)
if result.returncode != 0:
missing.append(dep)
if not os.path.exists('/usr/bin/time'):
missing.append('/usr/bin/time')
if missing:
print(f"Missing dependencies: {', '.join(missing)}")
sys.exit(1)
# Check for root privileges
if os.geteuid() != 0:
print("Error: This script must be run with sudo or as root")
print(f"Usage: sudo python3 {sys.argv[0]} [options]")
sys.exit(1)
def create_namespace(self, namespace: str):
"""Create containerd namespace (idempotent)."""
subprocess.run(['ctr', 'ns', 'create', namespace],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def pull_image(self, namespace: str, image: str):
"""Pull image into containerd content store."""
print("== Pulling image into content store (no unpack yet) ==")
result = subprocess.run(['ctr', '-n', namespace, 'images', 'pull', image])
if result.returncode != 0:
print(f"Failed to pull image {image}")
sys.exit(1)
    def cleanup_snapshots(self, namespace: str):
        """Clean up existing snapshots in namespace."""
        # A snapshot cannot be removed before its children, so make repeated
        # passes until the namespace is empty or no further progress is made.
        while True:
            result = subprocess.run(
                ['ctr', '-n', namespace, 'snapshots', 'ls'],
                capture_output=True, text=True
            )
            if result.returncode != 0 or not result.stdout.strip():
                return
            lines = [l for l in result.stdout.strip().split('\n')[1:] if l.strip()]  # Skip header
            if not lines:
                return
            removed = 0
            for line in lines:
                snapshot_name = line.split()[0]
                rm = subprocess.run(
                    ['ctr', '-n', namespace, 'snapshots', 'rm', snapshot_name],
                    stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
                )
                if rm.returncode == 0:
                    removed += 1
            if removed == 0:
                return  # No snapshot could be removed; avoid looping forever
def run_benchmark_iteration(self, namespace: str, image: str, cpus: int,
iteration: int, drop_caches: bool) -> Tuple[float, Dict]:
"""Run a single benchmark iteration."""
print(f"--- Iter {iteration} ---")
# Clean up snapshots
self.cleanup_snapshots(namespace)
# Drop caches if requested
if drop_caches:
subprocess.run(['sync'])
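            # Writing '3' frees the page cache plus dentries and inodes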
with open('/proc/sys/vm/drop_caches', 'w') as f:
f.write('3')
# Build command
mount_target = f"/tmp/containerd_mount_{iteration}"
os.makedirs(mount_target, exist_ok=True)
cmd = ['/usr/bin/time', '-f', '%e']
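        # GNU time prints the elapsed seconds ("%e") to stderr, parsed below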
if cpus > 0:
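            # Pin the command to CPUs 0..N-1 to control unpack parallelism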
cpu_list = ','.join(str(i) for i in range(cpus))
cmd.extend(['taskset', '-c', cpu_list])
cmd.extend(['ctr', '-n', namespace, 'image', 'mount', image, mount_target])
# Start performance monitoring JUST before running the command
monitor = PerformanceMonitor(namespace, iteration)
monitor.start_monitoring()
# Give monitoring thread a moment to initialize
time.sleep(0.1)
# Run the benchmark
start_time = time.time()
result = subprocess.run(cmd, capture_output=True, text=True)
end_time = time.time()
# Keep monitoring for additional time to capture post-operation activity
time.sleep(1.0)
# Stop monitoring
monitor.stop_monitoring()
# Clean up mount
subprocess.run(['ctr', '-n', namespace, 'image', 'unmount', mount_target],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
try:
os.rmdir(mount_target)
except OSError:
pass
# Get elapsed time
if result.returncode == 0:
            try:
                # Take the last stderr line in case ctr printed warnings there too
                elapsed_time = float(result.stderr.strip().split('\n')[-1])
            except (ValueError, IndexError):
                elapsed_time = end_time - start_time
else:
print(f"Benchmark failed with return code {result.returncode}")
print(f"stderr: {result.stderr}")
elapsed_time = -1
# Save monitoring data
perf_filename = f"performance_iter_{iteration}.json"
monitor.save_data(perf_filename)
# Get performance stats
perf_stats = monitor.get_stats()
print(f"elapsed_s={elapsed_time:.2f}")
return elapsed_time, perf_stats
def run_benchmark(self, image: str, cpus: int, iterations: int,
namespace: str, drop_caches: bool):
"""Run the complete benchmark."""
print("== Settings ==")
print(f"Image: {image}")
print(f"Namespace: {namespace}")
if cpus > 0:
cpu_list = ','.join(str(i) for i in range(cpus))
print(f"CPU cores: {cpu_list} (count={cpus})")
else:
print("CPU cores: unrestricted")
print(f"Iterations: {iterations}")
print(f"Drop caches: {drop_caches}")
print()
# Setup
self.create_namespace(namespace)
self.pull_image(namespace, image)
# Run iterations
times = []
all_stats = []
for i in range(1, iterations + 1):
elapsed_time, perf_stats = self.run_benchmark_iteration(
namespace, image, cpus, i, drop_caches
)
if elapsed_time > 0:
times.append(elapsed_time)
all_stats.append(perf_stats)
# Calculate average
if times:
avg_time = sum(times) / len(times)
else:
avg_time = 0
# Display results
print()
print("== Results ==")
for i, t in enumerate(times, 1):
print(f"iter_{i:02d}: {t:.2f}s")
print(f"average: {avg_time:.3f}s")
print()
print("== Performance Statistics ==")
for i, stats in enumerate(all_stats, 1):
if 'error' in stats:
print(f"Iter {i}: {stats['error']}")
else:
pid = stats.get('pid', 'N/A')
samples = stats.get('samples', 0)
duration = stats.get('duration_seconds', 0)
cpu_stats = stats.get('cpu_stats', {})
ctr_cpu = cpu_stats.get('ctr_process', {})
sys_cpu = cpu_stats.get('system_total', {})
print(f"Iter {i}: PID={pid} | {samples} samples over {duration:.1f}s")
print(f" CPU: ctr={ctr_cpu.get('avg', 0):.1f}% sys={sys_cpu.get('avg', 0):.1f}%")
io_stats = stats.get('io_stats')
if io_stats:
ctr_io = io_stats.get('ctr_process', {})
sys_io = io_stats.get('system_total', {})
# Check if we have rate data or total data
if 'read_kb_per_s' in ctr_io:
# Rate-based data (multiple samples)
ctr_read_avg = ctr_io.get('read_kb_per_s', {}).get('avg', 0)
ctr_write_avg = ctr_io.get('write_kb_per_s', {}).get('avg', 0)
sys_read_avg = sys_io.get('read_kb_per_s', {}).get('avg', 0)
sys_write_avg = sys_io.get('write_kb_per_s', {}).get('avg', 0)
print(f" I/O: ctr_read={ctr_read_avg:.1f}KB/s ctr_write={ctr_write_avg:.1f}KB/s")
print(f" sys_read={sys_read_avg:.1f}KB/s sys_write={sys_write_avg:.1f}KB/s")
else:
# Total-based data (single sample)
ctr_read_mb = ctr_io.get('total_read_mb', 0)
ctr_write_mb = ctr_io.get('total_write_mb', 0)
sys_read_kb = sys_io.get('total_read_kb', 0)
sys_write_kb = sys_io.get('total_write_kb', 0)
print(f" I/O: ctr_read={ctr_read_mb:.1f}MB ctr_write={ctr_write_mb:.1f}MB")
print(f" sys_read={sys_read_kb/1024:.1f}MB sys_write={sys_write_kb/1024:.1f}MB")
print()
print("Performance data saved to: performance_iter_*.json")
def main():
parser = argparse.ArgumentParser(
description="Containerd unpack benchmark with CPU and I/O monitoring"
)
parser.add_argument('--image', default='docker.io/library/alpine:latest',
help='Container image to benchmark (default: alpine:latest)')
parser.add_argument('--cpus', type=int, default=0,
help='Number of CPU cores to use (0 = unrestricted, default: 0)')
parser.add_argument('--iters', type=int, default=3,
help='Number of iterations (default: 3)')
parser.add_argument('--ns', default='unpack-bench',
help='Containerd namespace (default: unpack-bench)')
parser.add_argument('--drop-caches', action='store_true',
help='Drop Linux FS caches before each run (requires root)')
args = parser.parse_args()
benchmark = ContainerdBenchmark()
benchmark.check_dependencies()
benchmark.run_benchmark(
image=args.image,
cpus=args.cpus,
iterations=args.iters,
namespace=args.ns,
drop_caches=args.drop_caches
)
if __name__ == "__main__":
main()