Created
May 14, 2026 14:14
-
-
Save kevinpapst/7b5920094887101c8d4fc4e2f65eacf2 to your computer and use it in GitHub Desktop.
Migrating SimpleAnalytics.com to self-hosted Umami.is
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import csv | |
| import hashlib | |
| import sys | |
| import uuid | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| from typing import Iterable | |
| from urllib.parse import urlsplit | |
| csv.field_size_limit(sys.maxsize) | |
| UUID_NAMESPACE = uuid.UUID("6ba7b811-9dad-11d1-80b4-00c04fd430c8") | |
| DEFAULT_BATCH_SIZE = 1000 | |
| DEFAULT_SESSION_TIMEOUT_MINUTES = 30 | |
| SESSION_COLUMNS = ( | |
| "session_id", | |
| "website_id", | |
| "browser", | |
| "os", | |
| "device", | |
| "screen", | |
| "language", | |
| "country", | |
| "region", | |
| "city", | |
| "distinct_id", | |
| "created_at", | |
| ) | |
| WEBSITE_EVENT_COLUMNS = ( | |
| "event_id", | |
| "website_id", | |
| "session_id", | |
| "visit_id", | |
| "created_at", | |
| "url_path", | |
| "url_query", | |
| "utm_source", | |
| "utm_medium", | |
| "utm_campaign", | |
| "utm_content", | |
| "utm_term", | |
| "referrer_path", | |
| "referrer_query", | |
| "referrer_domain", | |
| "page_title", | |
| "gclid", | |
| "fbclid", | |
| "msclkid", | |
| "ttclid", | |
| "li_fat_id", | |
| "twclid", | |
| "event_type", | |
| "event_name", | |
| "tag", | |
| "hostname", | |
| ) | |
| @dataclass | |
| class VisitState: | |
| last_seen_at: datetime | |
| visit_index: int | |
| visit_id: str | |
| def parse_args() -> argparse.Namespace: | |
| parser = argparse.ArgumentParser( | |
| description="Generate SQL to import legacy analytics CSV exports into Umami." | |
| ) | |
| parser.add_argument( | |
| "--input-file", | |
| required=True, | |
| help="Path to the source CSV file for a single website.", | |
| ) | |
| parser.add_argument( | |
| "--website-id", | |
| required=True, | |
| help="Umami website_id UUID for the destination website.", | |
| ) | |
| parser.add_argument( | |
| "--output-file", | |
| help="Path to the generated SQL file. Defaults to <input-file>.sql", | |
| ) | |
| parser.add_argument( | |
| "--batch-size", | |
| type=int, | |
| default=DEFAULT_BATCH_SIZE, | |
| help=f"Rows per INSERT statement. Default: {DEFAULT_BATCH_SIZE}", | |
| ) | |
| parser.add_argument( | |
| "--session-timeout-minutes", | |
| type=int, | |
| default=DEFAULT_SESSION_TIMEOUT_MINUTES, | |
| help=( | |
| "Gap after which a new visit is created for the same fingerprint. " | |
| f"Default: {DEFAULT_SESSION_TIMEOUT_MINUTES}" | |
| ), | |
| ) | |
| return parser.parse_args() | |
| def main() -> int: | |
| args = parse_args() | |
| input_path = Path(args.input_file).expanduser().resolve() | |
| output_path = ( | |
| Path(args.output_file).expanduser().resolve() | |
| if args.output_file | |
| else input_path.with_suffix(input_path.suffix + ".sql") | |
| ) | |
| if args.batch_size <= 0: | |
| raise ValueError("--batch-size must be greater than 0") | |
| if args.session_timeout_minutes <= 0: | |
| raise ValueError("--session-timeout-minutes must be greater than 0") | |
| # Validate early so failures are explicit before the main pass. | |
| uuid.UUID(args.website_id) | |
| generate_sql( | |
| input_path=input_path, | |
| output_path=output_path, | |
| website_id=args.website_id, | |
| batch_size=args.batch_size, | |
| session_timeout=timedelta(minutes=args.session_timeout_minutes), | |
| ) | |
| return 0 | |
| def generate_sql( | |
| *, | |
| input_path: Path, | |
| output_path: Path, | |
| website_id: str, | |
| batch_size: int, | |
| session_timeout: timedelta, | |
| ) -> None: | |
| session_rows: list[tuple[object, ...]] = [] | |
| event_rows: list[tuple[object, ...]] = [] | |
| emitted_sessions: set[str] = set() | |
| visit_states: dict[str, VisitState] = {} | |
| stats = { | |
| "rows_total": 0, | |
| "rows_imported": 0, | |
| "rows_skipped_robots": 0, | |
| "sessions_created": 0, | |
| "visits_created": 0, | |
| "events_created": 0, | |
| } | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| with input_path.open("r", encoding="utf-8-sig", newline="") as source, output_path.open( | |
| "w", encoding="utf-8", newline="" | |
| ) as target: | |
| reader = csv.DictReader(source) | |
| ensure_required_columns(reader.fieldnames or []) | |
| write_header( | |
| target=target, | |
| input_path=input_path, | |
| website_id=website_id, | |
| batch_size=batch_size, | |
| session_timeout=session_timeout, | |
| ) | |
| for line_number, row in enumerate(reader, start=2): | |
| stats["rows_total"] += 1 | |
| if is_truthy(row.get("is_robot")): | |
| stats["rows_skipped_robots"] += 1 | |
| continue | |
| event_timestamp = parse_timestamp(row.get("added_iso"), line_number) | |
| fingerprint = build_fingerprint(row) | |
| fingerprint_hash = sha256_hex("\x1f".join(fingerprint)) | |
| session_id = make_uuid("session", website_id, fingerprint_hash) | |
| distinct_id = truncate(fingerprint_hash, 50) | |
| visit_state = visit_states.get(fingerprint_hash) | |
| if visit_state and event_timestamp - visit_state.last_seen_at <= session_timeout: | |
| visit_id = visit_state.visit_id | |
| visit_state.last_seen_at = event_timestamp | |
| else: | |
| visit_index = 1 if visit_state is None else visit_state.visit_index + 1 | |
| visit_id = make_uuid("visit", website_id, fingerprint_hash, str(visit_index)) | |
| visit_states[fingerprint_hash] = VisitState( | |
| last_seen_at=event_timestamp, | |
| visit_index=visit_index, | |
| visit_id=visit_id, | |
| ) | |
| stats["visits_created"] += 1 | |
| if session_id not in emitted_sessions: | |
| session_rows.append( | |
| build_session_row( | |
| website_id=website_id, | |
| session_id=session_id, | |
| distinct_id=distinct_id, | |
| event_timestamp=event_timestamp, | |
| row=row, | |
| ) | |
| ) | |
| emitted_sessions.add(session_id) | |
| stats["sessions_created"] += 1 | |
| event_rows.append( | |
| build_website_event_row( | |
| website_id=website_id, | |
| session_id=session_id, | |
| visit_id=visit_id, | |
| event_timestamp=event_timestamp, | |
| row=row, | |
| line_number=line_number, | |
| ) | |
| ) | |
| stats["rows_imported"] += 1 | |
| stats["events_created"] += 1 | |
| if len(session_rows) >= batch_size: | |
| flush_insert(target, "session", SESSION_COLUMNS, session_rows, "session_id") | |
| session_rows.clear() | |
| if len(event_rows) >= batch_size: | |
| if session_rows: | |
| flush_insert(target, "session", SESSION_COLUMNS, session_rows, "session_id") | |
| session_rows.clear() | |
| flush_insert(target, "website_event", WEBSITE_EVENT_COLUMNS, event_rows, "event_id") | |
| event_rows.clear() | |
| if session_rows: | |
| flush_insert(target, "session", SESSION_COLUMNS, session_rows, "session_id") | |
| if event_rows: | |
| flush_insert(target, "website_event", WEBSITE_EVENT_COLUMNS, event_rows, "event_id") | |
| target.write("COMMIT;\n") | |
| print_summary(input_path=input_path, output_path=output_path, stats=stats) | |
| def ensure_required_columns(fieldnames: Iterable[str]) -> None: | |
| required = { | |
| "added_iso", | |
| "browser_name", | |
| "browser_version", | |
| "country_code", | |
| "device_type", | |
| "hostname", | |
| "is_robot", | |
| "lang_language", | |
| "lang_region", | |
| "os_name", | |
| "os_version", | |
| "path", | |
| "path_and_query", | |
| "query", | |
| "referrer_hostname", | |
| "referrer_path", | |
| "screen_height", | |
| "screen_width", | |
| "session_id", | |
| "user_agent", | |
| "utm_campaign", | |
| "utm_content", | |
| "utm_medium", | |
| "utm_source", | |
| "utm_term", | |
| "uuid", | |
| } | |
| missing = sorted(required.difference(fieldnames)) | |
| if missing: | |
| raise ValueError(f"CSV is missing required columns: {', '.join(missing)}") | |
| def write_header( | |
| *, | |
| target, | |
| input_path: Path, | |
| website_id: str, | |
| batch_size: int, | |
| session_timeout: timedelta, | |
| ) -> None: | |
| target.write("-- Generated by migrate_simpleanalytics_to_umami.py\n") | |
| target.write(f"-- Source file: {input_path}\n") | |
| target.write(f"-- Destination website_id: {website_id}\n") | |
| target.write(f"-- Batch size: {batch_size}\n") | |
| target.write( | |
| f"-- Session timeout: {int(session_timeout.total_seconds() // 60)} minutes\n\n" | |
| ) | |
| target.write("BEGIN;\n\n") | |
| def build_session_row( | |
| *, | |
| website_id: str, | |
| session_id: str, | |
| distinct_id: str, | |
| event_timestamp: datetime, | |
| row: dict[str, str], | |
| ) -> tuple[object, ...]: | |
| browser = normalize_browser(row.get("browser_name")) | |
| os_name = normalize_os(row.get("os_name")) | |
| device = normalize_device(row.get("device_type")) | |
| screen = normalize_screen(row.get("screen_width"), row.get("screen_height")) | |
| language = normalize_language(row.get("lang_language"), row.get("lang_region")) | |
| country = normalize_country(row.get("country_code")) | |
| return ( | |
| session_id, | |
| website_id, | |
| browser, | |
| os_name, | |
| device, | |
| screen, | |
| language, | |
| country, | |
| None, | |
| None, | |
| distinct_id, | |
| to_sql_timestamp(event_timestamp), | |
| ) | |
| def build_website_event_row( | |
| *, | |
| website_id: str, | |
| session_id: str, | |
| visit_id: str, | |
| event_timestamp: datetime, | |
| row: dict[str, str], | |
| line_number: int, | |
| ) -> tuple[object, ...]: | |
| event_id = build_event_id(website_id=website_id, row=row, line_number=line_number) | |
| url_path = normalize_url_path(row.get("path")) | |
| url_query = normalize_query(row.get("query")) | |
| derived_path, derived_query = split_path_and_query(row.get("path_and_query")) | |
| if not url_path: | |
| url_path = derived_path or "/" | |
| if not url_query: | |
| url_query = derived_query | |
| referrer_domain = clean_text(row.get("referrer_hostname")) | |
| referrer_path = clean_text(row.get("referrer_path")) | |
| referrer_query = derive_referrer_query(row.get("document_referrer")) | |
| hostname = truncate(clean_text(row.get("hostname")), 100) | |
| return ( | |
| event_id, | |
| website_id, | |
| session_id, | |
| visit_id, | |
| to_sql_timestamp(event_timestamp), | |
| truncate(url_path or "/", 500), | |
| truncate(url_query, 500), | |
| truncate(clean_text(row.get("utm_source")), 255), | |
| truncate(clean_text(row.get("utm_medium")), 255), | |
| truncate(clean_text(row.get("utm_campaign")), 255), | |
| truncate(clean_text(row.get("utm_content")), 255), | |
| truncate(clean_text(row.get("utm_term")), 255), | |
| truncate(referrer_path, 500), | |
| truncate(referrer_query, 500), | |
| truncate(referrer_domain, 500), | |
| None, | |
| None, | |
| None, | |
| None, | |
| None, | |
| None, | |
| None, | |
| 1, | |
| None, | |
| None, | |
| hostname, | |
| ) | |
| def build_event_id(*, website_id: str, row: dict[str, str], line_number: int) -> str: | |
| source_uuid = clean_text(row.get("uuid")) | |
| if source_uuid: | |
| return make_uuid("event", website_id, source_uuid) | |
| return make_uuid( | |
| "event", | |
| website_id, | |
| clean_text(row.get("added_iso")) or "", | |
| clean_text(row.get("path_and_query")) or "", | |
| str(line_number), | |
| ) | |
| def build_fingerprint(row: dict[str, str]) -> tuple[str, ...]: | |
| return ( | |
| normalize_for_fingerprint(row.get("browser_name")), | |
| normalize_for_fingerprint(row.get("browser_version")), | |
| normalize_for_fingerprint(row.get("country_code")), | |
| normalize_for_fingerprint(row.get("os_name")), | |
| normalize_for_fingerprint(row.get("os_version")), | |
| normalize_for_fingerprint(row.get("screen_height")), | |
| normalize_for_fingerprint(row.get("screen_width")), | |
| normalize_for_fingerprint(row.get("user_agent")), | |
| ) | |
| def parse_timestamp(raw_value: str | None, line_number: int) -> datetime: | |
| value = clean_text(raw_value) | |
| if not value: | |
| raise ValueError(f"Missing added_iso at CSV line {line_number}") | |
| try: | |
| parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) | |
| except ValueError as exc: | |
| raise ValueError(f"Invalid added_iso at CSV line {line_number}: {value}") from exc | |
| if parsed.tzinfo is None: | |
| parsed = parsed.replace(tzinfo=timezone.utc) | |
| return parsed | |
| def split_path_and_query(raw_value: str | None) -> tuple[str | None, str | None]: | |
| value = clean_text(raw_value) | |
| if not value: | |
| return None, None | |
| if "?" not in value: | |
| return value, None | |
| path, query = value.split("?", 1) | |
| return path or "/", query or None | |
| def derive_referrer_query(raw_value: str | None) -> str | None: | |
| value = clean_text(raw_value) | |
| if not value: | |
| return None | |
| try: | |
| parsed = urlsplit(value) | |
| except ValueError: | |
| return None | |
| return parsed.query or None | |
| def normalize_url_path(raw_value: str | None) -> str | None: | |
| value = clean_text(raw_value) | |
| if not value: | |
| return None | |
| return value if value.startswith("/") else f"/{value}" | |
| def normalize_query(raw_value: str | None) -> str | None: | |
| value = clean_text(raw_value) | |
| if not value: | |
| return None | |
| return value[1:] if value.startswith("?") else value | |
| def normalize_browser(raw_value: str | None) -> str | None: | |
| value = clean_text(raw_value) | |
| if not value: | |
| return None | |
| return truncate(value.lower(), 20) | |
| def normalize_os(raw_value: str | None) -> str | None: | |
| value = clean_text(raw_value) | |
| if not value: | |
| return None | |
| return truncate(value.lower(), 20) | |
| def normalize_device(raw_value: str | None) -> str | None: | |
| value = clean_text(raw_value) | |
| if not value: | |
| return None | |
| lowered = value.lower() | |
| if lowered in {"desktop", "mobile", "tablet"}: | |
| return lowered | |
| return truncate(lowered, 20) | |
| def normalize_screen(width: str | None, height: str | None) -> str | None: | |
| width_value = clean_text(width) | |
| height_value = clean_text(height) | |
| if not width_value or not height_value: | |
| return None | |
| return truncate(f"{width_value}x{height_value}", 11) | |
| def normalize_language(language: str | None, region: str | None) -> str | None: | |
| language_value = clean_text(language) | |
| region_value = clean_text(region) | |
| if not language_value and not region_value: | |
| return None | |
| if language_value and region_value: | |
| return truncate(f"{language_value.lower()}-{region_value.upper()}", 35) | |
| return truncate((language_value or region_value).lower(), 35) | |
| def normalize_country(raw_value: str | None) -> str | None: | |
| value = clean_text(raw_value) | |
| if not value: | |
| return None | |
| return truncate(value.upper(), 2) | |
| def normalize_for_fingerprint(raw_value: str | None) -> str: | |
| value = clean_text(raw_value) | |
| return value.casefold() if value else "" | |
| def clean_text(raw_value: str | None) -> str | None: | |
| if raw_value is None: | |
| return None | |
| value = raw_value.strip() | |
| return value or None | |
| def truncate(value: str | None, max_length: int) -> str | None: | |
| if value is None: | |
| return None | |
| return value[:max_length] | |
| def to_sql_timestamp(value: datetime) -> str: | |
| return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") | |
| def sha256_hex(value: str) -> str: | |
| return hashlib.sha256(value.encode("utf-8")).hexdigest() | |
| def make_uuid(*parts: str) -> str: | |
| return str(uuid.uuid5(UUID_NAMESPACE, ":".join(parts))) | |
| def is_truthy(raw_value: str | None) -> bool: | |
| value = clean_text(raw_value) | |
| return value is not None and value.lower() in {"1", "true", "yes"} | |
| def flush_insert(target, table_name: str, columns: tuple[str, ...], rows, conflict_column: str) -> None: | |
| if not rows: | |
| return | |
| target.write(f"INSERT INTO {table_name} ({', '.join(columns)})\nVALUES\n") | |
| serialized_rows = ",\n".join(f" ({serialize_row(row)})" for row in rows) | |
| target.write(serialized_rows) | |
| target.write(f"\nON CONFLICT ({conflict_column}) DO NOTHING;\n\n") | |
| def serialize_row(row: tuple[object, ...]) -> str: | |
| return ", ".join(sql_value(value) for value in row) | |
| def sql_value(value: object) -> str: | |
| if value is None: | |
| return "NULL" | |
| if isinstance(value, int): | |
| return str(value) | |
| text = str(value).replace("'", "''") | |
| return f"'{text}'" | |
| def print_summary(*, input_path: Path, output_path: Path, stats: dict[str, int]) -> None: | |
| print(f"Source CSV: {input_path}") | |
| print(f"Generated SQL: {output_path}") | |
| print(f"Total CSV rows: {stats['rows_total']}") | |
| print(f"Imported events: {stats['rows_imported']}") | |
| print(f"Skipped robots: {stats['rows_skipped_robots']}") | |
| print(f"Approximate visitors (session rows): {stats['sessions_created']}") | |
| print(f"Approximate visits: {stats['visits_created']}") | |
| print(f"Generated pageview events: {stats['events_created']}") | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment