Last active
August 28, 2025 13:21
-
-
Save Ruchip16/dfe546d1153afcbea958d84ae5c672fc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| from pathlib import Path | |
| import pandas as pd | |
| import numpy as np | |
# Input and output CSV locations (relative paths).
INPUT_FILE: Path = Path("top_1000_vendors.csv")
OUTPUT_FILE: Path = Path("vendor_stability_mapping2.csv")

# Vendor-months on or before this date form the train split; later ones
# form the test split.
CUTOFF_DATE: str = "2024-06-01"

# Candidate column names, in priority order. main() uses the first
# candidate from each list that is present in the input CSV.
VENDOR_ID_COLS: list = ["invoice_vendor_id"]
DATE_COLS: list = ["spend_month"]
SPEND_COL_CANDIDATES: list = ["monthly_spend", "spend", "total_spend"]
def coef_var(series: pd.Series) -> float:
    """Return the coefficient of variation (sample std / mean) of *series*.

    Values are coerced to numeric first; anything that cannot be parsed is
    discarded together with existing NaNs.

    Special cases:
      * fewer than two usable values -> ``0.0``
      * mean (numerically) zero      -> ``inf`` if there is any spread,
        otherwise ``0.0``
    """
    values = pd.to_numeric(series, errors="coerce").dropna()
    if values.size < 2:
        return 0.0

    center = values.mean()
    spread = values.std(ddof=1)

    # np.isclose also covers an exactly-zero mean, so no separate == 0 test.
    if np.isclose(center, 0.0):
        return float(np.inf) if spread > 0 else 0.0
    return float(spread / center)
def summarize(group: pd.DataFrame, spend_col: str) -> pd.Series:
    """Summarize the spend values of one group.

    Args:
        group: Rows belonging to a single group (one vendor in ``main``).
        spend_col: Name of the column in *group* holding spend amounts.

    Returns:
        A Series with keys ``months`` (number of usable spend values),
        ``mean_spend``, ``std_spend`` (sample std, ``0.0`` for a single
        value) and ``cv`` (from ``coef_var``). All entries are zero when
        the group has no usable spend values.
    """
    spend = pd.to_numeric(group[spend_col], errors="coerce").dropna()
    n_obs = len(spend)

    if n_obs == 0:
        return pd.Series({"months": 0, "mean_spend": 0.0, "std_spend": 0.0, "cv": 0.0})

    # The sample standard deviation is undefined for a single observation.
    std_spend = float(spend.std(ddof=1)) if n_obs > 1 else 0.0
    stats = {
        "months": int(n_obs),
        "mean_spend": float(spend.mean()),
        "std_spend": std_spend,
        "cv": coef_var(spend),
    }
    return pd.Series(stats)
def _summarize_split(frame: pd.DataFrame, vendor_col: str, spend_col: str,
                     suffix: str) -> pd.DataFrame:
    """Return one row of spend statistics per vendor, columns suffixed ``_<suffix>``."""
    renamed = {name: f"{name}_{suffix}"
               for name in ("months", "mean_spend", "std_spend", "cv")}

    if frame.empty:
        # groupby.apply on an empty frame does not produce the statistic
        # columns, which would make later column lookups fail. Build the
        # empty result explicitly, keeping the vendor key's dtype so the
        # merge in main() still lines up.
        empty = frame[[vendor_col]].copy()
        for col in renamed.values():
            empty[col] = pd.Series(dtype="float64")
        return empty

    # Selecting [[spend_col]] keeps the grouping column out of the frames
    # passed to summarize(), which only reads the spend column anyway.
    return (
        frame.groupby(vendor_col)[[spend_col]]
        .apply(lambda g: summarize(g, spend_col))
        .reset_index()
        .rename(columns=renamed)
    )


def _tercile_thresholds(cv: pd.Series) -> tuple:
    """Return the (1/3, 2/3) quantiles of the finite values in *cv*, or NaNs if none."""
    finite = cv.replace([np.inf, -np.inf], np.nan).dropna()
    if finite.empty:
        return np.nan, np.nan
    return float(finite.quantile(1 / 3)), float(finite.quantile(2 / 3))


def _classify(cv, q1, q2):
    """Map one CV value to a stability label using tercile cut points q1 <= q2."""
    if pd.isna(cv):
        return np.nan
    if pd.isna(q1) or pd.isna(q2):
        # No usable thresholds: fall back to the middle bucket.
        return "Moderately Stable"
    if cv <= q1:
        return "Highly Stable"
    if cv <= q2:
        return "Moderately Stable"
    return "Low Stable"


def main():
    """Classify vendors by spend stability and write the mapping to OUTPUT_FILE.

    Steps:
      1. Read INPUT_FILE and locate the vendor, date and spend columns.
      2. Drop rows with unparseable dates, non-numeric spend or negative
         spend, then sum spend per vendor and calendar month.
      3. Split the vendor-months at CUTOFF_DATE (train: on or before,
         test: after) and compute per-vendor statistics for each split.
      4. Derive tercile thresholds from the train CV distribution and label
         both the train and the test CV of every train vendor with them.
      5. Write per-vendor diagnostics to OUTPUT_FILE and print a summary.

    ``bucket_shift`` is only True/False for vendors that have a class in
    both splits; it is left missing otherwise, so vendors without test data
    are not reported as having shifted.

    Raises:
        SystemExit: If INPUT_FILE does not exist or a required column is
            missing.
    """
    if not INPUT_FILE.exists():
        sys.exit(f"ERROR: Could not find input file: {INPUT_FILE.resolve()}")
    df = pd.read_csv(INPUT_FILE)

    # Detect columns: first candidate present in the file wins.
    vendor_col = next((c for c in VENDOR_ID_COLS if c in df.columns), None)
    date_col = next((c for c in DATE_COLS if c in df.columns), None)
    spend_col = next((c for c in SPEND_COL_CANDIDATES if c in df.columns), None)
    missing = [name for name, val in [("vendor_id", vendor_col), ("date", date_col), ("spend", spend_col)] if val is None]
    if missing:
        sys.exit(f"ERROR: Missing required columns: {missing}. Present: {df.columns.tolist()}")

    # Normalize / clean
    df = df[[vendor_col, date_col, spend_col]].copy()
    df[date_col] = pd.to_datetime(df[date_col], errors="coerce")
    df = df.dropna(subset=[date_col])
    # Snap every date to the first day of its month.
    df["month"] = df[date_col].dt.to_period("M").dt.to_timestamp()
    df[spend_col] = pd.to_numeric(df[spend_col], errors="coerce")
    df = df.dropna(subset=[spend_col])
    df = df[df[spend_col] >= 0]

    # Aggregate to vendor-month (in case of duplicates)
    monthly = (
        df.groupby([vendor_col, "month"], as_index=False)[spend_col].sum()
        .sort_values([vendor_col, "month"])
    )

    # Train/Test split
    cutoff = pd.to_datetime(CUTOFF_DATE)
    train = monthly[monthly["month"] <= cutoff]
    test = monthly[monthly["month"] > cutoff]

    # Summaries. The left merge keeps exactly the vendors seen in train.
    train_sum = _summarize_split(train, vendor_col, spend_col, "train")
    test_sum = _summarize_split(test, vendor_col, spend_col, "test")
    summary = pd.merge(train_sum, test_sum, on=vendor_col, how="left")

    # summarize() returns integer counts, but they pass through a float
    # Series; restore them as nullable integers (missing for vendors
    # without test data).
    for col in ("months_train", "months_test"):
        summary[col] = summary[col].astype("Int64")

    # Data-driven thresholds from TRAIN CV distribution (terciles)
    q1, q2 = _tercile_thresholds(summary["cv_train"])
    summary["stability_class_train"] = summary["cv_train"].apply(lambda cv: _classify(cv, q1, q2))
    summary["stability_class_test"] = summary["cv_test"].apply(lambda cv: _classify(cv, q1, q2))
    summary["cv_delta"] = summary["cv_test"] - summary["cv_train"]

    # A plain `!=` treats a missing class as "different", which would flag
    # every vendor lacking test data as shifted. Only compare where both
    # classes exist and leave the flag missing elsewhere.
    train_cls = summary["stability_class_train"]
    test_cls = summary["stability_class_test"]
    comparable = train_cls.notna() & test_cls.notna()
    shifted = (train_cls != test_cls).astype("boolean")
    shifted[~comparable] = pd.NA
    summary["bucket_shift"] = shifted

    # Write full diagnostics (per-vendor CVs + classes)
    out_cols = [
        vendor_col,
        "cv_train", "stability_class_train",
        "cv_test", "stability_class_test",
        "cv_delta", "bucket_shift",
        "months_train", "mean_spend_train", "std_spend_train",
        "months_test", "mean_spend_test", "std_spend_test",
    ]
    summary[out_cols].to_csv(OUTPUT_FILE, index=False)

    # Console summary
    counts = summary["stability_class_train"].value_counts(dropna=False).to_dict()
    print(f"Cutoff date: {CUTOFF_DATE}")
    if not np.isnan(q1):
        print(f"Terciles (train CV): q1={q1:.4f}, q2={q2:.4f}")
    else:
        print("Terciles (train CV): unavailable")
    print(f"Saved mapping -> {OUTPUT_FILE.resolve()}")
    print("Counts (train):", counts)
    print("Vendors shifted:", int(summary['bucket_shift'].fillna(False).sum()))
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment