Created
April 3, 2025 17:20
-
-
Save Jeremiah-England/b0c7bb0f1774f86b1124870ffdb9da7f to your computer and use it in GitHub Desktop.
Plots the token count of a Git repository's tracked files over time.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Analyzes the token count (using ttok) of the file types configured in
FILE_PATTERNS over the history of a Git repository branch (default: origin/master).
Plots the token count over time and adds a linear regression trend line
for the most recent period.
Requires: Python 3.10+, git, ttok, matplotlib, numpy.
Usage: Run from the root directory of the target Git repository.
    python plot_repo_tokens.py
"""
import datetime | |
import shutil | |
import statistics # Requires Python 3.10+ | |
import subprocess | |
import sys | |
import time # For progress timing | |
import numpy as np | |
# --- Prerequisite Check ---
# Fail fast with an install hint if matplotlib is missing; plotting is the
# whole point of the script, so there is nothing useful to do without it.
try:
    import matplotlib.dates as mdates
    import matplotlib.pyplot as plt
except ImportError:
    print("Error: matplotlib is required.")
    print("Please install it using: pip install matplotlib numpy")
    sys.exit(1)
# statistics.linear_regression (used for the trend line) was added in 3.10,
# and the script also relies on 3.10-only syntax elsewhere (zip strict=).
if sys.version_info < (3, 10):
    print("Error: This script requires Python 3.10+ for statistics.linear_regression.")
    print(f"You are using Python {sys.version_info.major}.{sys.version_info.minor}")
    sys.exit(1)
# --- Configuration ---
REPO_PATH = "."  # Analyze the repo in the current directory
BRANCH = "origin/master"  # Analyze the history of this branch
DAYS_BACK = 365 * 2  # Go back 2 years
DATE_STEP_DAYS = 10  # Sample every 10 days
# File patterns for analysis.
# NOTE(review): "**/*.tsc" looks like a typo — ".tsc" is not a common extension
# and "**/*.ts" is already listed; confirm whether .tsc files are intended.
FILE_PATTERNS = ["**/*.ts", "**/*.tsc", "**/*.tsx", "**/*.md", "**/*.js"]
# --- Regression Configuration ---
REGRESSION_MONTHS = 3  # Calculate regression over the last N months of data
MIN_POINTS_FOR_REGRESSION = 5  # Minimum data points needed for regression calculation
# --- Helper Functions --- | |
def run_command(cmd: list[str], check: bool = True, capture_output: bool = True, text: bool = True, **kwargs) -> subprocess.CompletedProcess | None:
    """Run *cmd* via subprocess inside REPO_PATH and return the completed process.

    Args:
        cmd: Command and arguments in list form (never a shell string).
        check: If True, print diagnostics and raise CalledProcessError on a
            non-zero exit code. If False, the CompletedProcess is returned
            regardless of the exit code and the caller inspects it.
        capture_output: Forwarded to subprocess.run.
        text: If True, stdout/stderr (and any ``input``) are str; else bytes.
        **kwargs: Extra keyword arguments forwarded to subprocess.run
            (e.g. ``input``).

    Returns:
        The CompletedProcess, or None if an unexpected error occurred.
        Exits the program if the executable itself is not found.
    """
    # Normalize the optional ``input`` kwarg so its type matches ``text``.
    # errors="replace" guarantees the conversion itself cannot raise.
    if "input" in kwargs:
        if text and isinstance(kwargs["input"], bytes):
            kwargs["input"] = kwargs["input"].decode("utf-8", errors="replace")
        elif not text and isinstance(kwargs["input"], str):
            kwargs["input"] = kwargs["input"].encode("utf-8", errors="replace")
    try:
        # Always run with check=False so we control the failure path below.
        process = subprocess.run(cmd, check=False, capture_output=capture_output, text=text, cwd=REPO_PATH, **kwargs)
    except FileNotFoundError:
        print(f"\nError: Command '{cmd[0]}' not found. Make sure it's installed and in your PATH.")
        sys.exit(1)  # Exit if essential commands are missing
    except Exception as e:
        print(f"\nAn unexpected error occurred while running {' '.join(cmd)}: {e}")
        return None
    # Manual failure check: print diagnostics and raise only when the caller
    # asked for check=True. (The original caught its own CalledProcessError
    # and had a dead ``return e`` branch that violated the return annotation.)
    if check and process.returncode != 0:
        print(f"\nError running command: {' '.join(cmd)}")
        print(f"Return code: {process.returncode}")
        if process.stdout:
            print(f"Output:\n{process.stdout}")
        if process.stderr:
            print(f"Error output:\n{process.stderr}")
        raise subprocess.CalledProcessError(process.returncode, cmd, output=process.stdout, stderr=process.stderr)
    return process
def check_prerequisites():
    """Verify git, ttok, and numpy are available; exit with a hint otherwise."""
    print("Checking prerequisites...")
    # External executables, checked in order, each with its own error text.
    required_tools = (
        ("git", ("Error: 'git' command not found. Please install Git.",)),
        ("ttok", ("Error: 'ttok' command not found.", "Please install it using: pip install ttok")),
    )
    for tool, messages in required_tools:
        if shutil.which(tool):
            continue
        for message in messages:
            print(message)
        sys.exit(1)
    # numpy is imported lazily here purely as an availability probe.
    try:
        import numpy  # noqa: F401
    except ImportError:
        print("Error: 'numpy' package not found.")
        print("Please install it using: pip install matplotlib numpy")
        sys.exit(1)
    print("Prerequisites met.")
def check_repo_state():
    """Warn (without aborting) when the working directory has uncommitted changes."""
    print("Checking repository status...")
    status = run_command(["git", "status", "--porcelain"], check=False)  # tolerate git failure
    if status is None or status.returncode != 0:
        print("Warning: Could not reliably check repository status (git status failed). Proceeding cautiously.")
        return
    dirty_entries = status.stdout.strip()
    if not dirty_entries:
        print("Repository is clean.")
        return
    print("\nWarning: Your Git working directory is not clean.")
    print("  This script reads historical data but running with uncommitted changes is not recommended.")
    print("Output of 'git status --porcelain':")
    print(dirty_entries)
    # Add sys.exit(1) here if a clean state should be mandatory
def get_commit_hash_for_date(target_date: datetime.datetime) -> str | None:
    """Find the latest commit hash on the specified branch on or before the target date.

    Returns None when no suitable commit exists (e.g. the date predates the
    branch's first commit) or when the branch is invalid.
    """
    date_str = target_date.isoformat()
    # `git rev-list -n 1 --before=<date> <branch>` prints the newest commit
    # dated on or before <date>, or nothing at all.
    cmd = ["git", "rev-list", "-n", "1", f"--before={date_str}", BRANCH]
    result = run_command(cmd, check=False)  # Check failure manually
    if result and result.returncode == 0 and result.stdout.strip():
        return result.stdout.strip()
    if result and result.returncode != 0 and "unknown revision or path" in result.stderr.lower():
        print(f"\nWarning: Branch '{BRANCH}' not found or invalid.")
        # Attempt to find default branch (e.g., main or master) could be added here
        return None  # Cannot proceed without a valid branch
    # Fallback for dates before the first commit (more costly check).
    # NOTE(review): every path from here on returns None, so this fallback only
    # determines which warnings get printed — it never changes the result.
    first_commit_cmd = ["git", "rev-list", "--max-parents=0", "--pretty=format:%cI", BRANCH]
    first_commit_result = run_command(first_commit_cmd, check=False)
    if first_commit_result and first_commit_result.returncode == 0 and first_commit_result.stdout.strip():
        try:
            # Output interleaves "commit <hash>" and "<ISO date>" lines; this
            # assumes the date is the final line of the output.
            lines = first_commit_result.stdout.strip().splitlines()
            first_commit_date_str = lines[-1]  # Date is usually last line of format
            # Parse date, handling timezones.
            # NOTE(review): ISO dates always contain "-", so the second clause
            # effectively reduces to endswith("Z"); the heuristic is "has an
            # explicit offset or Z suffix" — confirm this covers %cI output.
            if "+" in first_commit_date_str or (first_commit_date_str.endswith("Z") and "-" in first_commit_date_str):
                first_commit_dt = datetime.datetime.fromisoformat(first_commit_date_str.replace("Z", "+00:00"))
                # Ensure target_date is comparable (make it aware if naive)
                if target_date.tzinfo is None:
                    target_date = target_date.replace(tzinfo=datetime.timezone.utc)
            else:  # Assume naive UTC if no timezone info
                first_commit_dt = datetime.datetime.fromisoformat(first_commit_date_str)
                if target_date.tzinfo is not None:
                    target_date = target_date.astimezone(datetime.timezone.utc).replace(tzinfo=None)
            if target_date < first_commit_dt:
                # This date is before the repository's history began on this branch
                return None  # Signal no relevant commit exists
        except Exception as e:
            print(f"\nWarning: Could not parse first commit date. {e}")
            # Proceed, maybe rev-list just failed for other reasons
    # If fallback didn't work or date wasn't before first commit, log warning
    # print(f"\nWarning: No commit found on '{BRANCH}' for date {target_date.date()}.")
    return None  # No suitable commit found
def get_token_count_for_commit(commit_hash: str) -> int | None:
    """Get the ttok count for files matching FILE_PATTERNS at a given commit hash.

    Returns:
        The token count; 0 when the commit has no matching/readable files or
        its tree is inaccessible; None when a command fails in a way that
        makes the count unreliable.
    """
    # 1. List ALL files recursively at that commit.
    ls_result = run_command(["git", "ls-tree", "-r", "--name-only", commit_hash], check=False)
    if ls_result is None:
        return None  # Error running command
    if ls_result.returncode != 0:
        return 0  # Treat as 0 if tree is inaccessible/invalid
    if not ls_result.stdout.strip():
        return 0  # Commit has no files
    all_files = ls_result.stdout.strip().split("\n")

    # 2. Filter the file list using configured patterns.
    # Patterns like "**/*.ext" are reduced to a case-insensitive
    # endswith(".ext") check.
    try:
        target_extensions = tuple("." + p.split(".")[-1].lower() for p in FILE_PATTERNS if "*." in p)
        if not target_extensions:
            print(f"\nWarning: Could not extract target extensions from FILE_PATTERNS: {FILE_PATTERNS}. Check format.")
            return 0
    except Exception as e:
        print(f"\nError processing FILE_PATTERNS: {e}. Patterns: {FILE_PATTERNS}")
        return None  # Pattern error is critical
    matching_files = [f for f in all_files if f.lower().endswith(target_extensions)]
    if not matching_files:
        return 0  # No files match the patterns in this commit

    # 3. Get content of matching files and concatenate.
    # Fix: accumulate parts and join once — repeated ``+=`` on a str is
    # quadratic on repositories with many/large files.
    content_parts: list[str] = []
    for file_path in matching_files:
        show_result = run_command(["git", "show", f"{commit_hash}:{file_path}"], check=False, text=False)  # raw bytes
        if show_result and show_result.returncode == 0 and show_result.stdout:
            # Decode as UTF-8, replacing errors so binary/corrupt files cannot
            # crash the run (errors="replace" never raises).
            content_parts.append(show_result.stdout.decode("utf-8", errors="replace"))
        # else: skip files whose content could not be retrieved
    all_content = "".join(part + "\n" for part in content_parts)  # newline separator per file
    if not all_content.strip():
        return 0  # No valid text content found

    # 4. Run ttok on the concatenated content via stdin.
    ttok_result = run_command(["ttok"], input=all_content, check=False, text=True)  # Pass string, get string
    # Fix: the original tested ``stdout.strip() is not None``, which is always
    # true for a str; require genuinely non-empty output instead.
    if ttok_result and ttok_result.returncode == 0 and ttok_result.stdout.strip():
        try:
            return int(ttok_result.stdout.strip())
        except ValueError:
            print(f"\nWarning: Could not parse ttok output '{ttok_result.stdout.strip()}' as integer for commit {commit_hash[:8]}.")
            return None  # Indicate failure to parse
    return None  # Indicate ttok failure or empty output
# --- Main Execution ---
if __name__ == "__main__":
    start_time = time.time()
    check_prerequisites()
    # check_repo_state()  # Uncomment this line to enforce a clean working directory

    # --- Data Fetching ---
    # Walk backwards from "now" in DATE_STEP_DAYS increments, resolving each
    # sample date to the newest commit on or before it, and token-counting
    # each distinct commit at most once.
    print(f"\nAnalyzing Git history for branch '{BRANCH}'...")
    end_date = datetime.datetime.now(datetime.timezone.utc)  # Use timezone-aware
    start_date = end_date - datetime.timedelta(days=DAYS_BACK)
    date_step = datetime.timedelta(days=DATE_STEP_DAYS)
    results_list: list[tuple[datetime.datetime, int]] = []
    processed_commits = set()  # Avoid reprocessing the same commit for different dates
    skipped_dates = 0
    total_steps = (DAYS_BACK // DATE_STEP_DAYS) + 1
    step_count = 0
    current_date = end_date
    while current_date >= start_date:
        step_count += 1
        progress = (step_count / total_steps) * 100
        # Use timezone-aware comparison
        if current_date.tzinfo is None: current_date = current_date.replace(tzinfo=datetime.timezone.utc)
        # Update progress on the same line
        print(f"\rProcessing: {current_date.date()} [{step_count}/{total_steps}] ({progress:.1f}%)", end="")
        commit_hash = get_commit_hash_for_date(current_date)
        if commit_hash:
            if commit_hash not in processed_commits:
                token_count = get_token_count_for_commit(commit_hash)
                if token_count is not None:
                    # Store the date we aimed for, and the count from the commit found
                    results_list.append((current_date, token_count))
                    processed_commits.add(commit_hash)
                # else: print(f"\n Skipped commit {commit_hash[:8]} due to token count error.")
            # else: pass  # Commit already processed for a more recent date step
        else:
            skipped_dates += 1
            # print(f"\n No suitable commit found on or before {current_date.date()}.")
        current_date -= date_step
    print(f"\rData fetching complete. Processed {total_steps} date steps. {' '*20}")  # Clear progress line
    fetch_duration = time.time() - start_time
    print(f"Found data for {len(results_list)} points ({len(processed_commits)} unique commits). Skipped {skipped_dates} dates before first commit.")
    print(f"Data fetching took {fetch_duration:.2f} seconds.")
    if not results_list:
        print("\nNo data collected. Cannot generate plot.")
        sys.exit(0)
    # Sort results strictly by date for plotting
    results_list.sort(key=lambda x: x[0])
    dates = [r[0] for r in results_list]
    counts = [r[1] for r in results_list]

    # --- Plotting ---
    print("Generating plot...")
    fig, ax = plt.subplots(figsize=(15, 8))  # Slightly larger figure
    # Plot the main token count data
    ax.plot(dates, counts, marker=".", linestyle="-", markersize=4, label="Token Count")
    # --- Plot Formatting ---
    ax.set_xlabel("Date")
    ax.set_ylabel("Token Count (ttok)")
    patterns_str = ", ".join(FILE_PATTERNS)
    ax.set_title(f"Token Count History ({patterns_str} on '{BRANCH}')")
    ax.grid(True, which="major", linestyle="--", linewidth=0.7)
    ax.grid(True, which="minor", linestyle=":", linewidth=0.5)  # Minor grid
    # Correct Date Formatting for X-Axis
    major_formatter = mdates.DateFormatter("%Y-%m-%d")  # Clear date format
    ax.xaxis.set_major_formatter(major_formatter)
    major_locator = mdates.MonthLocator(interval=3)  # Major ticks every 3 months (adjust as needed)
    ax.xaxis.set_major_locator(major_locator)
    minor_locator = mdates.MonthLocator()  # Minor ticks every month
    ax.xaxis.set_minor_locator(minor_locator)
    fig.autofmt_xdate(rotation=30, ha="right")  # Rotate labels for readability
    # Format Y-Axis with commas for thousands
    ax.get_yaxis().set_major_formatter(plt.FuncFormatter(lambda x, p: format(int(x), ",")))

    # --- Regression Analysis ---
    # Fit a straight line to the most recent REGRESSION_MONTHS of samples and
    # overlay it as a dashed trend line with a tokens/year annotation.
    print(f"Calculating linear regression for the last {REGRESSION_MONTHS} months...")
    # Determine date cutoff based on the *last data point collected*
    last_data_date = dates[-1]
    regression_cutoff_date = last_data_date - datetime.timedelta(days=REGRESSION_MONTHS * 30.44)  # Approx months
    # Filter data points that fall within the regression period
    regression_points = [(dt, ct) for dt, ct in zip(dates, counts, strict=False) if dt >= regression_cutoff_date]
    print(f"Found {len(regression_points)} data points since {regression_cutoff_date.date()} for regression.")
    if len(regression_points) < MIN_POINTS_FOR_REGRESSION:
        print(f"Not enough data points for regression (minimum {MIN_POINTS_FOR_REGRESSION}). Skipping trend line.")
    else:
        try:
            regression_dates_dt = [p[0] for p in regression_points]
            regression_counts = [p[1] for p in regression_points]
            # Convert datetime objects to numerical representation for regression
            # (mdates.date2num yields days-since-epoch floats, so the slope
            # unit is tokens per day).
            regression_dates_num = mdates.date2num(regression_dates_dt)
            # Perform linear regression using statistics module
            lin_reg = statistics.linear_regression(regression_dates_num, regression_counts)
            slope = lin_reg.slope  # tokens per day (since date2num unit is days)
            intercept = lin_reg.intercept
            slope_per_year = slope * 365.25
            print(f"  Slope: {slope:.2f} tokens/day ({slope_per_year:,.0f} tokens/year)")
            # Generate points for the regression line (start and end of regression period)
            reg_line_x_num = np.array([regression_dates_num[0], regression_dates_num[-1]])
            reg_line_y = intercept + slope * reg_line_x_num
            # Convert numerical dates back to datetime for plotting
            reg_line_x_dt = mdates.num2date(reg_line_x_num)
            # Plot the regression line on the same axes
            ax.plot(reg_line_x_dt, reg_line_y, color="red", linestyle="--", linewidth=2,
                    label=f"Trend (Last {REGRESSION_MONTHS} mo)")
            # Add Slope Annotation Box
            annotation_text = f"Slope (last {REGRESSION_MONTHS} mo):\n{slope_per_year:,.0f} tokens/year"
            ax.text(0.02, 0.98, annotation_text, transform=ax.transAxes,  # Position relative to axes
                    fontsize=9, verticalalignment="top",
                    bbox=dict(boxstyle="round,pad=0.4", fc="wheat", alpha=0.8))  # Background box
            print("Regression line and annotation added.")
        except statistics.StatisticsError as e:
            # Raised e.g. when all x values are identical (zero variance).
            print(f"Could not calculate regression: {e}. Check data variability. Skipping trend line.")
        except Exception as e:
            print(f"An error occurred during regression/plotting: {e}. Skipping trend line.")

    # --- Final Touches ---
    ax.legend(loc="best")  # Add legend (loc='best' tries to find a good spot)
    plt.tight_layout(pad=1.5)  # Adjust layout to prevent labels overlapping axes/title
    # --- Save and Show Plot ---
    plot_filename = "token_count_history.png"
    try:
        plt.savefig(plot_filename, dpi=150, bbox_inches="tight")
        print(f"\nPlot saved to {plot_filename}")
    except Exception as e:
        print(f"\nError saving plot to {plot_filename}: {e}")
    print("Displaying plot...")
    plt.show()
    total_duration = time.time() - start_time
    print(f"\nScript finished in {total_duration:.2f} seconds.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Written by Gemini 2.5 Pro, inspired and directed by me.