#!/usr/bin/env python3
|
"""
A Python script to benchmark the zstd command-line tool.

This script iterates through various zstd compression levels (1-19) and long-mode settings
to measure compression/decompression performance, effectiveness, and peak memory usage.

Input: A file path provided as a command-line argument.
Output: An SQLite database file containing the benchmark results.

Dependencies:
- The `zstd` command-line tool must be installed and available in the system's PATH.
- The `psutil` Python library: `pip install psutil`
- The `tqdm` Python library: `pip install tqdm`

Usage:
    python benchmark_zstd_mem.py /path/to/your/file.dat
    python benchmark_zstd_mem.py /path/to/your/file.dat -o custom_results.db
"""
|
|
|
import sys |
|
import subprocess |
|
import os |
|
import time |
|
import sqlite3 |
|
import argparse |
|
import shutil |
|
import random |
|
import itertools |
|
from typing import Tuple, List, Optional |
|
from threading import Thread |
|
from tqdm import tqdm |
|
|
|
# --- Configuration ---
# zstd's standard compression levels, 1 through 19 inclusive.
ZSTD_LEVELS = range(1, 20)
# Multiply a byte count by this factor to convert it to mebibytes.
BYTES_TO_MB = 1 / 2**20
|
|
|
# --- Dependency Checks ---
# psutil is a hard requirement (used for per-process memory polling in
# run_and_monitor_memory); fail fast with install instructions if absent.
try:
    import psutil
except ImportError:
    print("Error: `psutil` library not found.", file=sys.stderr)
    print("Please install it to run this script: `pip install psutil`", file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
def check_zstd_availability():
    """Exit with an error message unless the 'zstd' binary is on the PATH."""
    if shutil.which("zstd") is not None:
        return
    print("Error: 'zstd' command not found in your PATH.", file=sys.stderr)
    print("Please install zstd to run this benchmark.", file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
# --- Core Functions --- |
|
def setup_database(db_path: str) -> Tuple[sqlite3.Connection, sqlite3.Cursor]:
    """Open (or create) the results database with a fresh 'benchmarks' table.

    If the database file already holds a 'benchmarks' table, the user is asked
    whether to drop it; declining (the default) exits the program cleanly.

    Args:
        db_path: Path of the SQLite database file.

    Returns:
        An open (connection, cursor) pair, ready for inserts.
    """
    existed_before = os.path.exists(db_path)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    if existed_before:
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='benchmarks'"
        )
        has_table = cursor.fetchone() is not None
        # Re-prompt until the user gives a recognizable answer.
        while has_table:
            answer = input(
                f"Database '{db_path}' already contains a 'benchmarks' table. Overwrite it? [y/N]: "
            ).lower().strip()
            if answer == "y":
                cursor.execute("DROP TABLE benchmarks")
                break
            if answer in ("n", ""):
                print("Exiting without modifying the database.")
                conn.close()
                sys.exit(0)

    cursor.execute("""
        CREATE TABLE benchmarks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            level INTEGER NOT NULL,
            long_mode BOOLEAN NOT NULL,
            original_size_bytes INTEGER NOT NULL,
            compressed_size_bytes INTEGER NOT NULL,
            compression_ratio REAL NOT NULL,
            compression_time_sec REAL NOT NULL,
            decompression_time_sec REAL NOT NULL,
            compression_speed_mbps REAL NOT NULL,
            decompression_speed_mbps REAL NOT NULL,
            compression_peak_mem_mb REAL,
            decompression_peak_mem_mb REAL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    print(f"Database '{db_path}' is ready.")
    return conn, cursor
|
|
|
|
|
def run_and_monitor_memory(
    cmd: List[str],
) -> Tuple[float, Optional[int], Optional[bytes]]:
    """
    Runs a command, measures its execution time, and monitors its peak memory usage.

    Args:
        cmd: The command to execute, as an argv list (run without a shell).

    Returns:
        A tuple (execution_time_sec, peak_memory_bytes, stderr).
        stderr is None on success; on failure it carries the process's stderr
        output and peak_memory_bytes is None. peak_memory_bytes is 0 when the
        process exited before memory could be sampled.
    """
    peak_mem_bytes = [0]  # Use a list to be mutable inside the thread

    def monitor(process: psutil.Process):
        """Polls process memory usage until it terminates."""
        try:
            while process.is_running():
                try:
                    mem_info = process.memory_info()
                    # RSS (Resident Set Size) is a good proxy for memory usage
                    if mem_info.rss > peak_mem_bytes[0]:
                        peak_mem_bytes[0] = mem_info.rss
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    break  # Process ended before we could read memory
                time.sleep(0.01)  # Poll interval
        except Exception:
            # Broad exception to ensure thread doesn't die silently
            pass

    try:
        start_time = time.perf_counter()
        # Start the process without blocking
        proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    except FileNotFoundError:
        print(f"\nError: Command not found: {cmd[0]}", file=sys.stderr)
        return (0, None, b"Command not found")

    monitor_thread = None
    try:
        # Start the memory monitoring thread
        ps_proc = psutil.Process(proc.pid)
        monitor_thread = Thread(target=monitor, args=(ps_proc,))
        monitor_thread.start()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Process finished so fast that psutil couldn't attach. Timing is
        # still valid; peak memory stays 0. Fall through so the child is
        # still reaped and its exit status checked (the previous version
        # returned here, leaving a zombie and an unchecked return code).
        pass

    # Always wait for the process (reaps it) and collect its stderr.
    stderr_output = proc.communicate()[1]
    if monitor_thread is not None:
        monitor_thread.join()
    end_time = time.perf_counter()

    if proc.returncode != 0:
        print(f"\nError executing command: {' '.join(cmd)}", file=sys.stderr)
        print(f"Stderr: {stderr_output.decode()}", file=sys.stderr)
        return (end_time - start_time, None, stderr_output)

    return (end_time - start_time, peak_mem_bytes[0], None)
|
|
|
|
|
def run_benchmark(input_file: str, level: int, use_long: bool) -> Optional[dict]:
    """Runs a single compression/decompression cycle and returns the results.

    Args:
        input_file: Path of the file to benchmark.
        level: zstd compression level to use.
        use_long: Whether to pass `--long` to zstd.

    Returns:
        A dict of metrics (sizes, times, speeds, peak memory, ratio),
        or None if compression or decompression failed.
    """
    base_name = (
        f"{os.path.basename(input_file)}.{level}.{'long' if use_long else 'nolong'}"
    )
    compressed_file = f"{base_name}.zst"
    decompressed_file = f"{base_name}.decomp"

    results = {}
    original_size = os.path.getsize(input_file)
    results["original_size_bytes"] = original_size

    # The whole cycle runs inside try/finally so temp artifacts are removed on
    # EVERY exit path (the previous version leaked them on early failure returns).
    try:
        # --- Compression ---
        comp_cmd = ["zstd", f"-{level}", "-f", "-o", compressed_file, input_file]
        if use_long:
            comp_cmd.insert(1, "--long")

        comp_time, comp_mem, comp_err = run_and_monitor_memory(comp_cmd)
        if comp_err is not None:
            return None

        results["compression_time_sec"] = comp_time
        results["compression_peak_mem_mb"] = (
            comp_mem * BYTES_TO_MB if comp_mem is not None else None
        )

        try:
            results["compressed_size_bytes"] = os.path.getsize(compressed_file)
        except FileNotFoundError:
            print(
                f"\nError: Could not find '{compressed_file}'. Compression failed.",
                file=sys.stderr,
            )
            return None

        # --- Decompression ---
        decomp_cmd = ["zstd", "-d", "-f", "-o", decompressed_file, compressed_file]
        decomp_time, decomp_mem, decomp_err = run_and_monitor_memory(decomp_cmd)
        if decomp_err is not None:
            return None

        results["decompression_time_sec"] = decomp_time
        results["decompression_peak_mem_mb"] = (
            decomp_mem * BYTES_TO_MB if decomp_mem is not None else None
        )

        # --- Verification ---
        try:
            decompressed_size = os.path.getsize(decompressed_file)
        except FileNotFoundError:
            # Previously this exception escaped and aborted the whole suite;
            # treat it as a failed run instead.
            print(
                f"\nCRITICAL: Size mismatch! Original={original_size}, Decompressed=0",
                file=sys.stderr,
            )
            return None
        if original_size != decompressed_size:
            # Warn loudly but still record the run, matching prior behavior.
            print(
                f"\nCRITICAL: Size mismatch! Original={original_size}, Decompressed={decompressed_size}",
                file=sys.stderr,
            )

        # --- Derived metrics (guard divisions by zero for instant runs) ---
        if results["compression_time_sec"] > 0:
            results["compression_speed_mbps"] = (
                original_size * BYTES_TO_MB
            ) / results["compression_time_sec"]
        else:
            results["compression_speed_mbps"] = float("inf")

        if results["decompression_time_sec"] > 0:
            results["decompression_speed_mbps"] = (
                original_size * BYTES_TO_MB
            ) / results["decompression_time_sec"]
        else:
            results["decompression_speed_mbps"] = float("inf")

        if results["compressed_size_bytes"] > 0:
            results["compression_ratio"] = (
                original_size / results["compressed_size_bytes"]
            )
        else:
            results["compression_ratio"] = float("inf")

        return results

    finally:
        # Clean up temp files regardless of how we exited.
        if os.path.exists(compressed_file):
            os.remove(compressed_file)
        if os.path.exists(decompressed_file):
            os.remove(decompressed_file)
|
|
|
|
|
def main():
    """Main function to parse arguments and run the benchmark suite.

    Parses the CLI arguments, validates the input file, prepares the SQLite
    database, then runs every (level, long-mode) combination in random order,
    committing each result as it completes so partial runs are preserved.
    """
    parser = argparse.ArgumentParser(
        description="A Python script to benchmark zstd performance and memory usage.",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog="Requires `psutil` and `tqdm` libraries: `pip install psutil tqdm`",
    )
    parser.add_argument("input_file", help="The input file to use for benchmarking.")
    parser.add_argument(
        "-o",
        "--output_db",
        default="zstd_benchmark_mem.db",
        help="Path to the output SQLite database file (default: zstd_benchmark_mem.db).",
    )
    args = parser.parse_args()

    # Fail fast if the zstd binary is missing. (This call was previously
    # commented out even though the function is defined in this file.)
    check_zstd_availability()

    input_file = args.input_file
    if not os.path.isfile(input_file):
        print(f"Error: Input file not found at '{input_file}'", file=sys.stderr)
        sys.exit(1)

    conn, cursor = setup_database(args.output_db)

    # 1. Generate all benchmark configurations (level, use_long)
    long_modes = [False, True]
    benchmark_configs = list(itertools.product(ZSTD_LEVELS, long_modes))

    # 2. Randomize the order of execution to get more accurate time estimates
    random.shuffle(benchmark_configs)

    print(f"Starting {len(benchmark_configs)} benchmark runs in random order...")

    try:
        # 3. Use tqdm to create a progress bar over the shuffled configurations
        for level, use_long in tqdm(benchmark_configs, desc="Running Benchmarks"):
            results = run_benchmark(input_file, level, use_long)

            if results:
                cursor.execute(
                    """
                    INSERT INTO benchmarks (
                        level, long_mode, original_size_bytes, compressed_size_bytes,
                        compression_ratio, compression_time_sec, decompression_time_sec,
                        compression_speed_mbps, decompression_speed_mbps,
                        compression_peak_mem_mb, decompression_peak_mem_mb
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        level,
                        use_long,
                        results["original_size_bytes"],
                        results["compressed_size_bytes"],
                        results["compression_ratio"],
                        results["compression_time_sec"],
                        results["decompression_time_sec"],
                        results["compression_speed_mbps"],
                        results["decompression_speed_mbps"],
                        results["compression_peak_mem_mb"],
                        results["decompression_peak_mem_mb"],
                    ),
                )
                # Commit per-run so an interrupted benchmark keeps its results.
                conn.commit()

    except KeyboardInterrupt:
        print("\nBenchmark interrupted by user. Partial results are saved.")
    finally:
        conn.close()
        print("\nBenchmark finished.")
        print(f"Results have been saved to '{args.output_db}'")
|
|
|
|
|
# Run the benchmark suite only when executed as a script (not on import).
if __name__ == "__main__":
    main()