Skip to content

Instantly share code, notes, and snippets.

@kevinpapst
Created May 14, 2026 14:14
Show Gist options
  • Select an option

  • Save kevinpapst/7b5920094887101c8d4fc4e2f65eacf2 to your computer and use it in GitHub Desktop.

Select an option

Save kevinpapst/7b5920094887101c8d4fc4e2f65eacf2 to your computer and use it in GitHub Desktop.
Migrating SimpleAnalytics.com to self-hosted Umami.is
#!/usr/bin/env python3
import argparse
import csv
import hashlib
import sys
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable
from urllib.parse import urlsplit
def _raise_csv_field_size_limit() -> int:
    """Lift the csv module's per-field size limit as high as the platform allows.

    ``csv.field_size_limit`` stores the limit in a C ``long``. Where that type
    is narrower than ``sys.maxsize`` the call raises ``OverflowError``, so the
    candidate is halved until it is accepted.

    Returns:
        The limit that was finally installed.
    """
    limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(limit)
        except OverflowError:
            limit //= 2
        else:
            return limit


# Exports can contain very long fields; do not let the default limit reject them.
_raise_csv_field_size_limit()

# Namespace for every uuid5 derived by this script (same value as
# uuid.NAMESPACE_URL). Changing it changes all generated IDs.
UUID_NAMESPACE = uuid.UUID("6ba7b811-9dad-11d1-80b4-00c04fd430c8")
# Default number of rows per generated INSERT statement.
DEFAULT_BATCH_SIZE = 1000
# Default inactivity gap, in minutes, after which a new visit starts.
DEFAULT_SESSION_TIMEOUT_MINUTES = 30
# Column list used for INSERTs into the "session" table. Session row tuples
# are serialized positionally, so they must follow this order.
SESSION_COLUMNS = (
    # Identifiers
    "session_id",
    "website_id",
    # Client details
    "browser",
    "os",
    "device",
    "screen",
    "language",
    # Location
    "country",
    "region",
    "city",
    # Visitor identity and creation time
    "distinct_id",
    "created_at",
)
# Column list used for INSERTs into the "website_event" table. Event row tuples
# are serialized positionally, so they must follow this order.
WEBSITE_EVENT_COLUMNS = (
    # Identifiers
    "event_id",
    "website_id",
    "session_id",
    "visit_id",
    "created_at",
    # Requested URL
    "url_path",
    "url_query",
    # Campaign parameters
    "utm_source",
    "utm_medium",
    "utm_campaign",
    "utm_content",
    "utm_term",
    # Referrer
    "referrer_path",
    "referrer_query",
    "referrer_domain",
    "page_title",
    # Advertising click identifiers
    "gclid",
    "fbclid",
    "msclkid",
    "ttclid",
    "li_fat_id",
    "twclid",
    # Event classification
    "event_type",
    "event_name",
    "tag",
    "hostname",
)
@dataclass
class VisitState:
    """Mutable bookkeeping for the most recent visit of one fingerprint."""

    # Timestamp of the latest event assigned to the visit.
    last_seen_at: datetime
    # 1-based counter of visits created so far for the fingerprint.
    visit_index: int
    # UUID string written to the event rows of the visit.
    visit_id: str
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the migration script.

    Returns:
        The ``argparse`` namespace with ``input_file``, ``website_id``,
        ``output_file``, ``batch_size`` and ``session_timeout_minutes``.
    """
    cli = argparse.ArgumentParser(
        description="Generate SQL to import legacy analytics CSV exports into Umami."
    )
    timeout_help = (
        "Gap after which a new visit is created for the same fingerprint. "
        f"Default: {DEFAULT_SESSION_TIMEOUT_MINUTES}"
    )
    # One (flag, add_argument keyword arguments) pair per option, in help order.
    option_specs = (
        (
            "--input-file",
            {
                "required": True,
                "help": "Path to the source CSV file for a single website.",
            },
        ),
        (
            "--website-id",
            {
                "required": True,
                "help": "Umami website_id UUID for the destination website.",
            },
        ),
        (
            "--output-file",
            {"help": "Path to the generated SQL file. Defaults to <input-file>.sql"},
        ),
        (
            "--batch-size",
            {
                "type": int,
                "default": DEFAULT_BATCH_SIZE,
                "help": f"Rows per INSERT statement. Default: {DEFAULT_BATCH_SIZE}",
            },
        ),
        (
            "--session-timeout-minutes",
            {
                "type": int,
                "default": DEFAULT_SESSION_TIMEOUT_MINUTES,
                "help": timeout_help,
            },
        ),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def main() -> int:
    """Command-line entry point: validate the options and generate the SQL file.

    Returns:
        ``0`` on success, intended as the process exit status.

    Raises:
        ValueError: if ``--batch-size`` or ``--session-timeout-minutes`` is not
            greater than 0, or ``--website-id`` is not a valid UUID.
    """
    args = parse_args()
    input_path = Path(args.input_file).expanduser().resolve()
    output_path = (
        Path(args.output_file).expanduser().resolve()
        if args.output_file
        # Append ".sql" to the full input name, e.g. "export.csv.sql".
        else input_path.with_suffix(input_path.suffix + ".sql")
    )
    if args.batch_size <= 0:
        raise ValueError("--batch-size must be greater than 0")
    if args.session_timeout_minutes <= 0:
        raise ValueError("--session-timeout-minutes must be greater than 0")
    # Validate early so failures are explicit before the main pass, and keep
    # the canonical lower-case hyphenated form. uuid.UUID also accepts braces,
    # a "urn:uuid:" prefix, missing hyphens and upper case; passing such a
    # spelling through verbatim would change every ID derived from website_id
    # and could emit a literal the database does not accept as a uuid.
    website_id = str(uuid.UUID(args.website_id))
    generate_sql(
        input_path=input_path,
        output_path=output_path,
        website_id=website_id,
        batch_size=args.batch_size,
        session_timeout=timedelta(minutes=args.session_timeout_minutes),
    )
    return 0
def generate_sql(
    *,
    input_path: Path,
    output_path: Path,
    website_id: str,
    batch_size: int,
    session_timeout: timedelta,
) -> None:
    """Stream the CSV export and write one transactional SQL import script.

    Rows flagged as robots are skipped. Every other row becomes one
    ``website_event`` row; the first row seen for a fingerprint also produces a
    ``session`` row. Rows are buffered and written as multi-row INSERTs of at
    most ``batch_size`` rows, between ``BEGIN;`` and ``COMMIT;``. A summary is
    printed once the file is complete.

    Args:
        input_path: CSV file to read (UTF-8, optional BOM).
        output_path: SQL file to create; parent directories are created.
        website_id: Destination website UUID, stored on every row and mixed
            into every derived ID.
        batch_size: Maximum number of rows per INSERT statement.
        session_timeout: Largest gap between two events of one fingerprint that
            still counts as the same visit.

    Raises:
        ValueError: propagated from the column check or timestamp parsing.
    """
    pending_sessions: list[tuple[object, ...]] = []
    pending_events: list[tuple[object, ...]] = []
    known_session_ids: set[str] = set()
    visits: dict[str, VisitState] = {}
    stats = dict.fromkeys(
        (
            "rows_total",
            "rows_imported",
            "rows_skipped_robots",
            "sessions_created",
            "visits_created",
            "events_created",
        ),
        0,
    )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with input_path.open("r", encoding="utf-8-sig", newline="") as source:
        with output_path.open("w", encoding="utf-8", newline="") as target:

            def flush_sessions() -> None:
                # Write and empty the buffered session rows.
                flush_insert(
                    target, "session", SESSION_COLUMNS, pending_sessions, "session_id"
                )
                pending_sessions.clear()

            def flush_events() -> None:
                # Write and empty the buffered event rows.
                flush_insert(
                    target,
                    "website_event",
                    WEBSITE_EVENT_COLUMNS,
                    pending_events,
                    "event_id",
                )
                pending_events.clear()

            reader = csv.DictReader(source)
            ensure_required_columns(reader.fieldnames or [])
            write_header(
                target=target,
                input_path=input_path,
                website_id=website_id,
                batch_size=batch_size,
                session_timeout=session_timeout,
            )

            # Numbering starts at 2 because line 1 of the CSV is the header.
            for line_number, row in enumerate(reader, start=2):
                stats["rows_total"] += 1
                if is_truthy(row.get("is_robot")):
                    stats["rows_skipped_robots"] += 1
                    continue

                event_timestamp = parse_timestamp(row.get("added_iso"), line_number)
                fingerprint_hash = sha256_hex("\x1f".join(build_fingerprint(row)))
                session_id = make_uuid("session", website_id, fingerprint_hash)

                # NOTE(review): a row older than last_seen_at gives a negative
                # gap, which also satisfies "<=" and continues the visit --
                # presumably rows are expected in ascending time order; confirm.
                previous = visits.get(fingerprint_hash)
                continues_visit = (
                    previous is not None
                    and event_timestamp - previous.last_seen_at <= session_timeout
                )
                if continues_visit:
                    previous.last_seen_at = event_timestamp
                    visit_id = previous.visit_id
                else:
                    next_index = 1 if previous is None else previous.visit_index + 1
                    visit_id = make_uuid(
                        "visit", website_id, fingerprint_hash, str(next_index)
                    )
                    visits[fingerprint_hash] = VisitState(
                        last_seen_at=event_timestamp,
                        visit_index=next_index,
                        visit_id=visit_id,
                    )
                    stats["visits_created"] += 1

                if session_id not in known_session_ids:
                    pending_sessions.append(
                        build_session_row(
                            website_id=website_id,
                            session_id=session_id,
                            distinct_id=truncate(fingerprint_hash, 50),
                            event_timestamp=event_timestamp,
                            row=row,
                        )
                    )
                    known_session_ids.add(session_id)
                    stats["sessions_created"] += 1

                pending_events.append(
                    build_website_event_row(
                        website_id=website_id,
                        session_id=session_id,
                        visit_id=visit_id,
                        event_timestamp=event_timestamp,
                        row=row,
                        line_number=line_number,
                    )
                )
                stats["rows_imported"] += 1
                stats["events_created"] += 1

                if len(pending_sessions) >= batch_size:
                    flush_sessions()
                if len(pending_events) >= batch_size:
                    # Buffered sessions are always written ahead of the events
                    # that reference them.
                    if pending_sessions:
                        flush_sessions()
                    flush_events()

            # Write whatever is left, sessions first, then close the transaction.
            if pending_sessions:
                flush_sessions()
            if pending_events:
                flush_events()
            target.write("COMMIT;\n")

    print_summary(input_path=input_path, output_path=output_path, stats=stats)
def ensure_required_columns(fieldnames: Iterable[str]) -> None:
    """Check that the CSV header provides every column the import reads.

    Args:
        fieldnames: Column names found in the CSV header.

    Raises:
        ValueError: listing the missing column names in alphabetical order.
    """
    available = set(fieldnames)
    expected = (
        "added_iso",
        "browser_name",
        "browser_version",
        "country_code",
        "device_type",
        "hostname",
        "is_robot",
        "lang_language",
        "lang_region",
        "os_name",
        "os_version",
        "path",
        "path_and_query",
        "query",
        "referrer_hostname",
        "referrer_path",
        "screen_height",
        "screen_width",
        "session_id",
        "user_agent",
        "utm_campaign",
        "utm_content",
        "utm_medium",
        "utm_source",
        "utm_term",
        "uuid",
    )
    absent = sorted(name for name in expected if name not in available)
    if not absent:
        return
    raise ValueError(f"CSV is missing required columns: {', '.join(absent)}")
def write_header(
    *,
    target,
    input_path: Path,
    website_id: str,
    batch_size: int,
    session_timeout: timedelta,
) -> None:
    """Write the leading SQL comments and open the transaction.

    Args:
        target: Writable text stream receiving the SQL.
        input_path: Source CSV path, recorded in a comment.
        website_id: Destination website UUID, recorded in a comment.
        batch_size: Rows per INSERT, recorded in a comment.
        session_timeout: Visit gap, recorded in whole minutes.
    """
    timeout_minutes = int(session_timeout.total_seconds() // 60)
    chunks = (
        "-- Generated by migrate_simpleanalytics_to_umami.py\n",
        f"-- Source file: {input_path}\n",
        f"-- Destination website_id: {website_id}\n",
        f"-- Batch size: {batch_size}\n",
        f"-- Session timeout: {timeout_minutes} minutes\n\n",
        "BEGIN;\n\n",
    )
    for chunk in chunks:
        target.write(chunk)
def build_session_row(
    *,
    website_id: str,
    session_id: str,
    distinct_id: str,
    event_timestamp: datetime,
    row: dict[str, str],
) -> tuple[object, ...]:
    """Build the value tuple for one ``session`` INSERT row.

    Values are ordered like ``SESSION_COLUMNS``. ``region`` and ``city`` are
    always ``None``.

    Args:
        website_id: Destination website UUID.
        session_id: UUID of the session being created.
        distinct_id: Visitor identifier stored with the session.
        event_timestamp: Timestamp used as the session creation time.
        row: CSV record supplying browser, OS, device, screen, language and
            country values.

    Returns:
        The session values in column order.
    """
    return (
        session_id,
        website_id,
        normalize_browser(row.get("browser_name")),
        normalize_os(row.get("os_name")),
        normalize_device(row.get("device_type")),
        normalize_screen(row.get("screen_width"), row.get("screen_height")),
        normalize_language(row.get("lang_language"), row.get("lang_region")),
        normalize_country(row.get("country_code")),
        None,  # region
        None,  # city
        distinct_id,
        to_sql_timestamp(event_timestamp),
    )
def build_website_event_row(
    *,
    website_id: str,
    session_id: str,
    visit_id: str,
    event_timestamp: datetime,
    row: dict[str, str],
    line_number: int,
) -> tuple[object, ...]:
    """Build the value tuple for one ``website_event`` INSERT row.

    Values are ordered like ``WEBSITE_EVENT_COLUMNS``. The path and query come
    from the ``path`` / ``query`` columns, falling back to the parts of
    ``path_and_query`` and finally to ``"/"`` for the path.

    Args:
        website_id: Destination website UUID.
        session_id: UUID of the owning session.
        visit_id: UUID of the owning visit.
        event_timestamp: Time of the event.
        row: CSV record being converted.
        line_number: CSV line number, used when deriving a fallback event ID.

    Returns:
        The event values in column order.
    """
    fallback_path, fallback_query = split_path_and_query(row.get("path_and_query"))
    page_path = normalize_url_path(row.get("path")) or fallback_path or "/"
    page_query = normalize_query(row.get("query")) or fallback_query

    campaign_values = tuple(
        truncate(clean_text(row.get(column)), 255)
        for column in (
            "utm_source",
            "utm_medium",
            "utm_campaign",
            "utm_content",
            "utm_term",
        )
    )

    return (
        build_event_id(website_id=website_id, row=row, line_number=line_number),
        website_id,
        session_id,
        visit_id,
        to_sql_timestamp(event_timestamp),
        truncate(page_path, 500),
        truncate(page_query, 500),
        *campaign_values,
        truncate(clean_text(row.get("referrer_path")), 500),
        truncate(derive_referrer_query(row.get("document_referrer")), 500),
        truncate(clean_text(row.get("referrer_hostname")), 500),
        None,  # page_title
        None,  # gclid
        None,  # fbclid
        None,  # msclkid
        None,  # ttclid
        None,  # li_fat_id
        None,  # twclid
        1,  # event_type
        None,  # event_name
        None,  # tag
        truncate(clean_text(row.get("hostname")), 100),
    )
def build_event_id(*, website_id: str, row: dict[str, str], line_number: int) -> str:
    """Derive a deterministic event UUID for a CSV record.

    The source ``uuid`` column is used when it has a value. Otherwise the ID is
    derived from the timestamp text, ``path_and_query`` and the line number.

    Args:
        website_id: Destination website UUID mixed into the ID.
        row: CSV record being converted.
        line_number: CSV line number of the record.

    Returns:
        The event UUID as a string.
    """
    source_uuid = clean_text(row.get("uuid"))
    if not source_uuid:
        fallback_parts = (
            clean_text(row.get("added_iso")) or "",
            clean_text(row.get("path_and_query")) or "",
            str(line_number),
        )
        return make_uuid("event", website_id, *fallback_parts)
    return make_uuid("event", website_id, source_uuid)
def build_fingerprint(row: dict[str, str]) -> tuple[str, ...]:
    """Collect the normalized client attributes that identify a visitor.

    Args:
        row: CSV record being converted.

    Returns:
        One normalized string per fingerprint column, in a fixed order.
    """
    fingerprint_columns = (
        "browser_name",
        "browser_version",
        "country_code",
        "os_name",
        "os_version",
        "screen_height",
        "screen_width",
        "user_agent",
    )
    return tuple(
        normalize_for_fingerprint(row.get(column)) for column in fingerprint_columns
    )
def parse_timestamp(raw_value: str | None, line_number: int) -> datetime:
    """Parse an ISO 8601 ``added_iso`` value into an aware datetime.

    Args:
        raw_value: Text of the ``added_iso`` column.
        line_number: CSV line number, used in error messages.

    Returns:
        The parsed datetime; values without an offset are treated as UTC.

    Raises:
        ValueError: if the value is missing or cannot be parsed.
    """
    text = clean_text(raw_value)
    if not text:
        raise ValueError(f"Missing added_iso at CSV line {line_number}")
    # Spell a "Z" suffix as an explicit offset before parsing.
    iso_text = text.replace("Z", "+00:00")
    try:
        moment = datetime.fromisoformat(iso_text)
    except ValueError as exc:
        raise ValueError(f"Invalid added_iso at CSV line {line_number}: {text}") from exc
    if moment.tzinfo is not None:
        return moment
    return moment.replace(tzinfo=timezone.utc)
def split_path_and_query(raw_value: str | None) -> tuple[str | None, str | None]:
    """Split a combined ``path?query`` value at the first ``?``.

    Args:
        raw_value: Text of the ``path_and_query`` column.

    Returns:
        ``(path, query)``. Both are ``None`` for blank input, an empty path
        becomes ``"/"`` and an empty or absent query becomes ``None``.
    """
    text = clean_text(raw_value)
    if not text:
        return None, None
    path, separator, query = text.partition("?")
    if not separator:
        return text, None
    return path or "/", query or None
def derive_referrer_query(raw_value: str | None) -> str | None:
    """Extract the query string of a referrer URL.

    Args:
        raw_value: Full referrer URL, if available.

    Returns:
        The query string, or ``None`` when the input is blank, cannot be
        split as a URL, or has no query.
    """
    referrer = clean_text(raw_value)
    if referrer:
        try:
            parts = urlsplit(referrer)
        except ValueError:
            return None
        return parts.query or None
    return None
def normalize_url_path(raw_value: str | None) -> str | None:
    """Return the path with a leading ``/``, or ``None`` for blank input."""
    path = clean_text(raw_value)
    if not path:
        return None
    if path.startswith("/"):
        return path
    return f"/{path}"
def normalize_query(raw_value: str | None) -> str | None:
    """Return the query without one leading ``?``, or ``None`` for blank input."""
    query = clean_text(raw_value)
    if not query:
        return None
    return query.removeprefix("?")
def normalize_browser(raw_value: str | None) -> str | None:
    """Lower-case the browser name and cap it at 20 characters.

    Returns ``None`` for blank input.
    """
    browser = clean_text(raw_value)
    return truncate(browser.lower(), 20) if browser else None
def normalize_os(raw_value: str | None) -> str | None:
    """Lower-case the operating-system name and cap it at 20 characters.

    Returns ``None`` for blank input.
    """
    os_name = clean_text(raw_value)
    return truncate(os_name.lower(), 20) if os_name else None
def normalize_device(raw_value: str | None) -> str | None:
    """Lower-case the device type.

    ``desktop``, ``mobile`` and ``tablet`` are returned as-is; any other value
    is capped at 20 characters. Blank input gives ``None``.
    """
    device = clean_text(raw_value)
    if not device:
        return None
    device = device.lower()
    known_devices = ("desktop", "mobile", "tablet")
    return device if device in known_devices else truncate(device, 20)
def normalize_screen(width: str | None, height: str | None) -> str | None:
    """Format the screen size as ``<width>x<height>``, capped at 11 characters.

    Returns ``None`` unless both dimensions have a value.
    """
    screen_width = clean_text(width)
    screen_height = clean_text(height)
    if screen_width and screen_height:
        return truncate(f"{screen_width}x{screen_height}", 11)
    return None
def normalize_language(language: str | None, region: str | None) -> str | None:
    """Combine language and region into a tag of at most 35 characters.

    With both parts the tag is ``<language lower>-<REGION upper>``. With only
    one part, that part is lower-cased. With neither, the result is ``None``.
    """
    language_part = clean_text(language)
    region_part = clean_text(region)
    if language_part and region_part:
        tag = f"{language_part.lower()}-{region_part.upper()}"
    elif language_part or region_part:
        tag = (language_part or region_part).lower()
    else:
        return None
    return truncate(tag, 35)
def normalize_country(raw_value: str | None) -> str | None:
    """Upper-case the country code and cap it at 2 characters.

    Returns ``None`` for blank input.
    """
    country = clean_text(raw_value)
    return truncate(country.upper(), 2) if country else None
def normalize_for_fingerprint(raw_value: str | None) -> str:
    """Return the case-folded, trimmed value, or ``""`` for blank input."""
    text = clean_text(raw_value)
    if not text:
        return ""
    return text.casefold()
def clean_text(raw_value: str | None) -> str | None:
if raw_value is None:
return None
value = raw_value.strip()
return value or None
def truncate(value: str | None, max_length: int) -> str | None:
if value is None:
return None
return value[:max_length]
def to_sql_timestamp(value: datetime) -> str:
    """Render a datetime as ISO 8601 text in UTC with a ``Z`` suffix."""
    utc_text = value.astimezone(timezone.utc).isoformat()
    return utc_text.replace("+00:00", "Z")
def sha256_hex(value: str) -> str:
    """Return the hexadecimal SHA-256 digest of the UTF-8 encoded text."""
    digest = hashlib.sha256()
    digest.update(value.encode("utf-8"))
    return digest.hexdigest()
def make_uuid(*parts: str) -> str:
    """Derive a deterministic UUID (version 5) from the given parts.

    The parts are joined with ``:`` and hashed under ``UUID_NAMESPACE``, so
    the same parts always give the same UUID string.
    """
    name = ":".join(parts)
    return str(uuid.uuid5(UUID_NAMESPACE, name))
def is_truthy(raw_value: str | None) -> bool:
    """Report whether a CSV flag reads as ``1``, ``true`` or ``yes``.

    The comparison ignores case and surrounding whitespace.
    """
    flag = clean_text(raw_value)
    if flag is None:
        return False
    return flag.lower() in {"1", "true", "yes"}
def flush_insert(target, table_name: str, columns: tuple[str, ...], rows, conflict_column: str) -> None:
    """Write one multi-row INSERT statement for the buffered rows.

    Nothing is written when ``rows`` is empty. The statement ends with
    ``ON CONFLICT (...) DO NOTHING``. The caller clears ``rows`` afterwards.

    Args:
        target: Writable text stream receiving the SQL.
        table_name: Table to insert into.
        columns: Column names, in the order of the row values.
        rows: Value tuples to serialize.
        conflict_column: Column named in the ``ON CONFLICT`` clause.
    """
    if not rows:
        return
    column_list = ", ".join(columns)
    target.write(f"INSERT INTO {table_name} ({column_list})\nVALUES\n")
    value_lines = [f" ({serialize_row(row)})" for row in rows]
    target.write(",\n".join(value_lines))
    target.write(f"\nON CONFLICT ({conflict_column}) DO NOTHING;\n\n")
def serialize_row(row: tuple[object, ...]) -> str:
    """Render a row's values as a comma-separated list of SQL literals."""
    literals = [sql_value(cell) for cell in row]
    return ", ".join(literals)
def sql_value(value: object) -> str:
    """Render a Python value as a SQL literal.

    Args:
        value: ``None``, an ``int``, or any object with a string form.

    Returns:
        ``NULL`` for ``None``, the bare number for integers, otherwise a
        single-quoted string literal with embedded quotes doubled.
    """
    if value is None:
        return "NULL"
    if isinstance(value, int):
        return str(value)
    # PostgreSQL rejects the zero character in string constants, and one bad
    # value would abort the whole import transaction, so NULs are dropped.
    text = str(value).replace("\x00", "").replace("'", "''")
    return f"'{text}'"
def print_summary(*, input_path: Path, output_path: Path, stats: dict[str, int]) -> None:
    """Print the file paths and row counters of a finished run to stdout.

    Args:
        input_path: CSV file that was read.
        output_path: SQL file that was written.
        stats: Counters collected while generating the SQL.
    """
    print(f"Source CSV: {input_path}")
    print(f"Generated SQL: {output_path}")
    # (label, stats key) pairs, printed in this order.
    counters = (
        ("Total CSV rows", "rows_total"),
        ("Imported events", "rows_imported"),
        ("Skipped robots", "rows_skipped_robots"),
        ("Approximate visitors (session rows)", "sessions_created"),
        ("Approximate visits", "visits_created"),
        ("Generated pageview events", "events_created"),
    )
    for label, key in counters:
        print(f"{label}: {stats[key]}")
if __name__ == "__main__":
    # Run the CLI and use main()'s return value as the process exit status.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment