Skip to content

Instantly share code, notes, and snippets.

@thomascamminady
Created February 11, 2025 11:59
Show Gist options
  • Save thomascamminady/6cdbee274efc5b97f190c752a067464b to your computer and use it in GitHub Desktop.
Save thomascamminady/6cdbee274efc5b97f190c752a067464b to your computer and use it in GitHub Desktop.
Run Metabase queries in the command line
# /// script
# dependencies = [
# "fire",
# "polars",
# "rich",
# "redshift_connector",
# ]
# ///
import json
import os
import fire
import polars as pl
import redshift_connector
from rich import print
from rich.console import Console
from rich.table import Table
class MetabaseQueryEngine:
@staticmethod
def run(
*,
query_file: str | None = None,
query: str | None = None,
timeout: int = 10,
) -> pl.DataFrame:
"""Run a query against the Metabase database.
Parameters
----------
query_file : str | None, optional
The path to a file containing the query to run, by default None.
query : str | None, optional
The query to run, by default None.
timeout : int, optional
The number of seconds to wait for a connection to the database to be established,
by default 10.
Returns
-------
pl.DataFrame
The results of the query.
Raises
------
ValueError
If neither query_file nor query are specified.
"""
if query_file:
query = MetabaseQueryEngine._get_query(query_file)
elif query:
pass
else:
raise ValueError("Either query_file or query must be specified.")
return pl.read_database(
connection=MetabaseQueryEngine._get_connection(timeout=timeout),
query=query,
)
@staticmethod
def _get_connection(timeout: int) -> redshift_connector.Connection:
with open(os.path.expanduser("~/.wahoofitness.json")) as f:
wf = json.load(f)
try:
connection = redshift_connector.connect(
host=wf["redshift_server"],
port=wf["redshift_port"],
database=wf["redshift_database"],
user=wf["redshift_user"],
password=wf["redshift_password"],
timeout=timeout,
)
except redshift_connector.OperationalError as e:
raise ConnectionError(
f"Connection failed. Did you remember to start your VPN? {e}"
) from e
return connection
@staticmethod
def _get_query(query_file: str) -> str:
with open(query_file) as f:
return f.read()
def main(query_file: str | None = None, query: str | None = None, nice: bool = False):
df = MetabaseQueryEngine.run(query_file=query_file, query=query)
if nice:
textual_table(df, query_file, query)
else:
print(df.write_csv())
def textual_table(df, query_file, query):
table = Table(caption=query_file or query)
colors = [
"#E58606",
"#5D69B1",
"#52BCA3",
"#99C945",
"#CC61B0",
"#24796C",
"#DAA51B",
"#2F8AC4",
]
for i, column in enumerate(df.columns):
table.add_column(
column,
justify="left",
style=colors[i % len(colors)],
)
for row in df.iter_rows():
table.add_row(*[str(_) for _ in row])
console = Console()
console.clear()
console.print(table)
if __name__ == "__main__":
fire.Fire(main)
@thomascamminady
Copy link
Author

uv run \
    --python 3.11 \
    --no-project \
    --script run_query_against_redshift.py \
    --nice=True \
    --query="""SELECT * FROM wahoo_cloud.user_workout_biking_profiles LIMIT 100;"""

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment