Skip to content

Instantly share code, notes, and snippets.

@thomascamminady
Last active September 10, 2025 09:25
Show Gist options
  • Save thomascamminady/9c7a396461b912db4b25fd005a8ca3ea to your computer and use it in GitHub Desktop.
Save thomascamminady/9c7a396461b912db4b25fd005a8ca3ea to your computer and use it in GitHub Desktop.
fitvis.py
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "fire",
# "altair",
# "polars",
# "pycrux",
# "ipython",
# "rich"
# ]
# ///
import glob
import json
import logging
import os
from dataclasses import dataclass
from typing import Literal
import altair as alt # ty: ignore[unresolved-import]
import fire # ty: ignore[unresolved-import]
import polars as pl # ty: ignore[unresolved-import]
import pycrux # ty: ignore[unresolved-import]
class AltairConfig:
@staticmethod
def setup(
where: Literal["browser", "html", "jupyter"] = "browser",
name: str = "my_theme",
enable: bool = True,
dark_color: str = "gray",
font_weight: str = "normal",
small_font_size: int = 12,
medium_font_size: int = 14,
large_font_size: int = 18,
height: int = 500,
width: int = 1500,
spacing_facet: int = 60,
spacing_concat: int = 60,
):
"""Setup Altair."""
AltairConfig.disable_max_rows()
AltairConfig.renderers_enable(where=where)
AltairConfig.register_theme(
name=name,
enable=enable,
dark_color=dark_color,
font_weight=font_weight,
small_font_size=small_font_size,
medium_font_size=medium_font_size,
large_font_size=large_font_size,
height=height,
width=width,
spacing_facet=spacing_facet,
spacing_concat=spacing_concat,
)
@staticmethod
def disable_max_rows():
"""Disables the max rows limit for Altair."""
alt.data_transformers.disable_max_rows()
@staticmethod
def renderers_enable(where: Literal["browser", "html", "jupyter"]):
"""Enables Altair renderers."""
alt.renderers.enable(where)
@staticmethod
def register_theme(
name: str,
enable: bool,
dark_color: str,
font_weight: str,
small_font_size: int,
medium_font_size: int,
large_font_size: int,
height: int,
width: int,
spacing_facet: int,
spacing_concat: int,
):
"""Registers my custom Altair theme."""
@alt.theme.register(name, enable=enable)
def loader():
return json.loads(
json.dumps(
{
"config": {
"text": {
"color": dark_color,
"fontSize": small_font_size,
},
"header": {
"titleFontSize": large_font_size,
"labelFontSize": medium_font_size,
"color": dark_color,
"titleColor": dark_color,
"labelColor": dark_color,
"fontWeight": font_weight,
"titleFontWeight": font_weight,
"labelFontWeight": font_weight,
},
"view": {
"height": height,
"width": width,
"strokeWidth": 0,
"fill": "white",
},
"facet": {
"spacing": spacing_facet,
},
"concat": {
"spacing": spacing_concat,
},
"axis": {
"domain": True,
"domainColor": dark_color,
"domainWidth": 1,
"gridWidth": 1,
"labelAngle": 0,
"tickSize": 5,
"gridCap": "round",
"gridDash": [2, 4],
"fontWeight": font_weight,
"titleFontWeight": font_weight,
"labelFontWeight": font_weight,
"titleFontSize": medium_font_size,
"labelFontSize": small_font_size,
"color": dark_color,
"titleColor": dark_color,
"labelColor": dark_color,
"tickColor": dark_color,
},
"axisX": {
"titleAnchor": "end",
"titleAlign": "center",
},
"axisY": {
"titleAnchor": "end",
"titleAngle": 0,
"titleAlign": "center",
"titleY": -15,
"titleX": 0,
},
"legend": {
"fontWeight": font_weight,
"titleFontWeight": font_weight,
"labelFontWeight": font_weight,
"titleFontSize": medium_font_size,
"labelFontSize": small_font_size,
"labelLimit": 0,
"color": dark_color,
"titleColor": dark_color,
"labelColor": dark_color,
"tickColor": dark_color,
"domainColor": dark_color,
},
"title": {
"anchor": "middle",
"fontWeight": font_weight,
"titleFontWeight": font_weight,
"labelFontWeight": font_weight,
"titleFontSize": large_font_size,
"labelFontSize": small_font_size,
"color": dark_color,
"titleColor": dark_color,
"labelColor": dark_color,
"tickColor": dark_color,
"domainColor": dark_color,
},
}
}
)
)
@dataclass
class Config:
folder: str
crux_common: str
height: int = 300
width: int = 2000
zero: bool = False
reference_file: str | None = None
rolling_window: int | None = None
silent: bool = False
cleanup: bool = True
def move_files(df: pl.DataFrame, config: Config) -> None:
files = df["file"].unique()
for file in files:
# if not exists, create a hidden folder with the same name but a dot prefix
hidden_folder = os.path.join(config.folder, f".crfittool_{file}")
if not os.path.exists(hidden_folder):
os.makedirs(hidden_folder)
# try to move all files into that hidden folder that contain the
# file name and end in .csv
for f in glob.glob(os.path.join(config.folder, f"*{file}*.csv")):
try:
os.rename(f, os.path.join(hidden_folder, os.path.basename(f)))
except Exception as e:
logging.warning(f"Failed to move {f} to {hidden_folder}: {e}")
def io(config: Config) -> pl.DataFrame:
_df_list: list[pl.DataFrame] = []
crux = pycrux.Crux(config.crux_common)
files = (
glob.glob(os.path.join(config.folder, "*.fit"))
+ glob.glob(os.path.join(config.folder, "*.FIT"))
+ glob.glob(os.path.join(config.folder, "*.Fit"))
)
for file in files:
try:
_df = (
pl.DataFrame(crux.read_fit(file))
.with_columns(
file=pl.lit(os.path.basename(file)),
folder=pl.lit(config.folder),
)
.select(
"timestamp",
"file",
"folder",
"cad_rpm",
"pwr_watts",
"spd_mps",
)
)
if config.rolling_window:
_df = _df.with_columns(
pl.col("cad_rpm", "pwr_watts", "spd_mps").rolling_mean(
window_size=config.rolling_window, center=True
)
)
_df_list.append(_df)
except Exception as e:
logging.warning(f"Failed to read {file}: {e}")
try:
df = pl.concat(_df_list, how="diagonal_relaxed")
except Exception as e:
raise ValueError(f"Failed to create df: {e}") from e
try:
# Try to convert "timestamp" which looks like this: 2025-09-04T08:00:46.000Z to datetime
df = df.with_columns(
pl.col("timestamp").str.strptime(
pl.Datetime, format="%Y-%m-%dT%H:%M:%S%.3fZ"
)
)
except Exception as e:
logging.warning(f"Failed to convert timestamp: {e}")
try:
# drop columns that are all null
def drop_columns_that_are_all_null(_df: pl.DataFrame) -> pl.DataFrame:
return _df[
[s.name for s in _df if not (s.null_count() == _df.height)]
]
df = df.pipe(drop_columns_that_are_all_null)
except Exception as e:
logging.warning(f"Failed to drop columns that are all null: {e}")
if (
config.reference_file is not None
and config.reference_file in df["file"].unique()
):
df = (
df.join(
df.filter(pl.col("file") == config.reference_file)
.select("timestamp", "pwr_watts")
.rename({"pwr_watts": "pwr_watts_reference"}),
on="timestamp",
how="left",
)
.with_columns(
pwr_watts_diff=pl.col("pwr_watts")
- pl.col("pwr_watts_reference")
)
.with_columns(
pwr_watts_diff_hyb=100
* pl.col("pwr_watts_diff")
/ pl.col("pwr_watts_reference").clip(250, None)
)
)
return df
def plot(df: pl.DataFrame, config: Config) -> alt.Chart:
AltairConfig.setup()
if not config.silent:
print("\nIf you want to define a reference, use the flag")
print("--reference_file=<filename>")
print("Options are:")
for file in df["file"].unique():
print(f"""--reference_file="{file}" """)
interval = alt.selection_interval(encodings=["x"])
legend_sel = alt.selection_point(fields=["file"], bind="legend")
def make_zoomable_chart(y, color_col="file", title: str | None = None):
base = alt.Chart(df).encode(
x=alt.X("timestamp:T", title="Timestamp"),
y=alt.Y(f"{y}:Q", title=title, scale=alt.Scale(zero=config.zero)),
color=alt.Color(f"{color_col}:N"),
opacity=alt.condition(legend_sel, alt.value(1), alt.value(0.12)),
)
# TOP: focus (filtered by brush)
chart = (
base.transform_filter(interval)
.transform_filter(legend_sel)
.mark_line()
.properties(
width=config.width, height=config.height, title=config.folder
)
)
# BOTTOM: overview with same brush
view = (
base.mark_line()
.add_params(interval)
.properties(width=config.width, height=60)
)
# Mean table over selected window
table = (
base.transform_filter(interval)
.transform_filter(legend_sel)
.mark_text(align="center", fontSize=30)
.encode(
x=alt.X("file:N", title=None, axis=alt.Axis(labelLimit=0)),
text=alt.Text(f"mean({y}):Q", format=".2f"),
y=alt.value(0),
)
.properties(width=config.width, height=30, title=f"Mean {y}")
)
return alt.vconcat(chart, view, table, spacing=10)
charts = [
make_zoomable_chart("pwr_watts", title="Power (Watts)"),
]
if "pwr_watts_diff" in df.columns:
charts.append(
make_zoomable_chart("pwr_watts_diff", title="Power Error (Watts) ")
)
if "pwr_watts_diff_hyb" in df.columns:
charts.append(
make_zoomable_chart(
"pwr_watts_diff_hyb", title="Hyb. Power Error (%) "
)
)
charts.append(
make_zoomable_chart("cad_rpm", title="Cadence (RPM)"),
)
charts.append(
make_zoomable_chart("spd_mps", title="Speed (m/s)"),
)
chart = alt.vconcat(*charts, spacing=80).resolve_scale(
y="independent", color="independent"
)
# Add sliders + interactions at the top level so they affect both charts
return chart.add_params(legend_sel)
def main(
folder: str,
crux_common: str,
height: int = 200,
width: int = 2000,
zero: bool = False,
reference_file: str | None = None,
rolling_window: int | None = None,
silent: bool = False,
cleanup: bool = True,
):
config = Config(
folder=folder,
crux_common=crux_common,
height=height,
width=width,
zero=zero,
reference_file=reference_file,
rolling_window=rolling_window,
silent=silent,
cleanup=cleanup,
)
df = io(config)
chart = plot(df, config)
chart.save(os.path.join(config.folder, "chart.html"))
chart.show()
if config.cleanup:
move_files(df, config)
if __name__ == "__main__":
"""
uv run \
fitvis.py \
--crux_common="/YOUR/PATH/TO/crux_common/" \
--width=1400 \
--height=300 \
--zero=False \
--folder="/YOUR/PATH/TO/DATASET/xyz" \
--rolling_window=5 \
--silent=False \
--reference_file="A FIT FILE INSIDE xyz" \
--cleanup=True ;
"""
fire.Fire(main)
@thomascamminady
Copy link
Author

thomascamminady commented Sep 10, 2025

  1. Install uv: brew install uv
  2. Download script and save as fitvis.py
  3. Run script:
uv run \
        fitvis.py \
        --crux_common="/YOUR/PATH/TO/crux_common/" \
        --width=1400 \
        --height=300 \
        --zero=False \
        --folder="/YOUR/PATH/TO/DATASET/xyz" \
        --rolling_window=5 \
        --silent=False \
        --reference_file="A FIT FILE INSIDE xyz" \
        --cleanup=True ;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment