Last active
May 30, 2023 19:07
-
-
Save cpcloud/af29ff80118c2377668d271c2b397867 to your computer and use it in GitHub Desktop.
Generate ClickHouse data and run a simple benchmark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def gen_data(*, n_ids: int, n_minutes: int):
    """Yield one array of synthetic timestamps per business day.

    For every day in ``[2013-01-01, 2023-05-20)`` that ``np.is_busday``
    accepts, yields a ``datetime64[us]`` array of ``n_minutes * n_ids``
    values: each minute slot (counted from 09:00) is repeated ``n_ids``
    times and a random sub-second offset in microseconds is added.

    Args:
        n_ids: How many rows are generated for each minute slot.
        n_minutes: How many consecutive minute slots are generated per day,
            starting at 09:00.

    Yields:
        A freshly allocated NumPy array of timestamps for one business day.
    """
    import numpy as np

    # The progress bar is cosmetic; generate data even when tqdm is absent.
    try:
        import tqdm
    except ImportError:
        tqdm = None

    start_date = np.datetime64("2013-01-01")
    end_date = np.datetime64("2023-05-20")
    start_time = np.timedelta64(9, "h")

    dates = np.arange(start_date, end_date)
    dates = dates[np.is_busday(dates)]

    n = n_minutes * n_ids

    # ``.view`` reinterprets raw bytes, so the integers must be exactly 64 bits
    # wide to match timedelta64; do not rely on the platform's default int.
    minute_slots = np.repeat(np.arange(n_minutes, dtype=np.int64), n_ids)
    times = start_time + minute_slots.view("timedelta64[m]")
    offsets = np.random.randint(0, 1_000_000, size=n, dtype=np.int64).view(
        "timedelta64[us]"
    )

    progress = dates if tqdm is None else tqdm.tqdm(dates, unit_scale=n)
    for date in progress:
        yield date + times + offsets
        # Re-use the same offsets in a different order for the next day.
        np.random.shuffle(offsets)
def query(q):
    """Build the benchmark expression on top of table expression *q*.

    Keeps rows whose ``date`` is on or after 2023-05-01 and adds an ``avg``
    column: the mean of ``feature`` over a trailing window
    (``preceding=30``) grouped by ``id`` and ``time`` and ordered by ``date``.
    """
    import ibis
    from ibis import _

    recent_rows = q.filter(_.date >= "2023-05-01")
    window = ibis.trailing_window(
        preceding=30,
        group_by=["id", "time"],
        order_by="date",
    )
    trailing_mean = _.feature.mean().over(window)
    return recent_rows.mutate(avg=trailing_mean)
def create_table(con, *, engine: str, table_name: str, n_minutes: int, n_ids: int):
    """(Re)create *table_name* in ClickHouse and fill it with synthetic rows.

    The table has an EPHEMERAL ``timestamp`` input column; the stored columns
    (``date``, ``time``, ``id``, ``feature``) are produced by DEFAULT
    expressions that inject NULLs with probability 0.10, 0.20, 0.30 and 0.40
    respectively. Only timestamps are inserted from Python.

    Args:
        con: ibis ClickHouse backend; ``con.con`` is used for Arrow inserts.
        engine: Short engine key, one of the keys of ``options_mapping``.
        table_name: Name of the table to drop and recreate.
        n_minutes: Forwarded to ``gen_data``.
        n_ids: Forwarded to ``gen_data`` and used as the upper bound of the
            randomly generated ``id`` values.

    Raises:
        KeyError: If *engine* is not a known engine key.
    """
    import pyarrow as pa

    import ibis

    options_mapping = {
        "rmergetree": "ReplacingMergeTree",
        "mergetree": "MergeTree",
        "parquet": "File(Parquet)",
        "native": "File(Native)",
        "avro": "File(Avro)",
    }
    engine = options_mapping[engine]

    schema = ibis.schema({"timestamp": "!timestamp(6)"}).to_pyarrow()

    # The id range used to be hard-coded to 3000.0, silently ignoring n_ids;
    # derive it from the parameter (identical SQL for the default n_ids=3000).
    max_id = float(n_ids)

    ddl = f"""
CREATE TABLE {table_name} (
    timestamp DateTime64(6) EPHEMERAL,
    date Nullable(Date) DEFAULT IF(randUniform(0.0, 1.0) < 0.10, NULL, CAST(timestamp AS Nullable(Date))),
    time Nullable(Time) DEFAULT IF(randUniform(0.0, 1.0) < 0.20, NULL, CAST(timestamp AS Nullable(Time))),
    id Nullable(Int64) DEFAULT IF(randUniform(0.0, 1.0) < 0.30, NULL, CAST(round(randUniform(1.0, {max_id})) AS Nullable(Int64))),
    feature Nullable(Float64) DEFAULT IF(randUniform(0.0, 1.0) < 0.40, NULL, randUniform(0.0, 1.0))
) ENGINE = {engine}
"""
    # Only the MergeTree family takes a key clause; the key columns are
    # Nullable, hence allow_nullable_key.
    if "mergetree" in engine.lower():
        ddl += " PRIMARY KEY (date, time, id) ORDER BY (date, time, id) SETTINGS allow_nullable_key = 1"

    con.raw_sql("SET engine_file_allow_create_multiple_files = true")
    con.drop_table(table_name, force=True)
    con.raw_sql(ddl)

    c = con.con
    # One insert per generated business day keeps each batch bounded.
    for timestamps in gen_data(n_ids=n_ids, n_minutes=n_minutes):
        c.insert_arrow(
            table_name, pa.RecordBatch.from_arrays([timestamps], schema=schema)
        )
def bench(con, *, table_name: str, use_ideal: bool):
    """Time the ibis expression against raw SQL and print the difference.

    Args:
        con: ibis ClickHouse backend; ``con.con.query_df`` runs the raw SQL.
        table_name: Table to benchmark against.
        use_ideal: When true, the raw SQL is the hand-written statement below;
            otherwise it is whatever ``query(...).compile()`` produces.
    """
    import time

    print(f"table: {table_name}")
    t = con.tables[table_name]
    q = query(t)

    if use_ideal:
        sql = f"""\
SELECT
  *,
  AVG(t0.feature) OVER (PARTITION BY t0.id, t0.time ORDER BY t0.date ROWS BETWEEN 30 PRECEDING AND 0 FOLLOWING) AS avg
FROM default.{table_name} AS t0
WHERE
  t0.date >= '2023-05-01'"""
    else:
        # NOTE(review): this compiles `q` as-is, while the Ibis run below
        # executes `q.drop("timestamp")` — confirm the two are meant to differ.
        sql = q.compile()

    # perf_counter is monotonic and high-resolution; time.time() can jump when
    # the system clock is adjusted, which would skew the measurement.
    start = time.perf_counter()
    q.drop("timestamp").execute()
    delta_ibis = time.perf_counter() - start
    print(f"Ibis: {delta_ibis:.2f}s")

    start = time.perf_counter()
    con.con.query_df(sql)
    delta_sql = time.perf_counter() - start
    print(f"SQL: {delta_sql:.2f}s")

    print(f"Ibis v. SQL: {delta_ibis - delta_sql:.2f}s")
def main(args):
    """Optionally build the test table, then run the benchmark.

    Run with ``python ~/genslowdata.py``. *args* is the parsed command-line
    namespace; the attributes read are ``create``, ``engine``, ``table_name``,
    ``n_ids``, ``n_minutes`` and ``use_ideal``.
    """
    import ibis

    connection = ibis.clickhouse.connect()
    target = args.table_name

    # Data generation is opt-in so an existing table can be re-benchmarked.
    if args.create:
        table_options = {
            "engine": args.engine,
            "table_name": target,
            "n_ids": args.n_ids,
            "n_minutes": args.n_minutes,
        }
        create_table(connection, **table_options)

    bench(connection, table_name=target, use_ideal=args.use_ideal)
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    import argparse

    p = argparse.ArgumentParser(
        description="Test clickhouse client performance versus ibis"
    )
    p.add_argument(
        "-c", "--create", action="store_true", help="Whether to create test data"
    )
    p.add_argument(
        "-e",
        "--engine",
        default="rmergetree",
        choices=["rmergetree", "mergetree", "native", "parquet", "avro"],
        help="Table storage engine",
    )
    p.add_argument("-t", "--table-name", default="test", help="Table name")
    p.add_argument(
        "-i", "--n-ids", default=3000, type=int, help="Number of possible unique IDs"
    )
    # The value is the count of minute slots generated per business day
    # (starting at 09:00), not "minutes per hour" as the help used to say.
    p.add_argument(
        "-m",
        "--n-minutes",
        default=400,
        type=int,
        help="Number of minutes per business day, counted from 09:00",
    )
    p.add_argument(
        "-I", "--use-ideal", action="store_true", help="Use hand-optimized SQL"
    )
    main(p.parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment