@Jeremiah-England
Created April 3, 2025 17:20
Plot the tokens used over time by a git repository
#!/usr/bin/env python3
"""
Analyzes the token count (using ttok) of the file types listed in
FILE_PATTERNS over the history of a Git repository branch
(default: origin/master). Plots the token count over time and adds a
linear regression trend line for the most recent period.

Requires: Python 3.10+, git, ttok, matplotlib, numpy.

Usage: run from the root directory of the target Git repository:

    python plot_repo_tokens.py
"""
import datetime
import shutil
import statistics  # statistics.linear_regression requires Python 3.10+
import subprocess
import sys
import time  # For progress timing

# --- Prerequisite Check ---
# Import third-party libraries inside a guard so a missing package produces a
# helpful message instead of a bare traceback.
try:
    import matplotlib.dates as mdates
    import matplotlib.pyplot as plt
    import numpy as np
except ImportError:
    print("Error: matplotlib and numpy are required.")
    print("Please install them using: pip install matplotlib numpy")
    sys.exit(1)

if sys.version_info < (3, 10):
    print("Error: This script requires Python 3.10+ for statistics.linear_regression.")
    print(f"You are using Python {sys.version_info.major}.{sys.version_info.minor}")
    sys.exit(1)
# --- Configuration ---
REPO_PATH = "." # Analyze the repo in the current directory
BRANCH = "origin/master" # Analyze the history of this branch
DAYS_BACK = 365 * 2 # Go back 2 years
DATE_STEP_DAYS = 10 # Sample every 10 days
FILE_PATTERNS = ["**/*.ts", "**/*.tsc", "**/*.tsx", "**/*.md", "**/*.js"] # File patterns for analysis
# --- Regression Configuration ---
REGRESSION_MONTHS = 3 # Calculate regression over the last N months of data
MIN_POINTS_FOR_REGRESSION = 5 # Minimum data points needed for regression calculation
# --- Helper Functions ---
def run_command(cmd: list[str], check: bool = True, capture_output: bool = True, text: bool = True, **kwargs) -> subprocess.CompletedProcess | None:
    """Runs a command using subprocess, handles errors, and manages encoding."""
    try:
        # Ensure the input encoding matches what the 'text' argument expects.
        if "input" in kwargs:
            if text and isinstance(kwargs["input"], bytes):
                kwargs["input"] = kwargs["input"].decode("utf-8", errors="replace")
            elif not text and isinstance(kwargs["input"], str):
                kwargs["input"] = kwargs["input"].encode("utf-8", errors="replace")
        # print(f"Running command: {' '.join(cmd)}")  # Uncomment for debugging
        process = subprocess.run(cmd, check=False, capture_output=capture_output, text=text, cwd=REPO_PATH, **kwargs)
        # Raise manually if the caller expects success.
        if check and process.returncode != 0:
            raise subprocess.CalledProcessError(process.returncode, cmd, output=process.stdout, stderr=process.stderr)
        return process
    except FileNotFoundError:
        print(f"\nError: Command '{cmd[0]}' not found. Make sure it's installed and in your PATH.")
        sys.exit(1)  # Exit if essential commands are missing
    except subprocess.CalledProcessError as e:
        # Only print detailed errors if check=True caused the raise.
        if check:
            print(f"\nError running command: {' '.join(cmd)}")
            print(f"Return code: {e.returncode}")
            if e.stdout:
                print(f"Output:\n{e.stdout}")
            if e.stderr:
                print(f"Error output:\n{e.stderr}")
            raise  # Re-raise so the caller sees the failure
        return e  # With check=False, return the error object for inspection
    except Exception as e:
        print(f"\nAn unexpected error occurred while running {' '.join(cmd)}: {e}")
        return None
def check_prerequisites():
    """Check that git, ttok, and the required libraries are available."""
    print("Checking prerequisites...")
    if not shutil.which("git"):
        print("Error: 'git' command not found. Please install Git.")
        sys.exit(1)
    if not shutil.which("ttok"):
        print("Error: 'ttok' command not found.")
        print("Please install it using: pip install ttok")
        sys.exit(1)
    try:
        import numpy  # noqa: F401
    except ImportError:
        print("Error: 'numpy' package not found.")
        print("Please install it using: pip install matplotlib numpy")
        sys.exit(1)
    print("Prerequisites met.")
def check_repo_state():
    """Check whether the repository working directory is clean (optional but recommended)."""
    print("Checking repository status...")
    result = run_command(["git", "status", "--porcelain"], check=False)  # Don't raise if git status fails
    if result is None or result.returncode != 0:
        print("Warning: Could not reliably check repository status (git status failed). Proceeding cautiously.")
        return
    if result.stdout.strip():
        print("\nWarning: Your Git working directory is not clean.")
        print(" This script only reads historical data, but running it with uncommitted changes is not recommended.")
        print("Output of 'git status --porcelain':")
        print(result.stdout.strip())
        # Add sys.exit(1) here if a clean state should be mandatory.
    else:
        print("Repository is clean.")
def get_commit_hash_for_date(target_date: datetime.datetime) -> str | None:
    """Find the latest commit hash on the specified branch on or before the target date."""
    date_str = target_date.isoformat()
    cmd = ["git", "rev-list", "-n", "1", f"--before={date_str}", BRANCH]
    result = run_command(cmd, check=False)  # Check failure manually
    if result and result.returncode == 0 and result.stdout.strip():
        return result.stdout.strip()
    if result and result.returncode != 0 and "unknown revision or path" in result.stderr.lower():
        print(f"\nWarning: Branch '{BRANCH}' not found or invalid.")
        # Logic to fall back to a default branch (e.g., main or master) could be added here.
        return None  # Cannot proceed without a valid branch
    # Fallback for dates before the first commit (more costly check).
    first_commit_cmd = ["git", "rev-list", "--max-parents=0", "--pretty=format:%cI", BRANCH]
    first_commit_result = run_command(first_commit_cmd, check=False)
    if first_commit_result and first_commit_result.returncode == 0 and first_commit_result.stdout.strip():
        try:
            # The output contains a "commit <hash>" line followed by the date line.
            lines = first_commit_result.stdout.strip().splitlines()
            first_commit_date_str = lines[-1]  # Date is the last line of this format
            # Parse the date, handling timezones.
            if "+" in first_commit_date_str or (first_commit_date_str.endswith("Z") and "-" in first_commit_date_str):
                first_commit_dt = datetime.datetime.fromisoformat(first_commit_date_str.replace("Z", "+00:00"))
                # Make target_date comparable (timezone-aware) if it is naive.
                if target_date.tzinfo is None:
                    target_date = target_date.replace(tzinfo=datetime.timezone.utc)
            else:  # Assume naive UTC if there is no timezone info
                first_commit_dt = datetime.datetime.fromisoformat(first_commit_date_str)
                if target_date.tzinfo is not None:
                    target_date = target_date.astimezone(datetime.timezone.utc).replace(tzinfo=None)
            if target_date < first_commit_dt:
                # This date is before the repository's history began on this branch.
                return None  # Signal that no relevant commit exists
        except Exception as e:
            print(f"\nWarning: Could not parse first commit date. {e}")
            # Proceed; rev-list may have failed for other reasons.
    # If the fallback didn't apply, no suitable commit was found for this date.
    # print(f"\nWarning: No commit found on '{BRANCH}' for date {target_date.date()}.")
    return None
def get_token_count_for_commit(commit_hash: str) -> int | None:
    """Get the ttok count for the specified file patterns at a given commit hash."""
    # 1. List ALL files recursively at that commit.
    ls_tree_cmd = ["git", "ls-tree", "-r", "--name-only", commit_hash]
    ls_result = run_command(ls_tree_cmd, check=False)  # Don't fail if commit has no files
    if ls_result is None:
        return None  # Error running command
    if ls_result.returncode != 0:
        # print(f"\nWarning: git ls-tree failed for commit {commit_hash[:8]}. stderr: {ls_result.stderr.strip()}")
        return 0  # Treat as 0 if the tree is inaccessible/invalid
    if not ls_result.stdout.strip():
        return 0  # Commit has no files
    all_files = ls_result.stdout.strip().split("\n")
    # 2. Filter the file list using the configured patterns.
    try:
        # Simple endswith check for common patterns like **/*.ext
        target_extensions = tuple("." + p.split(".")[-1].lower() for p in FILE_PATTERNS if "*." in p)
        if not target_extensions:
            print(f"\nWarning: Could not extract target extensions from FILE_PATTERNS: {FILE_PATTERNS}. Check format.")
            return 0
    except Exception as e:
        print(f"\nError processing FILE_PATTERNS: {e}. Patterns: {FILE_PATTERNS}")
        return None  # A pattern error is critical
    matching_files = [f for f in all_files if f.lower().endswith(target_extensions)]
    if not matching_files:
        return 0  # No files match the patterns in this commit
    # 3. Get the content of the matching files and concatenate it.
    all_content = ""
    for file_path in matching_files:
        show_cmd = ["git", "show", f"{commit_hash}:{file_path}"]
        show_result = run_command(show_cmd, check=False, text=False)  # Get raw bytes
        if show_result and show_result.returncode == 0 and show_result.stdout:
            try:
                # Decode as UTF-8, replacing errors to avoid crashing on binary/corrupt files.
                file_content = show_result.stdout.decode("utf-8", errors="replace")
                all_content += file_content + "\n"  # Add newline separator
            except Exception:
                # print(f"\nWarning: Could not decode content for '{file_path}'. Skipping.")
                continue  # Skip problematic file
        # else: print(f"\nWarning: Could not retrieve content for '{file_path}'. Skipping.")
    if not all_content.strip():
        return 0  # No valid text content found
    # 4. Run ttok on the concatenated content via stdin.
    ttok_cmd = ["ttok"]
    ttok_result = run_command(ttok_cmd, input=all_content, check=False, text=True)  # Pass string, get string
    if ttok_result and ttok_result.returncode == 0 and ttok_result.stdout.strip():
        try:
            return int(ttok_result.stdout.strip())
        except ValueError:
            print(f"\nWarning: Could not parse ttok output '{ttok_result.stdout.strip()}' as an integer for commit {commit_hash[:8]}.")
            return None  # Indicate failure to parse
    # print(f"\nWarning: ttok command failed or produced no output for commit {commit_hash[:8]}.")
    # if ttok_result and ttok_result.stderr: print(f" ttok stderr: {ttok_result.stderr.strip()}")
    return None  # Indicate ttok failure
# --- Main Execution ---
if __name__ == "__main__":
    start_time = time.time()
    check_prerequisites()
    # check_repo_state()  # Uncomment to warn when the working directory is not clean

    # --- Data Fetching ---
    print(f"\nAnalyzing Git history for branch '{BRANCH}'...")
    end_date = datetime.datetime.now(datetime.timezone.utc)  # Use timezone-aware dates
    start_date = end_date - datetime.timedelta(days=DAYS_BACK)
    date_step = datetime.timedelta(days=DATE_STEP_DAYS)
    results_list: list[tuple[datetime.datetime, int]] = []
    processed_commits = set()  # Avoid reprocessing the same commit for different dates
    skipped_dates = 0
    total_steps = (DAYS_BACK // DATE_STEP_DAYS) + 1
    step_count = 0
    current_date = end_date
    while current_date >= start_date:
        step_count += 1
        progress = (step_count / total_steps) * 100
        # Keep the comparison timezone-aware.
        if current_date.tzinfo is None:
            current_date = current_date.replace(tzinfo=datetime.timezone.utc)
        # Update progress on the same line.
        print(f"\rProcessing: {current_date.date()} [{step_count}/{total_steps}] ({progress:.1f}%)", end="")
        commit_hash = get_commit_hash_for_date(current_date)
        if commit_hash:
            if commit_hash not in processed_commits:
                token_count = get_token_count_for_commit(commit_hash)
                if token_count is not None:
                    # Store the date we aimed for, with the count from the commit found.
                    results_list.append((current_date, token_count))
                    processed_commits.add(commit_hash)
                # else: print(f"\n Skipped commit {commit_hash[:8]} due to token count error.")
            # else: commit already processed for a more recent date step
        else:
            skipped_dates += 1
            # print(f"\n No suitable commit found on or before {current_date.date()}.")
        current_date -= date_step
    print(f"\rData fetching complete. Processed {total_steps} date steps.{' ' * 20}")  # Clear progress line
    fetch_duration = time.time() - start_time
    print(f"Found data for {len(results_list)} points ({len(processed_commits)} unique commits). Skipped {skipped_dates} dates before the first commit.")
    print(f"Data fetching took {fetch_duration:.2f} seconds.")
    if not results_list:
        print("\nNo data collected. Cannot generate plot.")
        sys.exit(0)
    # Sort the results strictly by date for plotting.
    results_list.sort(key=lambda x: x[0])
    dates = [r[0] for r in results_list]
    counts = [r[1] for r in results_list]
    # --- Plotting ---
    print("Generating plot...")
    fig, ax = plt.subplots(figsize=(15, 8))  # Slightly larger figure
    # Plot the main token count data.
    ax.plot(dates, counts, marker=".", linestyle="-", markersize=4, label="Token Count")
    # --- Plot Formatting ---
    ax.set_xlabel("Date")
    ax.set_ylabel("Token Count (ttok)")
    patterns_str = ", ".join(FILE_PATTERNS)
    ax.set_title(f"Token Count History ({patterns_str} on '{BRANCH}')")
    ax.grid(True, which="major", linestyle="--", linewidth=0.7)
    ax.grid(True, which="minor", linestyle=":", linewidth=0.5)  # Minor grid
    # Date formatting for the x-axis.
    major_formatter = mdates.DateFormatter("%Y-%m-%d")  # Clear date format
    ax.xaxis.set_major_formatter(major_formatter)
    major_locator = mdates.MonthLocator(interval=3)  # Major ticks every 3 months (adjust as needed)
    ax.xaxis.set_major_locator(major_locator)
    minor_locator = mdates.MonthLocator()  # Minor ticks every month
    ax.xaxis.set_minor_locator(minor_locator)
    fig.autofmt_xdate(rotation=30, ha="right")  # Rotate labels for readability
    # Format the y-axis with commas for thousands.
    ax.get_yaxis().set_major_formatter(plt.FuncFormatter(lambda x, p: format(int(x), ",")))
    # --- Regression Analysis ---
    print(f"Calculating linear regression for the last {REGRESSION_MONTHS} months...")
    # Determine the date cutoff based on the *last data point collected*.
    last_data_date = dates[-1]
    regression_cutoff_date = last_data_date - datetime.timedelta(days=REGRESSION_MONTHS * 30.44)  # Approximate months
    # Filter the data points that fall within the regression period.
    regression_points = [(dt, ct) for dt, ct in zip(dates, counts, strict=False) if dt >= regression_cutoff_date]
    print(f"Found {len(regression_points)} data points since {regression_cutoff_date.date()} for regression.")
    if len(regression_points) < MIN_POINTS_FOR_REGRESSION:
        print(f"Not enough data points for regression (minimum {MIN_POINTS_FOR_REGRESSION}). Skipping trend line.")
    else:
        try:
            regression_dates_dt = [p[0] for p in regression_points]
            regression_counts = [p[1] for p in regression_points]
            # Convert datetime objects to a numerical representation for regression.
            regression_dates_num = mdates.date2num(regression_dates_dt)
            # Perform linear regression using the statistics module.
            lin_reg = statistics.linear_regression(regression_dates_num, regression_counts)
            slope = lin_reg.slope  # Tokens per day (mdates.date2num units are days)
            intercept = lin_reg.intercept
            slope_per_year = slope * 365.25
            print(f" Slope: {slope:.2f} tokens/day ({slope_per_year:,.0f} tokens/year)")
            # Generate points for the regression line (start and end of the regression period).
            reg_line_x_num = np.array([regression_dates_num[0], regression_dates_num[-1]])
            reg_line_y = intercept + slope * reg_line_x_num
            # Convert the numerical dates back to datetimes for plotting.
            reg_line_x_dt = mdates.num2date(reg_line_x_num)
            # Plot the regression line on the same axes.
            ax.plot(reg_line_x_dt, reg_line_y, color="red", linestyle="--", linewidth=2,
                    label=f"Trend (Last {REGRESSION_MONTHS} mo)")
            # Add a slope annotation box.
            annotation_text = f"Slope (last {REGRESSION_MONTHS} mo):\n{slope_per_year:,.0f} tokens/year"
            ax.text(0.02, 0.98, annotation_text, transform=ax.transAxes,  # Position relative to axes
                    fontsize=9, verticalalignment="top",
                    bbox=dict(boxstyle="round,pad=0.4", fc="wheat", alpha=0.8))  # Background box
            print("Regression line and annotation added.")
        except statistics.StatisticsError as e:
            print(f"Could not calculate regression: {e}. Check data variability. Skipping trend line.")
        except Exception as e:
            print(f"An error occurred during regression/plotting: {e}. Skipping trend line.")
    # --- Final Touches ---
    ax.legend(loc="best")  # loc="best" tries to find a good spot for the legend
    plt.tight_layout(pad=1.5)  # Adjust layout to prevent labels overlapping axes/title
    # --- Save and Show Plot ---
    plot_filename = "token_count_history.png"
    try:
        plt.savefig(plot_filename, dpi=150, bbox_inches="tight")
        print(f"\nPlot saved to {plot_filename}")
    except Exception as e:
        print(f"\nError saving plot to {plot_filename}: {e}")
    print("Displaying plot...")
    plt.show()
    total_duration = time.time() - start_time
    print(f"\nScript finished in {total_duration:.2f} seconds.")
@Jeremiah-England (Author):
Written by Gemini 2.5 Pro, inspired and directed by me.
