Last active
January 10, 2025 11:31
-
-
Save emersion/6894f11c0d17dd273a8d9b55e579b589 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
# --- Configuration ---
railjson_path = '/home/simon/dl/france_railjson.json'  # local RailJSON infrastructure export
api_endpoint = 'http://localhost:4000/api'  # base URL of the editoast-style API
timetable_id = 13 # 4
infra_id = 4  # infrastructure id matching railjson_path on the API side
delay_threshold = 30_000 # ms  -- a train is "delayed" past this simulated-vs-scheduled gap
def get_timetable(id):
    """Fetch one timetable object (including its train ids) from the API."""
    url = f'{api_endpoint}/timetable/{id}'
    response = requests.get(url)
    return response.json()
def get_train_schedules(ids):
    """Fetch the train schedule objects for all of the given ids in one call."""
    payload = {'ids': ids}
    response = requests.post(f'{api_endpoint}/train_schedule', json=payload)
    return response.json()
def get_train_schedule_simulation(id, infra_id):
    """Run/fetch the simulation of one train schedule on the given infra."""
    url = f'{api_endpoint}/train_schedule/{id}/simulation'
    response = requests.get(url, params={'infra_id': infra_id})
    return response.json()
def get_train_schedule_path(id, infra_id):
    """Fetch the pathfinding result of one train schedule on the given infra."""
    url = f'{api_endpoint}/train_schedule/{id}/path'
    response = requests.get(url, params={'infra_id': infra_id})
    return response.json()
def clear_line():
    """Erase the previously printed line: cursor up one row, then clear it (ANSI)."""
    erase_seq = '\x1b[1A\x1b[2K'
    print(erase_seq, end='')
# poor man's ISO 8601 duration parser
def parse_iso8601_duration(s):
    """Parse a duration of the restricted form 'PT<whole seconds>S' into milliseconds.

    Raises ValueError for any other shape. The original used `assert` (which is
    stripped under `python -O`) and chained `.replace`, which mis-parsed inputs
    such as 'PT3S0S' as 30000 instead of rejecting them; slicing off the fixed
    prefix/suffix lets int() reject anything malformed.
    """
    if not (s.startswith('PT') and s.endswith('S')):
        raise ValueError(f'unsupported ISO 8601 duration: {s!r}')
    return int(s[2:-1]) * 1000
# Fetch the timetable's train ids, then the full train schedule objects.
train_schedule_ids = get_timetable(timetable_id)['train_ids']
#train_schedule_ids = train_schedule_ids[:1000]
print(f'Fetching {len(train_schedule_ids)} train schedules...')
train_schedules = get_train_schedules(train_schedule_ids)
# Run one simulation per train schedule. Ctrl-C stops early and keeps the
# partial results collected so far.
simulations = {}
try:
    for i, id in enumerate(train_schedule_ids):
        if i > 0:
            clear_line()  # overwrite the previous progress line in place
        print(f'Simulating ({i + 1}/{len(train_schedule_ids)})...')
        simulations[id] = get_train_schedule_simulation(id, infra_id)
except KeyboardInterrupt:
    print('Stopping simulations')
# Keep only the schedules whose simulation was actually fetched.
train_schedules = [ts for ts in train_schedules if ts['id'] in simulations]
# --- Find the train schedules that accumulate more than delay_threshold ---
print('Filtering delayed train schedules...')
invalid = 0
delayed_train_schedules = []
for train_schedule in train_schedules:
    simulation = simulations[train_schedule['id']]
    if simulation['status'] != 'success':
        invalid += 1
        continue
    # Scheduled arrival (ISO 8601 duration string) per path item id, where set.
    expected_times_by_id = {
        schedule['at']: schedule['arrival']
        for schedule in train_schedule['schedule']
    }
    # Align scheduled arrivals with the path items (None where unscheduled).
    expected_times = [
        expected_times_by_id.get(item['id'])
        for item in train_schedule['path']
    ]
    # First path item whose simulated time beats the schedule by more than
    # delay_threshold; delay_duration keeps the delay observed there.
    delayed_at_index = None
    delay_duration = 0
    for i, got in enumerate(simulation['final_output']['path_item_times']):
        want = expected_times[i]
        if want is None:
            continue  # no scheduled arrival at this path item
        delay_duration = got - parse_iso8601_duration(want)
        if delay_duration > delay_threshold:
            delayed_at_index = i
            break
    if delayed_at_index is not None:
        delayed_train_schedules.append({
            'train_schedule': train_schedule,
            'at_index': delayed_at_index,
            'duration': delay_duration,
        })
print(f'Found {invalid} invalid, {len(delayed_train_schedules)} delayed out of {len(train_schedules)} train schedules')
# --- Locate the track sections covered by each delayed leg ---
# Maps track section id -> {'track_section_ranges': [...],
#                           'delayed_train_schedules': {id: entry}}.
delayed_track_sections = {}
for i, delayed_train_schedule in enumerate(delayed_train_schedules):
    train_schedule = delayed_train_schedule['train_schedule']
    delayed_at_index = delayed_train_schedule['at_index']
    if i > 0:
        clear_line()  # overwrite the previous progress line in place
    print(f'Performing path finding ({i + 1}/{len(delayed_train_schedules)})...')
    path = get_train_schedule_path(train_schedule['id'], infra_id)
    assert path['status'] == 'success'
    # Positions along the path bracketing the delayed leg: from the previous
    # path item (or the path start) up to the item where the delay was seen.
    delayed_at_position = path['path_item_positions'][delayed_at_index]
    delayed_from_position = 0
    if delayed_at_index > 0:
        delayed_from_position = path['path_item_positions'][delayed_at_index - 1]
    # Walk the track section ranges, accumulating distance, and record every
    # range whose start position falls inside the delayed leg.
    pos = 0
    for track_section_range in path['track_section_ranges']:
        if delayed_from_position <= pos <= delayed_at_position:
            delayed_track_section = delayed_track_sections.setdefault(
                track_section_range['track_section'], {
                    'track_section_ranges': [],
                    'delayed_train_schedules': {},
                })
            delayed_track_section['track_section_ranges'].append(track_section_range)
            delayed_track_section['delayed_train_schedules'][train_schedule['id']] = delayed_train_schedule
        pos += track_section_range['end'] - track_section_range['begin']
print(f'Found {len(delayed_track_sections)} potential track sections with delays')
# --- Load the RailJSON infrastructure from disk ---
print('Loading infrastructure...')
with open(railjson_path, 'r') as f:
    infra = json.load(f)
# Index track sections by id for quick lookup.
track_sections = {ts['id']: ts for ts in infra['track_sections']}
# Index switches by (track section id, endpoint) for each of their ports, so
# the switches reachable from a given track section end can be found quickly.
# setdefault replaces the original get/append/reassign three-step.
switches_by_track_section = {}
for switch in infra['switches']:
    for port in switch['ports'].values():
        key = (port['track'], port['endpoint'])
        switches_by_track_section.setdefault(key, []).append(switch)
def expand_applicable_directions(dirs):
    """Expand 'BOTH' into the two concrete directions; wrap anything else as-is."""
    if dirs == 'BOTH':
        return ['START_TO_STOP', 'STOP_TO_START']
    return [dirs]
def opposite_endpoint(endpoint):
    """Return the other end of a track section ('BEGIN' <-> 'END')."""
    flipped = {'BEGIN': 'END', 'END': 'BEGIN'}
    return flipped[endpoint]
def origin_from_direction(dir):
    """Endpoint a train travelling in *dir* enters the track section from."""
    origins = {'START_TO_STOP': 'BEGIN', 'STOP_TO_START': 'END'}
    return origins[dir]
def destination_from_direction(dir):
    """Endpoint a train travelling in *dir* leaves the track section from."""
    origin = origin_from_direction(dir)
    return opposite_endpoint(origin)
def direction_from_origin(endpoint):
    """Direction of travel for a train entering the track section at *endpoint*."""
    directions = {'BEGIN': 'START_TO_STOP', 'END': 'STOP_TO_START'}
    return directions[endpoint]
def direction_from_destination(endpoint):
    """Direction of travel for a train leaving the track section at *endpoint*."""
    origin = opposite_endpoint(endpoint)
    return direction_from_origin(origin)
def switch_has_ingress(switch, track_section_set):
    """Whether any (track, direction) pair feeding into this switch is in the set."""
    return any(
        (port['track'], direction_from_destination(port['endpoint'])) in track_section_set
        for port in switch['ports'].values()
    )
def _max_speed_section_length(track_range, dir, speed_section, l=0, seen=None):
    """Recursively compute the longest directed chain length (same units as
    begin/end) starting from *track_range* in direction *dir*, following
    switches into other track ranges of the same speed section.

    Bug fix: `seen` previously defaulted to a module-level *shared* mutable
    set, so visited tracks leaked across independent top-level calls
    (different roots and even different speed sections), silently shrinking
    later results. Default to None and allocate a fresh set per call.
    """
    if seen is None:
        seen = set()
    l += track_range['end'] - track_range['begin']
    seen.add(track_range['track'])
    # Switches reachable from the exit end of this track range.
    switches = switches_by_track_section.get(
        (track_range['track'], destination_from_direction(dir)), [])
    result = l
    for switch in switches:
        for port in switch['ports'].values():
            d = direction_from_origin(port['endpoint'])
            for tr in speed_section['track_ranges']:
                if tr['track'] in seen: # TODO: too loose
                    continue
                if port['track'] != tr['track']:
                    continue
                if d not in expand_applicable_directions(tr['applicable_directions']):
                    continue
                ll = _max_speed_section_length(tr, d, speed_section, l, seen)
                if ll > result:
                    result = ll
                break
    return result
def max_speed_section_length(speed_section):
    """Length of the longest directed chain of track ranges in the speed section.

    Starts the search from the "roots": (range, direction) pairs that cannot be
    entered via a switch from another range of the same speed section.
    """
    # All (track, direction) pairs covered by this speed section.
    track_section_set = {
        (tr['track'], d)
        for tr in speed_section['track_ranges']
        for d in expand_applicable_directions(tr['applicable_directions'])
    }
    # Keep only pairs with no ingress from within the set.
    roots = []
    for track_range in speed_section['track_ranges']:
        for dir in expand_applicable_directions(track_range['applicable_directions']):
            entry_key = (track_range['track'], origin_from_direction(dir))
            entry_switches = switches_by_track_section.get(entry_key, [])
            if not any(switch_has_ingress(sw, track_section_set) for sw in entry_switches):
                roots.append((track_range, dir))
    lengths = [
        _max_speed_section_length(root, dir, speed_section)
        for (root, dir) in roots
    ]
    return max(lengths, default=0)
def track_section_ranges_overlap(a, b):
    """Whether two {'begin': x, 'end': y} ranges share a non-empty interval."""
    return not (a['end'] <= b['begin'] or b['end'] <= a['begin'])
# --- Cross-reference delayed track sections with LPV speed sections ---
delayed_speed_sections = []
for speed_section in infra['speed_sections']:
    # Selects LPVs: speed sections carrying the SNCF psl extension's 'z' field.
    if speed_section.get('extensions', {}).get('psl_sncf', {}).get('z') is None:
        continue
    # Train schedules delayed on a range overlapping this speed section,
    # keyed by train schedule id (the dict merge deduplicates across ranges).
    delayed_train_schedules = {}
    for track_range in speed_section['track_ranges']:
        delayed = delayed_track_sections.get(track_range['track'])
        if delayed is None:
            continue
        # Delayed if any recorded range overlaps this one in a direction the
        # speed section applies to.
        is_delayed = any(
            d['direction'] in expand_applicable_directions(track_range['applicable_directions'])
            and track_section_ranges_overlap(d, track_range)
            for d in delayed['track_section_ranges']
        )
        if is_delayed:
            delayed_train_schedules |= delayed['delayed_train_schedules']
    if len(delayed_train_schedules) > 0:
        delayed_speed_sections.append({
            'speed_section': speed_section,
            'delayed_train_schedules': delayed_train_schedules,
            'max_length': max_speed_section_length(speed_section),
        })
print(f'Found {len(delayed_speed_sections)} potential speed sections with delays')
# --- Report, ordered from least to most total delay, then dump a summary ---
delayed_speed_sections.sort(
    key=lambda dss: sum(dts['duration'] for dts in dss['delayed_train_schedules'].values()))
for delayed_speed_section in delayed_speed_sections:
    speed_section = delayed_speed_section['speed_section']
    id = speed_section['id']
    limit = round(speed_section.get('speed_limit', 0) * 3.6)  # m/s -> km/h
    max_length = round(delayed_speed_section['max_length'] / 1000)  # -> km
    delays = [dts['duration'] for dts in delayed_speed_section['delayed_train_schedules'].values()]
    n = len(delays)
    total_delay = round(sum(delays) / 1000 / 60)  # ms -> min
    max_delay = round(max(delays) / 1000 / 60)
    avg_delay = round(total_delay / n)  # NOTE: average of the already-rounded total
    print(f"{id} {limit} km/h, {max_length} km, {n} trains, total {total_delay} min, max {max_delay} min, avg {avg_delay} min")
with open('output.json', 'w') as f:
    summary = [{
        'id': dss['speed_section']['id'],
        'max_length': dss['max_length'],
        'train_schedules': [{
            'id': dts['train_schedule']['id'],
            'duration': dts['duration'],
        } for dts in dss['delayed_train_schedules'].values()],
    } for dss in delayed_speed_sections]
    json.dump(summary, f, indent=4)
print('Wrote summary to output.json')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment