import argparse
import csv
import json
import logging
import math
import shutil
from pathlib import Path
from xml.dom.minidom import parseString
from xml.etree.ElementTree import Element, SubElement, tostring

import pandas as pd  # .to_excel below needs the openpyxl package installed
import requests

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

DEFAULT_DAILY_MAX = 160000

# Extract GeoJSON Requests
def extract_geojson_requests(har_file, output_csv):
    """
    Extracts GeoJSON requests from a HAR file and saves them to a CSV file.

    Args:
        har_file (str): Path to the HAR file.
        output_csv (Path): Path to the output CSV file.
    """
    logging.info(f"Extracting GeoJSON requests from {har_file} to {output_csv}")
    try:
        # Read the HAR file
        with open(har_file, 'r', encoding='utf-8') as file:
            har_data = json.load(file)

        # Extract BRouter GeoJSON request URLs
        requests_list = []
        for entry in har_data["log"]["entries"]:
            url = entry["request"]["url"]
            if "brouter" in url and "geojson" in url:
                requests_list.append(url)

        # Write URLs to the CSV file
        with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(["URL"])
            for url in requests_list:
                csv_writer.writerow([url])

        logging.info(f"Extracted URLs saved to: {output_csv}")
    except Exception as e:
        logging.error(f"Error extracting GeoJSON requests: {e}")

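# Illustrative only: the minimal HAR shape extract_geojson_requests() relies on.
# A real browser HAR export carries far more fields; only log -> entries ->
# request -> url is read here, and the URL below is a hypothetical example.
_EXAMPLE_HAR_SHAPE = {
    "log": {
        "entries": [
            {"request": {"url": "https://brouter.de/brouter?lonlats=...&format=geojson"}},
        ]
    }
}
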

# Download GeoJSON Files
def sanitize_filename(filename):
    return "".join(c for c in filename if c.isalnum() or c in (' ', '.', '_', '-')).strip()

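# For example (illustrative): characters outside the allow-list are dropped,
# so sanitize_filename('route?v=1/segment') returns 'routev1segment'.
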

def download_geojson(csv_file, geojson_folder):
    """
    Reads the CSV file, creates a subfolder "geojson", deletes its content,
    and downloads all URLs, saving them with sanitized filenames and an appropriate numerical prefix.

    Args:
        csv_file (Path): Path to the input CSV file.
        geojson_folder (Path): Folder where the downloaded files will be stored.
    """
    logging.info(f"Starting GeoJSON download process from CSV: {csv_file}")
    try:
        # Create or clear the "geojson" folder
        if geojson_folder.exists():
            shutil.rmtree(geojson_folder)  # Delete folder and its contents
        geojson_folder.mkdir(parents=True, exist_ok=True)

        # Read URLs from the CSV file
        with open(csv_file, 'r', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            next(reader)  # Skip the header row
            urls = [row[0] for row in reader]

        # Determine zero-padding width so filenames sort in download order
        padding = max(1, math.ceil(math.log10(len(urls)))) if urls else 1
        total_downloads = 0

        # Download each URL and save with sanitized filenames
        for index, url in enumerate(urls):
            try:
                response = requests.get(url, timeout=30)
                response.raise_for_status()  # Raise error for HTTP failures

                # Generate sanitized filename with dynamic padding
                base_name = Path(url.split('?')[0]).name  # Extract base filename from URL
                sanitized_name = sanitize_filename(base_name)
                file_name = f"{index:0{padding}d}_{sanitized_name}.geojson"

                # Save the file
                file_path = geojson_folder / file_name
                with open(file_path, 'wb') as file:
                    file.write(response.content)

                logging.debug(f"Downloaded: {url} -> {file_path}")
                total_downloads += 1

            except Exception as e:
                logging.warning(f"Error downloading {url}: {e}")

        logging.info(f"GeoJSON download process completed. {total_downloads} files saved in: {geojson_folder}")
    except Exception as e:
        logging.error(f"Error processing CSV file: {e}")

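# Padding example: with 250 URLs, math.ceil(math.log10(250)) == 3, so files are
# named 000_..., 001_..., ..., 249_..., and lexicographic order matches route order.
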
def process_messages(rows, csv_writer, cumulative_time, cumulative_cost):
    """
    Processes the rows of messages, calculates additional fields, and writes to CSV.

    Args:
        rows (list): Rows of messages to process.
        csv_writer (csv.writer): CSV writer object to write processed rows.
        cumulative_time (int): Initial cumulative time.
        cumulative_cost (int): Initial cumulative cost.

    Returns:
        tuple: Updated cumulative_time and cumulative_cost.
    """
    for i, row in enumerate(rows):
        try:
            # Extract values for calculations
            cost_per_km = int(row[4])  # "CostPerKm"
            elevation_cost = int(row[5])  # "ElevationCost"
            turn_cost = int(row[6])  # "TurnCost"
            distance = int(row[3])  # "Distance"

            # Calculate segment costs
            segment_cost_pure = round(elevation_cost + turn_cost)
            segment_cost_with_distance = round(
                (cost_per_km * distance / 1000) + segment_cost_pure
            )

            # Update cumulative cost
            cumulative_cost += segment_cost_with_distance

            # Time calculations: the "Time" column is cumulative within a
            # response, so the per-segment delta is the difference to the
            # previous row (or the raw value for the first row).
            current_time = int(row[11])  # "Time" field
            previous_time = int(rows[i - 1][11]) if i > 0 else 0
            time_delta = current_time - previous_time
            cumulative_time += time_delta

            # Append calculations to the row
            extended_row = row + [
                time_delta,
                cumulative_time,
                segment_cost_pure,
                segment_cost_with_distance,
                cumulative_cost,
            ]
            csv_writer.writerow(extended_row)

        except (IndexError, ValueError) as e:
            logging.warning(f"Skipping malformed row: {row}, Error: {e}")
            continue

    return cumulative_time, cumulative_cost

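# Illustrative only: a BRouter "messages" array as the code above expects it,
# a header row followed by data rows (values here are hypothetical). The
# index-based access in process_messages() assumes Distance at column 3,
# CostPerKm at 4, ElevationCost at 5, TurnCost at 6, and Time at 11.
_EXAMPLE_MESSAGES = [
    ["Longitude", "Latitude", "Elevation", "Distance", "CostPerKm",
     "ElevCost", "TurnCost", "NodeCost", "InitialCost", "WayTags",
     "NodeTags", "Time", "Energy"],
    ["13400000", "52500000", "40", "512", "1450",
     "0", "0", "0", "0", "highway=cycleway",
     "", "118", "0"],
]
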

def merge_geojson_messages(geojson_folder, processed_csv):
    """
    Merges GeoJSON files in the specified folder, extracting and processing the "messages" array,
    and writes the output to a processed CSV file.

    Args:
        geojson_folder (Path): Path to the folder containing GeoJSON files.
        processed_csv (Path): Path to the output processed CSV file.
    """
    logging.info(f"Merging GeoJSON files in folder: {geojson_folder}")
    try:
        cumulative_time = 0
        cumulative_cost = 0

        with open(processed_csv, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = None

            # Sort explicitly: glob() order is not guaranteed, and the
            # zero-padded filename prefixes encode the route order that the
            # cumulative totals depend on.
            for geojson_file in sorted(geojson_folder.glob("*.geojson")):
                logging.debug(f"Processing GeoJSON file: {geojson_file}")
                with open(geojson_file, 'r', encoding='utf-8') as file:
                    geojson_data = json.load(file)

                for feature in geojson_data.get("features", []):
                    properties = feature.get("properties", {})
                    messages = properties.get("messages", [])

                    if not messages or not isinstance(messages[0], list):
                        logging.debug(f"No valid messages found in file: {geojson_file}")
                        continue

                    header = messages[0] + [
                        "TimeDelta",
                        "CumulativeTime",
                        "SegmentCostPure",
                        "SegmentCostWithDistance",
                        "CumulativeCost",
                    ]

                    # Write the header once, from the first file that has messages
                    if csv_writer is None:
                        csv_writer = csv.writer(csvfile)
                        csv_writer.writerow(header)

                    cumulative_time, cumulative_cost = process_messages(
                        messages[1:], csv_writer, cumulative_time, cumulative_cost
                    )

        logging.info(f"Merged data saved to processed CSV: {processed_csv}")
    except Exception as e:
        logging.error(f"Error merging GeoJSON files: {e}")


def process_tsv(tsv_file, processed_csv):
    """
    Processes a tab-separated file containing GeoJSON messages and writes the output to a processed CSV file.

    Args:
        tsv_file (Path): Path to the input TSV file containing GeoJSON messages.
        processed_csv (Path): Path to the output processed CSV file.
    """
    logging.info(f"Processing GeoJSON messages from TSV file: {tsv_file}")
    try:
        cumulative_time = 0
        cumulative_cost = 0

        with open(tsv_file, 'r', encoding='utf-8') as tsv_input, \
                open(processed_csv, 'w', newline='', encoding='utf-8') as csv_output:
            tsv_reader = csv.reader(tsv_input, delimiter='\t')
            rows = list(tsv_reader)
            if not rows:
                logging.warning(f"TSV file is empty: {tsv_file}")
                return

            # The first row is assumed to be the header
            header = rows[0] + [
                "TimeDelta",
                "CumulativeTime",
                "SegmentCostPure",
                "SegmentCostWithDistance",
                "CumulativeCost",
            ]
            csv_writer = csv.writer(csv_output)
            csv_writer.writerow(header)

            # Pass all data rows in a single call so each time delta is taken
            # against the previous row; feeding rows in one at a time would
            # reset the "previous time" to zero on every row.
            cumulative_time, cumulative_cost = process_messages(
                rows[1:], csv_writer, cumulative_time, cumulative_cost
            )

        logging.info(f"Processed data saved to processed CSV: {processed_csv}")

    except Exception as e:
        logging.error(f"Error processing TSV file: {e}")


# Generate Daily Segments and GPX
def create_gpx_file(waypoints, base_folder, output_file_name):
    """
    Creates a GPX file from the given waypoints and saves it in the base folder.

    Args:
        waypoints (list of dict): List of waypoints with lat, lon, and additional data.
        base_folder (Path): Base folder where the GPX file will be saved.
        output_file_name (str): Name of the output GPX file.
    """
    logging.info(f"Creating GPX file: {output_file_name} in folder: {base_folder}")

    # Ensure base folder is a Path object and resolve the output file path
    base_folder = Path(base_folder).resolve()
    output_file = base_folder / output_file_name

    # Create GPX structure (the xmlns attribute keeps strict GPX parsers happy)
    gpx = Element('gpx', attrib={
        "version": "1.1",
        "creator": "CycleSegmentPlanner",
        "xmlns": "http://www.topografix.com/GPX/1/1",
    })

    for waypoint in waypoints:
        wpt = SubElement(gpx, 'wpt', attrib={
            "lat": str(waypoint['Latitude']),
            "lon": str(waypoint['Longitude'])
        })
        name = SubElement(wpt, 'name')
        name.text = waypoint['Name']

        desc = SubElement(wpt, 'desc')
        desc.text = (
            f"Segment {waypoint['Name']}, Total Distance: {waypoint['TotalDistance']} m, "
            f"Cost with Distance: {waypoint['SegmentCostWithDistance']}, "
            f"Time: {waypoint['SegmentTime']} mins"
        )

    # Convert GPX structure to XML
    xml_str = tostring(gpx)
    pretty_xml = parseString(xml_str).toprettyxml()

    # Write the GPX file
    try:
        with open(output_file, 'w', encoding='utf-8') as file:
            file.write(pretty_xml)
        logging.info(f"GPX file successfully created: {output_file}")
    except Exception as e:
        logging.error(f"Error writing GPX file: {e}")

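# Illustrative only: the pretty-printed output has roughly this shape
# (values are hypothetical).
#
# <gpx version="1.1" creator="CycleSegmentPlanner" xmlns="http://www.topografix.com/GPX/1/1">
#   <wpt lat="52.5" lon="13.4">
#     <name>Day 1</name>
#     <desc>Segment Day 1, Total Distance: 92500 m, Cost with Distance: 158200, Time: 380 mins</desc>
#   </wpt>
# </gpx>
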

def save_to_excel(waypoints, output_excel):
    """
    Saves waypoints data to an Excel file.

    Args:
        waypoints (list of dict): List of waypoints data.
        output_excel (Path): Path to the output Excel file.
    """
    try:
        # pandas writes .xlsx via the openpyxl engine, which must be installed
        pd.DataFrame(waypoints).to_excel(output_excel, index=False)
        logging.info(f"Excel file created: {output_excel}")
    except Exception as e:
        logging.error(f"Error saving to Excel: {e}")


def generate_daily_segments(strategy, processed_csv, daily_max, output_gpx, first_day_max=None):
    """
    Generates daily cycling segments based on the chosen strategy (cost or time), creates a GPX file,
    and saves the data to an Excel file.

    Args:
        strategy (str): Segmentation strategy ("cost" or "time").
        processed_csv (Path): Path to the processed CSV file.
        daily_max (int): Maximum daily cost or time.
        output_gpx (Path): Path to the output GPX file.
        first_day_max (int, optional): Maximum cost or time for the first day. Defaults to None.
    """
    logging.info(f"Generating daily segments from: {processed_csv}")
    try:
        # Load the processed CSV data
        data = pd.read_csv(processed_csv)

        waypoints = []
        cumulative_cost = 0
        cumulative_time = 0
        segment_distance = 0
        segment_time = 0
        segment_counter = 1
        current_daily_max = first_day_max if first_day_max else daily_max

        # Collect waypoints based on the strategy
        for index, row in data.iterrows():
            # Check if adding this row exceeds the daily max
            if (strategy == "cost" and cumulative_cost + row["SegmentCostWithDistance"] > current_daily_max) or \
                    (strategy == "time" and segment_time + row["TimeDelta"] > current_daily_max):
                # Finalize the current segment. Coordinates arrive scaled by
                # 1e6 (microdegrees), hence the division; the waypoint sits at
                # the first point of the following day's ride.
                waypoints.append({
                    "Name": f"Day {segment_counter}",
                    "Day": f"Day {segment_counter}",
                    "Latitude": row["Latitude"] / 1e6,
                    "Longitude": row["Longitude"] / 1e6,
                    "TotalDistance": segment_distance,
                    "SegmentCostWithDistance": cumulative_cost,
                    "SegmentTime": segment_time,
                })
                # Reset counters and prepare for the next segment
                cumulative_cost = 0
                segment_time = 0
                segment_distance = 0
                segment_counter += 1
                current_daily_max = daily_max  # Reset to standard daily max after the first day

            # Add the current row to the current segment
            cumulative_cost += row["SegmentCostWithDistance"]
            segment_time += row["TimeDelta"]
            segment_distance += row["Distance"]

        # Add any remaining values as the last segment
        if segment_distance > 0 or segment_time > 0:
            waypoints.append({
                "Name": f"Day {segment_counter}",
                "Day": f"Day {segment_counter}",
                "Latitude": data.iloc[-1]["Latitude"] / 1e6,
                "Longitude": data.iloc[-1]["Longitude"] / 1e6,
                "TotalDistance": segment_distance,
                "SegmentCostWithDistance": cumulative_cost,
                "SegmentTime": segment_time,
            })

        # Create GPX file
        create_gpx_file(waypoints, Path(output_gpx).parent, Path(output_gpx).name)

        # Create Excel file
        output_excel = output_gpx.with_suffix(".xlsx")
        save_to_excel(waypoints, output_excel)

    except Exception as e:
        logging.error(f"Error generating daily segments: {e}")

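# Worked example (cost strategy, hypothetical numbers): with daily_max=160000
# and rows costing 70000 each, Day 1 closes after two rows (140000, since a
# third would reach 210000 > 160000); the third row then opens Day 2, and any
# remainder is flushed as the final waypoint after the loop.
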
# Main Function
def main():
    parser = argparse.ArgumentParser(description="Effort-Adjusted Bicycle Tour Planner")

    # Core Arguments
    parser.add_argument("--har-file", help="Path to the HAR file containing network activity.")
    parser.add_argument("--tsv-file", help="Path to the input tab-separated file containing GeoJSON messages.")
    parser.add_argument("--task", default="all", choices=["extract", "download", "merge", "segment", "all"],
                        help="Task to perform (default: all).")

    # Segmentation Arguments
    parser.add_argument("--strategy", default="cost", choices=["cost", "time"],
                        help="Segmentation strategy (default: cost).")
    parser.add_argument("--daily-max", type=int, default=DEFAULT_DAILY_MAX,
                        help=f"Maximum daily cost or time (default: {DEFAULT_DAILY_MAX}).")
    parser.add_argument("--first-day-max", type=int,
                        help="Maximum cost or time for the first day (default: same as --daily-max).")

    args = parser.parse_args()

    # Determine Base Folder
    base_file = args.tsv_file or args.har_file
    if not base_file:
        parser.print_usage()
        logging.error("Error: Either --tsv-file or --har-file must be specified.")
        return

    base_folder = Path(base_file).parent.resolve()
    geojson_folder = base_folder / "geojson"
    merged_csv = base_folder / "merged-geojsons.csv"

    # CSV Derived from HAR (if applicable)
    if args.har_file:
        csv_file = base_folder / f"{Path(args.har_file).stem}.csv"
    else:
        csv_file = None

    # Build Output GPX Path
    output_gpx = base_folder / (
        f"daily_segments_{args.strategy}_max{args.daily_max}" +
        (f"_firstday{args.first_day_max}" if args.first_day_max else "") +
        ".gpx"
    )

    # Task Execution
    if args.task in ["extract", "all"] and args.har_file:
        extract_geojson_requests(args.har_file, csv_file)

    if args.task in ["download", "all"] and csv_file:
        download_geojson(csv_file, geojson_folder)

    if args.task in ["merge", "all"]:
        if args.tsv_file:
            process_tsv(args.tsv_file, merged_csv)
        elif geojson_folder.exists():
            merge_geojson_messages(geojson_folder, merged_csv)
        else:
            logging.error("No input provided. Specify either --tsv-file or provide a valid geojson folder.")
            return

    if args.task in ["segment", "all"]:
        generate_daily_segments(args.strategy, merged_csv, args.daily_max, output_gpx,
                                first_day_max=args.first_day_max)


if __name__ == "__main__":
    main()
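
# Example invocations (illustrative; the script name is hypothetical):
#
#   # Full pipeline from a browser HAR export:
#   python tour_planner.py --har-file ride.har --task all
#
#   # Re-segment an already merged CSV by riding time, with a gentler first day:
#   python tour_planner.py --har-file ride.har --task segment --strategy time \
#       --daily-max 28800 --first-day-max 14400
#
#   # Process BRouter messages exported as TSV instead of a HAR:
#   python tour_planner.py --tsv-file messages.tsv --task merge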