Last active
August 12, 2025 14:36
-
-
Save ArthurDelannoyazerty/ab5ebdff935e241a93dc8dbddb2de111 to your computer and use it in GitHub Desktop.
EASY TO USE. LONG FILE. EXAMPLE AT THE BOTTOM. A decorator that times functions, with advanced features: options to print to the console, to a default file, or to a custom file. Can dynamically group functions by a tag. Can display a complete timing summary in the console. Can create charts that compare individual functions or whole groups.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
``` | |
from . import timefunc, timing_manager | |
``` | |
This module provides a comprehensive and thread-safe toolkit for performance monitoring | |
in Python applications. It is designed to be both easy to use for quick debugging | |
and powerful enough for detailed performance analysis. | |
The module is built around two core components: | |
1. `@timefunc`: A versatile decorator that, when applied to any function, | |
measures its execution time automatically. It can be used with or without arguments. | |
2. `timing_manager`: A central, singleton-like instance of the `TimingManager` | |
class. This manager acts as the control plane for the entire system. It handles: | |
- **Configuration:** Programmatically enable or disable logging to the console | |
or files. | |
- **Data Aggregation:** Collects timing data from all decorated functions. | |
- **Dynamic Grouping:** Use the `with timing_manager.group_context()` block | |
to temporarily assign all function calls within it to a specific group, | |
overriding their default group. | |
- **Reporting:** Generate and display clean, aggregated statistical tables | |
in the console. | |
- **Visualization:** Create insightful charts (bar, box, pie) to visually | |
diagnose performance bottlenecks. | |
- **Data Export:** Save all collected metrics to a JSON file for archival | |
or further analysis in other tools. | |
Basic Usage: | |
```python | |
from your_module import timefunc, timing_manager | |
@timefunc(group="database_queries") | |
def fetch_user(user_id): | |
# Your database logic here... | |
time.sleep(0.1) | |
# Run your application logic | |
fetch_user(1) | |
fetch_user(2) | |
# At the end of your application run, display the results | |
timing_manager.display_stats() | |
# Or save a visual chart of the performance | |
timing_manager.create_chart("performance_report_bar.png", chart_type='bar', log_scale=True) | |
timing_manager.create_chart("performance_report_pie.png", chart_type='pie') | |
``` | |
""" | |
import json | |
import logging | |
import statistics | |
import threading | |
import time | |
from collections import defaultdict | |
from contextlib import contextmanager | |
from datetime import datetime | |
from functools import wraps | |
from pathlib import Path | |
from typing import Optional, Callable, TypeVar, ParamSpec, overload | |
try: | |
import matplotlib.pyplot as plt | |
except ImportError: | |
plt = None | |
try: | |
from tabulate import tabulate | |
except ImportError: | |
tabulate = None | |
logger = logging.getLogger(__name__) | |
__all__ = ['timefunc', 'timing_manager'] | |
class TimingManager: | |
_sentinel = object() # Used to detect if an argument was provided | |
# Constants for table formatting | |
GROUP_COL_WIDTH = 35 | |
FUNC_COL_WIDTH = 40 | |
"""A singleton-like manager for configuring, collecting, and reporting function execution times.""" | |
# ---------------------------------------------------------------------------- # | |
# Configuration # | |
# ---------------------------------------------------------------------------- # | |
def __init__(self): | |
"""Initializes the manager with default settings.""" | |
self._write_to_console = False | |
self._write_to_file = False | |
self._log_file_path: Optional[Path] = None | |
self.execution_stats = defaultdict(lambda: defaultdict(list)) | |
# thread safe stuff | |
self.context_data = threading.local() | |
self.data_lock = threading.Lock() | |
self._rebuild_log_handlers() | |
def configure( | |
self, | |
write_to_console: Optional[bool] = None, | |
log_file: Optional[str | Path] = _sentinel | |
): | |
""" | |
Configures the timing behavior for the application run. | |
This method is idempotent; it only changes settings that are explicitly provided. | |
Args: | |
write_to_console (Optional[bool]): If True, enables console logs. | |
If False, disables them. | |
If not provided, the current setting is unchanged. | |
log_file (Optional[str | Path]): Sets the path for the log file. | |
- Provide a string or Path to specify a file. | |
- Use 'default' for a timestamped log in ./logs/timing/. | |
- Provide `None` to explicitly disable file logging. | |
- If not provided, the current file log setting is unchanged. | |
""" | |
# Update internal state based on provided arguments | |
if write_to_console is not None: | |
self._write_to_console = write_to_console | |
if log_file is not self._sentinel: | |
if log_file: | |
self._write_to_file = True | |
if log_file == 'default': | |
# Use ISO format for better standard compliance and file name sorting | |
safe_timestamp = datetime.now().isoformat().replace(":", "-").replace(".", "_") | |
default_path = Path(f'./logs/timing/timing_{safe_timestamp}.log') | |
self._log_file_path = default_path | |
else: | |
self._log_file_path = Path(log_file) | |
# Safely create parent directories | |
self._log_file_path.parent.mkdir(parents=True, exist_ok=True) | |
else: # This means log_file was explicitly set to None | |
self._write_to_file = False | |
self._log_file_path = None | |
# Rebuild the logger handlers to apply the new configuration | |
self._rebuild_log_handlers() | |
def _rebuild_log_handlers(self): | |
""" | |
Private helper to set up logging handlers based on the current state. | |
This is the modern replacement for manual file I/O. | |
""" | |
# Clear any existing handlers to prevent duplicate logs on re-configuration | |
logger.handlers = [] | |
# Exit early if all logging is disabled | |
if not self._write_to_console and not self._write_to_file: | |
logger.setLevel(logging.CRITICAL + 1) # Effectively disable our logger | |
return | |
logger.setLevel(logging.INFO) | |
# Use a single formatter for consistent output | |
log_formatter = logging.Formatter('%(message)s') | |
if self._write_to_console: | |
stream_handler = logging.StreamHandler() | |
stream_handler.setFormatter(log_formatter) | |
logger.addHandler(stream_handler) | |
if self._write_to_file and self._log_file_path: | |
# The FileHandler manages opening/closing the file efficiently | |
file_handler = logging.FileHandler(self._log_file_path, mode='a', encoding='utf-8') | |
file_handler.setFormatter(log_formatter) | |
logger.addHandler(file_handler) | |
@contextmanager | |
def group_context(self, group_name: str): | |
"""A context manager to temporarily set the group for timed functions (`with` statement).""" | |
previous_group = getattr(self.context_data, 'active_group', None) | |
self.context_data.active_group = group_name | |
try: | |
yield | |
finally: | |
self.context_data.active_group = previous_group | |
# ---------------------------------------------------------------------------- # | |
# Stats # | |
# ---------------------------------------------------------------------------- # | |
def record_execution(self, group: str, func_name: str, exec_time: float): | |
"""Internal method to record a single function execution.""" | |
with self.data_lock: | |
self.execution_stats[group][func_name].append(exec_time) | |
def reset(self): | |
"""Clears all collected statistics.""" | |
with self.data_lock: | |
self.execution_stats.clear() | |
def get_stats(self) -> dict[str, dict[str, list[float]]]: | |
"""Returns the collected statistics as a dictionary.""" | |
return {group: dict(funcs) for group, funcs in self.execution_stats.items()} | |
def display_stats(self): | |
"""Prints a deterministic, formatted summary of all collected statistics.""" | |
if not self.execution_stats: | |
print("No statistics recorded.") | |
return | |
# Dispatch to the appropriate display method | |
if tabulate: | |
self._display_stats_tabulate() | |
else: | |
print("(Note: For a richer table view, run: pip install tabulate)") | |
self._display_stats_basic() | |
def _display_stats_basic(self): | |
"""The basic, dependency-free print format for statistics.""" | |
print("\n--- Execution Time Statistics ---") | |
if not self.execution_stats: | |
print("No statistics recorded.") | |
return | |
# Sort groups and functions for deterministic output | |
for group, funcs in sorted(self.execution_stats.items()): | |
all_group_times = [] | |
print(f"\n[Group: {group}]") | |
for func_name, times in sorted(funcs.items()): | |
count, total_time = len(times), sum(times) | |
mean_time = total_time / count | |
all_group_times.extend(times) | |
print(f" - Function: {func_name}") | |
print(f" - Runs: {count}, Total Time: {total_time:.4f}s, Mean Time: {mean_time:.4f}s") | |
group_count, group_total = len(all_group_times), sum(all_group_times) | |
group_mean = group_total / group_count | |
print("-" * 20) | |
print(f" > Group Summary: Runs: {group_count}, Total: {group_total:.4f}s, Mean: {group_mean:.4f}s") | |
print("\n" + "=" * 33 + "\n") | |
def _display_stats_tabulate(self): | |
"""The rich table format for statistics, requires `tabulate`.""" | |
print("\n--- Function Execution Statistics ---") | |
func_headers = [ | |
"Group", "Function", "Runs", "Total (s)", | |
"Mean (s)", "Median (s)", "Min (s)", "Max (s)" | |
] | |
func_data = [] | |
group_summary_data = {} | |
for group, funcs in sorted(self.execution_stats.items()): | |
all_group_times = [] | |
for func_name, times in sorted(funcs.items()): | |
if not times: | |
continue | |
all_group_times.extend(times) | |
func_data.append([ | |
group, | |
func_name, | |
len(times), | |
f"{sum(times):.4f}", | |
f"{statistics.mean(times):.4f}", | |
f"{statistics.median(times):.4f}", | |
f"{min(times):.4f}", | |
f"{max(times):.4f}", | |
]) | |
if all_group_times: | |
group_summary_data[group] = all_group_times | |
print(tabulate(func_data, headers=func_headers, tablefmt="fancy_grid")) | |
print("\n--- Group Summary Statistics ---") | |
group_headers = ["Group", "Total Runs", "Total Time (s)", "Mean Time (s)"] | |
group_data = [] | |
for group, times in sorted(group_summary_data.items()): | |
group_data.append([ | |
group, | |
len(times), | |
f"{sum(times):.4f}", | |
f"{statistics.mean(times):.4f}", | |
]) | |
print(tabulate(group_data, headers=group_headers, tablefmt="fancy_grid")) | |
def save_stats_to_json(self, filepath: str|Path): | |
"""Saves the collected statistics to a JSON file.""" | |
path = Path(filepath) | |
path.parent.mkdir(parents=True, exist_ok=True) | |
serializable_stats = {group: dict(funcs) for group, funcs in self.execution_stats.items()} | |
path.write_text(json.dumps(serializable_stats, indent=4)) | |
def create_chart( | |
self, | |
output_path: str | Path, | |
chart_type: str = 'bar', | |
log_scale: bool = False | |
): | |
""" | |
Generates and saves a performance chart based on collected data. | |
Args: | |
output_path: The file path to save the chart image (e.g., 'performance.png'). | |
chart_type: The type of chart to generate. | |
'bar' -> Compares mean execution time of each function. | |
'box' -> More details than bar. | |
'pie' -> Compares total time consumed by each group. | |
log_scale: If True, uses a logarithmic scale for the y-axis. Only used for chart_type='bar'. | |
""" | |
if plt is None: | |
print("\nError: Matplotlib is not installed. Cannot create charts.") | |
print("Please install it using: pip install matplotlib") | |
return | |
if not self.execution_stats: | |
print("No statistics recorded, cannot generate chart.") | |
return | |
path = Path(output_path) | |
path.parent.mkdir(parents=True, exist_ok=True) | |
fig, ax = plt.subplots(figsize=(10, 8), dpi=100) | |
if chart_type == 'bar': | |
# --- Bar Chart: Mean time per function --- | |
func_means = [] | |
for group, funcs in self.execution_stats.items(): | |
for func_name, times in funcs.items(): | |
mean_time = sum(times) / len(times) | |
# Use a clear, unique label for each function | |
func_means.append((f"({group}) {func_name}", mean_time)) | |
# Sort by time (slowest first) for a more readable chart | |
func_means.sort(key=lambda x: x[1]) | |
labels = [item[0] for item in func_means] | |
values = [item[1] for item in func_means] | |
ax.barh(labels, values, color='skyblue') | |
ax.set_xlabel('Mean Execution Time (seconds)') | |
ax.set_ylabel('(Group) Function') | |
ax.set_title('Mean Execution Time per Function') | |
ax.grid(axis='x', linestyle='--', alpha=0.7) | |
if log_scale: | |
ax.set_xscale('log') | |
elif chart_type == 'pie': | |
# --- Pie Chart: Total time per group --- | |
group_totals = defaultdict(float) | |
for group, funcs in self.execution_stats.items(): | |
for _, times in funcs.items(): | |
group_totals[group] += sum(times) | |
# Sort for consistent pie ordering | |
sorted_groups = sorted(group_totals.items(), key=lambda x: x[1], reverse=True) | |
labels = [item[0] for item in sorted_groups] | |
sizes = [item[1] for item in sorted_groups] | |
ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90, | |
wedgeprops={'edgecolor': 'white'}) | |
ax.axis('equal') # Ensures the pie chart is a circle. | |
ax.set_title('Percentage of Total Time per Group') | |
elif chart_type == 'box': | |
# Create a temporary list of tuples containing all data needed for sorting | |
plot_data_with_median = [] | |
for group, funcs in self.execution_stats.items(): | |
for func_name, times in funcs.items(): | |
if not times: | |
continue # Skip if a function was never run | |
# Calculate the median time for this function | |
median_time = statistics.median(times) | |
label = f"({group}) {func_name}" | |
plot_data_with_median.append((label, times, median_time)) | |
# Sort this list based on the median time (the 3rd element in the tuple) | |
plot_data_with_median.sort(key=lambda item: item[2], reverse=False) | |
sorted_labels = [item[0] for item in plot_data_with_median] | |
sorted_data_lists = [item[1] for item in plot_data_with_median] | |
ax.boxplot(sorted_data_lists, tick_labels=sorted_labels, vert=False, patch_artist=True) | |
ax.set_xlabel('Execution Time Distribution (seconds)') | |
ax.set_title('Function Performance Distribution (Sorted by Median)') | |
if log_scale: | |
ax.set_xscale('log') | |
else: | |
print(f"Error: Unknown chart type '{chart_type}'. Use 'bar', 'box' or 'pie'.") | |
return | |
plt.tight_layout() | |
try: | |
plt.savefig(path) | |
print(f"Chart successfully saved to '{path}'") | |
except Exception as e: | |
print(f"Error saving chart: {e}") | |
finally: | |
plt.close(fig) # Free up memory | |
# --- The single, shared instance for the entire application --- | |
timing_manager = TimingManager() | |
# ---------------------------------------------------------------------------- # | |
# Decorator # | |
# ---------------------------------------------------------------------------- # | |
# Help IDE to understand the decorator input and output | |
P = ParamSpec("P") | |
R = TypeVar("R") | |
# Also help IDE, just some overloads decorator to help when using the decorator without '()' after the '@timefunc' call | |
@overload
def timefunc(*, group: str = "Ungrouped") -> Callable[[Callable[P, R]], Callable[P, R]]:
    ...
@overload
def timefunc(_func: Callable[P, R]) -> Callable[P, R]:
    ...

# Implementation shared by both call forms.
def timefunc(_func: Callable[P, R] | None = None, *, group: str = "Ungrouped") -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
    """Decorator that measures and records a function's execution time.

    Supports both the bare form (`@timefunc`) and the parameterized form
    (`@timefunc(group="db")`).

    Args:
        _func: The decorated function when used without parentheses; None otherwise.
        group: Default statistics group for this function. A surrounding
            `timing_manager.group_context(...)` block overrides it at call time.

    Returns:
        The wrapped function, or a decorator that produces one.
    """
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Time the wrapped call with a monotonic high-resolution clock.
            started = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - started

            # A group activated via `group_context` takes precedence over the
            # decorator's own `group` argument.
            active_group = getattr(timing_manager.context_data, 'active_group', None)
            final_group = active_group or group

            # Store the measurement for later aggregation and reporting.
            timing_manager.record_execution(final_group, func.__name__, elapsed)

            # Emit one aligned log line (console and/or file, per configuration).
            when = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')[:-3]}]"
            group_label = f"Group: '{final_group}'"
            func_label = f"Function: '{func.__name__}'"
            logger.info(
                f"{when} "
                f"{group_label:<{TimingManager.GROUP_COL_WIDTH}} "
                f"{func_label:<{TimingManager.FUNC_COL_WIDTH}} "
                f"Execution Time: {elapsed:.5f}s"
            )
            return result
        return wrapper

    if _func is None:
        return decorator       # Used as @timefunc(...) with keyword arguments.
    return decorator(_func)    # Used as bare @timefunc.
# ---------------------------------------------------------------------------- # | |
# Example # | |
# ---------------------------------------------------------------------------- # | |
if __name__ == '__main__':
    # Demonstration script: exercises the decorator (bare and parameterized),
    # the group context manager, runtime re-configuration, and every
    # reporting/exporting feature. Running it writes a log file, four chart
    # images under ./charts/, and a JSON stats dump to the working directory.
    import random
    # 1. Define some functions to be timed
    @timefunc(group="database")
    def query_user_data():
        """Simulates a database query."""
        time.sleep(random.uniform(0.1, 0.2))
    @timefunc(group="api_call")
    def fetch_weather_data():
        """Simulates an external API call."""
        time.sleep(random.uniform(0.2, 0.4))
    @timefunc  # This will use the default 'Ungrouped' group
    def perform_calculation():
        """Simulates a CPU-bound task."""
        _ = [i*i for i in range(1_000_000)]
    # --- SCENARIO 1: Default Behavior (Log to Console) ---
    # No configure() call yet, so the manager's defaults apply.
    print("="*50)
    print("--- SCENARIO 1: Running with default settings (console output only) ---")
    print("="*50)
    query_user_data()
    perform_calculation()
    # --- SCENARIO 2: Using the Context Manager ---
    # Calls inside the `with` block are re-grouped dynamically.
    print("\n" + "="*50)
    print("--- SCENARIO 2: Using the 'group_context' manager ---")
    print("="*50)
    print("Running two functions inside the 'user_login_flow' context...")
    with timing_manager.group_context("user_login_flow"):
        # Notice how the group for these calls will be 'user_login_flow'
        query_user_data()
        fetch_weather_data()
    print("\nRunning the same function outside the context (reverts to its default group)...")
    query_user_data()
    # --- SCENARIO 3: Configure to Write to a File ---
    # Re-configuration takes effect immediately for subsequent calls.
    print("\n" + "="*50)
    print("--- SCENARIO 3: Configuring to log to a file (and disabling console) ---")
    print("="*50)
    timing_manager.configure(
        write_to_console=False,
        log_file="timing_decorator_example.log"
    )
    print("Console output is now OFF. Check 'timing_decorator_example.log' for the next two entries.")
    fetch_weather_data()
    perform_calculation()
    # --- FINAL STEP: Display and Save Aggregated Statistics ---
    # The stats include ALL function calls from every scenario above.
    timing_manager.display_stats()
    # Generate and save the performance charts (no-ops with a warning if
    # matplotlib is not installed).
    timing_manager.create_chart(
        output_path="charts/function_mean_times.png",
        chart_type='bar'
    )
    timing_manager.create_chart(
        output_path="charts/function_mean_times_log.png",
        chart_type='bar',
        log_scale=True
    )
    timing_manager.create_chart(
        output_path="charts/function_detail_times.png",
        chart_type='box'
    )
    timing_manager.create_chart(
        output_path="charts/group_total_times.png",
        chart_type='pie'
    )
    # Save the complete statistics to a JSON file for later analysis.
    timing_manager.save_stats_to_json("final_timing_stats.json")
    print("Complete statistics have been saved to 'final_timing_stats.json'")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment