Last active
September 10, 2025 09:25
-
-
Save thomascamminady/9c7a396461b912db4b25fd005a8ca3ea to your computer and use it in GitHub Desktop.
fitvis.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv run --script | |
# /// script | |
# requires-python = ">=3.11" | |
# dependencies = [ | |
# "fire", | |
# "altair", | |
# "polars", | |
# "pycrux", | |
# "ipython", | |
# "rich" | |
# ] | |
# /// | |
import glob | |
import json | |
import logging | |
import os | |
from dataclasses import dataclass | |
from typing import Literal | |
import altair as alt # ty: ignore[unresolved-import] | |
import fire # ty: ignore[unresolved-import] | |
import polars as pl # ty: ignore[unresolved-import] | |
import pycrux # ty: ignore[unresolved-import] | |
class AltairConfig:
    """Static helpers that prepare Altair (row limit, renderer, theme)."""

    @staticmethod
    def setup(
        where: Literal["browser", "html", "jupyter"] = "browser",
        name: str = "my_theme",
        enable: bool = True,
        dark_color: str = "gray",
        font_weight: str = "normal",
        small_font_size: int = 12,
        medium_font_size: int = 14,
        large_font_size: int = 18,
        height: int = 500,
        width: int = 1500,
        spacing_facet: int = 60,
        spacing_concat: int = 60,
    ):
        """Run all three configuration steps in one call.

        Lifts the max-rows limit, enables the renderer named by ``where`` and
        registers the theme; every remaining argument is forwarded unchanged
        to :meth:`register_theme`.
        """
        theme_options = {
            "name": name,
            "enable": enable,
            "dark_color": dark_color,
            "font_weight": font_weight,
            "small_font_size": small_font_size,
            "medium_font_size": medium_font_size,
            "large_font_size": large_font_size,
            "height": height,
            "width": width,
            "spacing_facet": spacing_facet,
            "spacing_concat": spacing_concat,
        }
        AltairConfig.disable_max_rows()
        AltairConfig.renderers_enable(where=where)
        AltairConfig.register_theme(**theme_options)

    @staticmethod
    def disable_max_rows():
        """Turn off Altair's max-rows limit on the data transformers."""
        alt.data_transformers.disable_max_rows()

    @staticmethod
    def renderers_enable(where: Literal["browser", "html", "jupyter"]):
        """Enable the Altair renderer called ``where``."""
        alt.renderers.enable(where)

    @staticmethod
    def register_theme(
        name: str,
        enable: bool,
        dark_color: str,
        font_weight: str,
        small_font_size: int,
        medium_font_size: int,
        large_font_size: int,
        height: int,
        width: int,
        spacing_facet: int,
        spacing_concat: int,
    ):
        """Register the custom theme under ``name`` (enabling it if asked).

        One colour (``dark_color``) and one weight (``font_weight``) are used
        for all text and guides; the three font sizes are spread over labels
        and titles; ``height``/``width`` set the default view size.
        """
        # Key groups that recur in several sections of the config.
        weights = {
            "fontWeight": font_weight,
            "titleFontWeight": font_weight,
            "labelFontWeight": font_weight,
        }
        colors = {
            "color": dark_color,
            "titleColor": dark_color,
            "labelColor": dark_color,
        }
        tick_colors = {**colors, "tickColor": dark_color}

        text_cfg = {"color": dark_color, "fontSize": small_font_size}
        header_cfg = {
            "titleFontSize": large_font_size,
            "labelFontSize": medium_font_size,
            **colors,
            **weights,
        }
        view_cfg = {
            "height": height,
            "width": width,
            "strokeWidth": 0,
            "fill": "white",
        }
        axis_cfg = {
            "domain": True,
            "domainColor": dark_color,
            "domainWidth": 1,
            "gridWidth": 1,
            "labelAngle": 0,
            "tickSize": 5,
            "gridCap": "round",
            "gridDash": [2, 4],
            **weights,
            "titleFontSize": medium_font_size,
            "labelFontSize": small_font_size,
            **tick_colors,
        }
        axis_x_cfg = {"titleAnchor": "end", "titleAlign": "center"}
        axis_y_cfg = {
            "titleAnchor": "end",
            "titleAngle": 0,
            "titleAlign": "center",
            "titleY": -15,
            "titleX": 0,
        }
        legend_cfg = {
            **weights,
            "titleFontSize": medium_font_size,
            "labelFontSize": small_font_size,
            "labelLimit": 0,
            **tick_colors,
            "domainColor": dark_color,
        }
        title_cfg = {
            "anchor": "middle",
            **weights,
            "titleFontSize": large_font_size,
            "labelFontSize": small_font_size,
            **tick_colors,
            "domainColor": dark_color,
        }

        theme = {
            "config": {
                "text": text_cfg,
                "header": header_cfg,
                "view": view_cfg,
                "facet": {"spacing": spacing_facet},
                "concat": {"spacing": spacing_concat},
                "axis": axis_cfg,
                "axisX": axis_x_cfg,
                "axisY": axis_y_cfg,
                "legend": legend_cfg,
                "title": title_cfg,
            }
        }

        @alt.theme.register(name, enable=enable)
        def loader():
            # The JSON round trip hands every caller its own plain copy.
            return json.loads(json.dumps(theme))
@dataclass
class Config:
    """Run settings shared by the loading, plotting and cleanup steps."""

    # Directory that is searched for FIT files; chart.html is written here too.
    folder: str
    # Value handed to pycrux.Crux(...) when the files are loaded.
    crux_common: str
    # Size of each main (focus) chart.
    height: int = 300
    width: int = 2000
    # Passed as `zero=` to the y scale of every chart.
    zero: bool = False
    # Base name of the file whose power is the reference for the diff columns;
    # None means no reference columns are computed.
    reference_file: str | None = None
    # Window size of the centered rolling mean applied to cadence, power and
    # speed; a falsy value (None or 0) skips the smoothing.
    rolling_window: int | None = None
    # When True, plot() does not print the --reference_file hint.
    silent: bool = False
    # When True, main() moves matching CSV files into hidden folders at the end.
    cleanup: bool = True
def move_files(df: pl.DataFrame, config: Config) -> None:
    """Move the CSV files that belong to the loaded FIT files into hidden folders.

    For every distinct value in ``df["file"]`` a folder named
    ``.crfittool_<file>`` is created inside ``config.folder`` (if it does not
    exist yet), and each ``*.csv`` in ``config.folder`` whose name contains
    ``<file>`` is moved into it. Failures are logged as warnings and never
    abort the remaining work.

    Args:
        df: Frame with a ``file`` column holding FIT file base names.
        config: Run settings; only ``config.folder`` is read.
    """
    # Escape glob metacharacters so that folders or file names containing
    # "[", "]", "*" or "?" (e.g. "ride[1].fit") are matched literally.
    escaped_folder = glob.escape(config.folder)

    for file in df["file"].unique():
        hidden_folder = os.path.join(config.folder, f".crfittool_{file}")
        try:
            os.makedirs(hidden_folder, exist_ok=True)
        except OSError as e:
            logging.warning("Failed to create %s: %s", hidden_folder, e)
            continue

        pattern = os.path.join(escaped_folder, f"*{glob.escape(file)}*.csv")
        for f in glob.glob(pattern):
            try:
                os.rename(f, os.path.join(hidden_folder, os.path.basename(f)))
            except OSError as e:
                logging.warning(
                    "Failed to move %s to %s: %s", f, hidden_folder, e
                )
def _find_fit_files(folder: str) -> list[str]:
    """Return the sorted paths in *folder* whose extension is ``.fit`` in any casing."""
    # A single listing filtered on the lower-cased extension. Separate
    # "*.fit" / "*.FIT" / "*.Fit" globs return every file once per pattern
    # wherever glob matching is case-insensitive (e.g. Windows) and miss other
    # casings elsewhere. The folder is escaped so "[", "*" or "?" in its name
    # are taken literally.
    candidates = glob.glob(os.path.join(glob.escape(folder), "*"))
    return sorted(
        path
        for path in candidates
        if os.path.splitext(path)[1].lower() == ".fit"
    )


def io(config: Config) -> pl.DataFrame:
    """Load every FIT file in ``config.folder`` into one long frame.

    Each file is read with ``pycrux.Crux(config.crux_common).read_fit`` and
    reduced to ``timestamp``, ``file`` (base name), ``folder``, ``cad_rpm``,
    ``pwr_watts`` and ``spd_mps``. If ``config.rolling_window`` is truthy, a
    centered rolling mean of that size is applied to the three value columns
    of each file. Files that fail to load are skipped with a warning.

    After concatenation the ``timestamp`` strings are parsed to datetimes and
    columns that are entirely null are dropped; both steps only warn when
    they fail. If ``config.reference_file`` names one of the loaded files,
    three columns are added by joining on ``timestamp``:
    ``pwr_watts_reference``, ``pwr_watts_diff`` (power minus reference) and
    ``pwr_watts_diff_hyb`` (the difference as a percentage of the reference,
    with the reference raised to at least 250).

    Args:
        config: Run settings (folder, crux path, smoothing, reference file).

    Returns:
        The combined frame, one row per record per file.

    Raises:
        ValueError: If no FIT file is found, none can be read, or the
            per-file frames cannot be concatenated.
    """
    crux = pycrux.Crux(config.crux_common)

    files = _find_fit_files(config.folder)
    if not files:
        raise ValueError(
            f"Failed to create df: no .fit files found in {config.folder!r}"
        )

    value_columns = ("cad_rpm", "pwr_watts", "spd_mps")
    _df_list: list[pl.DataFrame] = []
    for file in files:
        try:
            _df = (
                pl.DataFrame(crux.read_fit(file))
                .with_columns(
                    file=pl.lit(os.path.basename(file)),
                    folder=pl.lit(config.folder),
                )
                .select("timestamp", "file", "folder", *value_columns)
            )
            if config.rolling_window:
                _df = _df.with_columns(
                    pl.col(*value_columns).rolling_mean(
                        window_size=config.rolling_window, center=True
                    )
                )
            _df_list.append(_df)
        except Exception as e:
            # Deliberately broad: one unreadable file must not stop the rest.
            logging.warning("Failed to read %s: %s", file, e)

    if not _df_list:
        raise ValueError(
            f"Failed to create df: none of the {len(files)} .fit files in "
            f"{config.folder!r} could be read"
        )

    try:
        df = pl.concat(_df_list, how="diagonal_relaxed")
    except Exception as e:
        raise ValueError(f"Failed to create df: {e}") from e

    try:
        # Timestamps look like 2025-09-04T08:00:46.000Z.
        df = df.with_columns(
            pl.col("timestamp").str.strptime(
                pl.Datetime, format="%Y-%m-%dT%H:%M:%S%.3fZ"
            )
        )
    except Exception as e:
        logging.warning("Failed to convert timestamp: %s", e)

    try:
        # With zero rows every column would count as "all null" and be
        # dropped, so the filter only runs on a non-empty frame.
        if df.height > 0:
            df = df.select(
                [s.name for s in df if s.null_count() != df.height]
            )
    except Exception as e:
        logging.warning("Failed to drop columns that are all null: %s", e)

    if config.reference_file is not None:
        if config.reference_file in df["file"].unique():
            reference = (
                df.filter(pl.col("file") == config.reference_file)
                .select("timestamp", "pwr_watts")
                .rename({"pwr_watts": "pwr_watts_reference"})
            )
            df = (
                df.join(reference, on="timestamp", how="left")
                .with_columns(
                    pwr_watts_diff=pl.col("pwr_watts")
                    - pl.col("pwr_watts_reference")
                )
                .with_columns(
                    pwr_watts_diff_hyb=100
                    * pl.col("pwr_watts_diff")
                    / pl.col("pwr_watts_reference").clip(250, None)
                )
            )
        else:
            # A mistyped reference would otherwise just yield no diff charts.
            logging.warning(
                "Reference file %s is not among the loaded files; "
                "no reference columns were added",
                config.reference_file,
            )

    return df
def plot(df: pl.DataFrame, config: Config) -> alt.Chart:
    """Build the interactive dashboard for the loaded data.

    One panel is produced per metric: power, the two power-error columns when
    they exist in ``df``, cadence and speed. Each panel stacks a focus chart,
    a short overview chart carrying an x-interval brush, and a row of mean
    values over the brushed range. Unless ``config.silent`` is set, the
    available ``--reference_file`` values are printed first.

    Args:
        df: Frame with ``timestamp``, ``file`` and the metric columns.
        config: Supplies ``silent``, ``zero``, ``width``, ``height``, ``folder``.

    Returns:
        The vertically concatenated chart with the legend selection attached.
    """
    AltairConfig.setup()

    if not config.silent:
        for hint in (
            "\nIf you want to define a reference, use the flag",
            "--reference_file=<filename>",
            "Options are:",
        ):
            print(hint)
        for file in df["file"].unique():
            print(f'--reference_file="{file}" ')

    # The brush is shared by every panel; the legend click selects files.
    interval = alt.selection_interval(encodings=["x"])
    legend_sel = alt.selection_point(fields=["file"], bind="legend")

    def make_zoomable_chart(y, color_col="file", title: str | None = None):
        """Return focus chart, overview and mean row for column ``y``."""

        def only_selected(layer):
            # Keep the brushed time range, then the files picked in the legend.
            return layer.transform_filter(interval).transform_filter(legend_sel)

        x_enc = alt.X("timestamp:T", title="Timestamp")
        y_enc = alt.Y(f"{y}:Q", title=title, scale=alt.Scale(zero=config.zero))
        fade = alt.condition(legend_sel, alt.value(1), alt.value(0.12))
        base = alt.Chart(df).encode(
            x=x_enc,
            y=y_enc,
            color=alt.Color(f"{color_col}:N"),
            opacity=fade,
        )

        # Top: detail view restricted to the current selections.
        focus = only_selected(base).mark_line()
        focus = focus.properties(
            width=config.width, height=config.height, title=config.folder
        )

        # Middle: full-range overview that owns the brush.
        overview = base.mark_line().add_params(interval)
        overview = overview.properties(width=config.width, height=60)

        # Bottom: one mean value per file over the current selections.
        means = only_selected(base).mark_text(align="center", fontSize=30)
        means = means.encode(
            x=alt.X("file:N", title=None, axis=alt.Axis(labelLimit=0)),
            text=alt.Text(f"mean({y}):Q", format=".2f"),
            y=alt.value(0),
        )
        means = means.properties(
            width=config.width, height=30, title=f"Mean {y}"
        )

        return alt.vconcat(focus, overview, means, spacing=10)

    # (column, axis title) in display order; the error panels are optional.
    panel_specs = [("pwr_watts", "Power (Watts)")]
    if "pwr_watts_diff" in df.columns:
        panel_specs.append(("pwr_watts_diff", "Power Error (Watts) "))
    if "pwr_watts_diff_hyb" in df.columns:
        panel_specs.append(("pwr_watts_diff_hyb", "Hyb. Power Error (%) "))
    panel_specs.append(("cad_rpm", "Cadence (RPM)"))
    panel_specs.append(("spd_mps", "Speed (m/s)"))

    charts = [
        make_zoomable_chart(column, title=label)
        for column, label in panel_specs
    ]

    dashboard = alt.vconcat(*charts, spacing=80)
    dashboard = dashboard.resolve_scale(y="independent", color="independent")

    # Attach the legend selection once, on the outermost chart.
    return dashboard.add_params(legend_sel)
def main(
    folder: str,
    crux_common: str,
    height: int = 200,
    width: int = 2000,
    zero: bool = False,
    reference_file: str | None = None,
    rolling_window: int | None = None,
    silent: bool = False,
    cleanup: bool = True,
):
    """Load the FIT files of a folder, chart them, and optionally tidy up.

    The chart is saved as ``chart.html`` inside ``folder`` and then shown.

    Args:
        folder: Directory containing the FIT files.
        crux_common: Value passed to ``pycrux.Crux``.
        height: Height of each main chart.
        width: Width of the charts.
        zero: Whether the y scales are forced to include zero.
        reference_file: Base name of the file used as power reference.
        rolling_window: Window size of the rolling mean; falsy disables it.
        silent: Suppress the ``--reference_file`` hint.
        cleanup: Move matching CSV files into hidden folders afterwards.
    """
    settings = Config(
        folder=folder,
        crux_common=crux_common,
        height=height,
        width=width,
        zero=zero,
        reference_file=reference_file,
        rolling_window=rolling_window,
        silent=silent,
        cleanup=cleanup,
    )

    frame = io(settings)
    figure = plot(frame, settings)

    html_path = os.path.join(settings.folder, "chart.html")
    figure.save(html_path)
    figure.show()

    if not settings.cleanup:
        return
    move_files(frame, settings)
if __name__ == "__main__":
    # Example invocation:
    #
    #   uv run \
    #       fitvis.py \
    #       --crux_common="/YOUR/PATH/TO/crux_common/" \
    #       --width=1400 \
    #       --height=300 \
    #       --zero=False \
    #       --folder="/YOUR/PATH/TO/DATASET/xyz" \
    #       --rolling_window=5 \
    #       --silent=False \
    #       --reference_file="A FIT FILE INSIDE xyz" \
    #       --cleanup=True ;
    fire.Fire(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
uv
:brew install uv
fitvis.py