from pathlib import Path

import pandas as pd
import dask.dataframe as dd
import geopandas as gpd
from pyproj import Transformer
from dask.diagnostics import ProgressBar


def transform_coordinates(partition: pd.DataFrame) -> pd.DataFrame:
    """Add WGS84 Latitude/Longitude columns and a point geometry to a single partition."""
    # Define the transformer: EPSG:27700 (British National Grid) to EPSG:4326 (WGS84).
    # always_xy=True fixes the axis order to (easting, northing) in and
    # (longitude, latitude) out, regardless of the CRS definitions.
    transformer = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True)

    # Transform the coordinates for this partition
    lons, lats = transformer.transform(
        partition["Eastings"].values, partition["Northings"].values
    )
    partition["Latitude"] = lats
    partition["Longitude"] = lons
    partition["geometry"] = gpd.points_from_xy(lons, lats)
    return partition
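
# Rough sanity check (assumed example values, not taken from the dataset): easting 530000 /
# northing 180000 is roughly central London, so
# transform_coordinates(pd.DataFrame({"Eastings": [530000], "Northings": [180000]}))
# should yield Latitude ~51.5 and Longitude ~-0.13.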
  

def dask_memory_usage(ddf: dd.DataFrame) -> float:
    """Return the total in-memory size of a dask DataFrame in megabytes."""
    return (
        ddf.map_partitions(lambda part: part.memory_usage(deep=True).sum()).compute()
        / 1e6
    ).sum()

if __name__ == "__main__":
    geo_data_dir = Path("/Users/tommylees/github/pltr_fellowship/geography")
    csv_data = list((geo_data_dir / "codepo_gb/Data/CSV").glob("*.csv"))
    headers_file = geo_data_dir / "codepo_gb/Doc/Code-Point_Open_Column_Headers.csv"

    with open(headers_file, "r") as f:
        _headers = f.readlines()

    headers = _headers[1].strip().split(",")
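    # The headers file holds two rows: abbreviated codes on the first line and full
    # column names on the second, so _headers[1] gives names such as "Eastings" and
    # "Northings" that transform_coordinates expects.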

    # Create a dask dataframe from all CSV files
    ddf = dd.read_csv(
        csv_data,  # List of CSV file paths
        names=headers,  # Use the headers we extracted above
        assume_missing=True,  # Read inferred-integer columns as floats so NaNs are representable
        blocksize="64MB",  # Reasonable block size for memory management
        dtype={"Admin_county_code": "object"},
    )
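    # Optional sanity check: confirm the partition count and that the header names
    # line up with the parsed columns before running the full pipeline.
    print(f"Partitions: {ddf.npartitions}")
    print(ddf.head())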

    total_memory = dask_memory_usage(ddf)
    print(f"Total memory usage: {total_memory:.1f} MB")

    # Lazily apply the coordinate transformation to each partition
    ddf = ddf.map_partitions(transform_coordinates)
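    # Note: dask infers the output schema by running transform_coordinates on an empty
    # partition; if that inference ever misbehaves, an explicit `meta` DataFrame
    # describing the added Latitude/Longitude/geometry columns can be supplied instead.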
  
    # Write the combined result as a single CSV alongside the source data
    fpath = geo_data_dir / "postcode_to_latlon.csv"
    with ProgressBar():
        ddf.to_csv(fpath, index=False, single_file=True)
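
    # Note: the shapely points in "geometry" are written to CSV via their string repr
    # (WKT-style); if only the numeric coordinates are needed, drop the column first, e.g.
    # ddf.drop(columns="geometry").to_csv(fpath, index=False, single_file=True)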