from pathlib import Path

import pandas as pd
import dask.dataframe as dd
import geopandas as gpd
from pyproj import Transformer
from dask.diagnostics import ProgressBar


def transform_coordinates(partition: pd.DataFrame) -> pd.DataFrame:
    """Convert British National Grid (EPSG:27700) coordinates to WGS84 (EPSG:4326)."""
    # always_xy=True makes the transformer expect (x, y) = (easting, northing)
    # inputs and return (lon, lat), regardless of each CRS's declared axis order
    transformer = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True)

    # Transform the coordinates for this partition
    lons, lats = transformer.transform(
        partition["Eastings"].values, partition["Northings"].values
    )
    partition["Latitude"] = lats
    partition["Longitude"] = lons
    partition["geometry"] = gpd.points_from_xy(lons, lats)
    return partition


def dask_memory_usage(ddf: dd.DataFrame) -> float:
    """Return the total memory usage of a dask dataframe in MB."""
    return (
        ddf.map_partitions(lambda df: df.memory_usage(deep=True).sum()).compute() / 1e6
    ).sum()


if __name__ == "__main__":
    geo_data_dir = Path("/Users/tommylees/github/pltr_fellowship/geography")
    csv_data = list((geo_data_dir / "codepo_gb/Data/CSV").glob("*.csv"))
    headers_file = geo_data_dir / "codepo_gb/Doc/Code-Point_Open_Column_Headers.csv"

    # The second line of the headers file holds the full column names
    with open(headers_file, "r") as f:
        _headers = f.readlines()
    headers = _headers[1].strip().split(",")

    # Create a dask dataframe from all CSV files
    ddf = dd.read_csv(
        csv_data,  # List of CSV file paths
        names=headers,  # Use the headers we extracted
        assume_missing=True,  # Handle any missing values
        blocksize="64MB",  # Reasonable block size for memory management
        dtype={"Admin_county_code": "object"},
    )

    total_memory = dask_memory_usage(ddf)
    print(f"Total memory usage: {total_memory:.1f} MB")

    # Lazily apply the coordinate transformation to each partition
    ddf = ddf.map_partitions(transform_coordinates)

    # Output directory: assumed to be the user's Downloads folder
    downloads = Path.home() / "Downloads"
    fpath = downloads / "postcode_to_latlon.csv"
    with ProgressBar():
        ddf.to_csv(fpath, index=False, single_file=True)