Skip to content

Instantly share code, notes, and snippets.

@emersion
Last active January 10, 2025 11:31
Show Gist options
  • Save emersion/6894f11c0d17dd273a8d9b55e579b589 to your computer and use it in GitHub Desktop.
Save emersion/6894f11c0d17dd273a8d9b55e579b589 to your computer and use it in GitHub Desktop.
import json
import re

import requests
# Path of the RailJSON infrastructure file that is opened and parsed with
# json.load further down in this script.
railjson_path = '/home/simon/dl/france_railjson.json'
# Base URL prefixed to every HTTP request made by the get_* helpers.
api_endpoint = 'http://localhost:4000/api'
# Timetable whose `train_ids` are fetched and simulated.
timetable_id = 13 # 4
# Sent as the `infra_id` query parameter of the simulation and path requests.
infra_id = 4
# A path item counts as delayed when the simulated time exceeds the scheduled
# arrival by more than this amount.
delay_threshold = 30_000 # ms
def get_timetable(id):
    """Return the decoded JSON body of `GET {api_endpoint}/timetable/{id}`."""
    response = requests.get(f'{api_endpoint}/timetable/{id}')
    return response.json()
def get_train_schedules(ids):
    """POST `ids` to the `train_schedule` endpoint and return the decoded JSON.

    The ids are sent as the `ids` field of a JSON request body.
    """
    payload = {'ids': ids}
    response = requests.post(f'{api_endpoint}/train_schedule', json=payload)
    return response.json()
def get_train_schedule_simulation(id, infra_id):
    """Return the decoded JSON simulation of train schedule `id`.

    `infra_id` is passed as the `infra_id` query parameter.
    """
    query = {'infra_id': infra_id}
    response = requests.get(f'{api_endpoint}/train_schedule/{id}/simulation', params=query)
    return response.json()
def get_train_schedule_path(id, infra_id):
    """Return the decoded JSON path of train schedule `id`.

    `infra_id` is passed as the `infra_id` query parameter.
    """
    query = {'infra_id': infra_id}
    response = requests.get(f'{api_endpoint}/train_schedule/{id}/path', params=query)
    return response.json()
def clear_line():
    """Print the escape sequence used to overwrite the previous progress line.

    Nothing else is written: the trailing newline of `print` is suppressed.
    """
    # ESC[1A followed by ESC[2K (cursor up one line, then erase that line).
    erase_previous_line = '\x1b[1A\x1b[2K'
    print(erase_previous_line, end='')
# Minimal ISO 8601 duration grammar: only the time part (`PT...`) with
# optional hours, minutes and (possibly fractional) seconds is recognised.
_ISO8601_TIME_DURATION_RE = re.compile(
    r'PT(?:(?P<hours>[0-9]+)H)?'
    r'(?:(?P<minutes>[0-9]+)M)?'
    r'(?:(?P<seconds>[0-9]+)(?:[.,](?P<fraction>[0-9]+))?S)?'
)


def parse_iso8601_duration(s):
    """Convert an ISO 8601 time duration (e.g. `PT90S`) to milliseconds.

    Accepted inputs have the form `PT[nH][nM][n[.fff]S]` with at least one
    component present. Date components (years, months, days, weeks) and
    negative values are not supported.

    Args:
        s: the duration string.

    Returns:
        The duration as an `int` number of milliseconds. Fractions of a
        second beyond millisecond precision are truncated.

    Raises:
        ValueError: if `s` is not a supported duration string.
    """
    match = _ISO8601_TIME_DURATION_RE.fullmatch(s)
    # A bare 'PT' matches the pattern but carries no component: reject it.
    if match is None or not any(match.group('hours', 'minutes', 'seconds')):
        raise ValueError(f'unsupported ISO 8601 duration: {s!r}')

    hours = int(match.group('hours') or 0)
    minutes = int(match.group('minutes') or 0)
    seconds = int(match.group('seconds') or 0)
    # Work on the digits directly so the result stays an exact integer
    # instead of going through float arithmetic.
    fraction = match.group('fraction') or ''
    milliseconds = int((fraction + '000')[:3])

    return ((hours * 60 + minutes) * 60 + seconds) * 1000 + milliseconds
# Resolve the timetable into the ids of its train schedules, then fetch the
# schedules themselves in a single request.
train_schedule_ids = get_timetable(timetable_id)['train_ids']
#train_schedule_ids = train_schedule_ids[:1000]

print(f'Fetching {len(train_schedule_ids)} train schedules...')
train_schedules = get_train_schedules(train_schedule_ids)

# Simulate the schedules one at a time, overwriting the progress line on each
# step. Ctrl-C stops early and keeps the simulations gathered so far.
simulations = {}
try:
    for i, schedule_id in enumerate(train_schedule_ids):
        if i > 0:
            clear_line()
        print(f'Simulating ({i + 1}/{len(train_schedule_ids)})...')
        simulations[schedule_id] = get_train_schedule_simulation(schedule_id, infra_id)
except KeyboardInterrupt:
    print('Stopping simulations')

# Only keep the schedules that actually have a simulation (the loop above may
# have been interrupted part-way through).
train_schedules = [
    fetched for fetched in train_schedules
    if fetched['id'] in simulations
]
print('Filtering delayed train schedules...')
invalid = 0
delayed_train_schedules = []
for train_schedule in train_schedules:
    simulation = simulations[train_schedule['id']]
    # Simulations that did not succeed are only counted, never analysed.
    if simulation['status'] != 'success':
        invalid += 1
        continue

    # Scheduled arrival of each schedule entry, keyed by the path item it
    # refers to (`at`), then laid out in path order. Path items without a
    # schedule entry map to None.
    expected_times_by_id = {
        schedule['at']: schedule['arrival']
        for schedule in train_schedule['schedule']
    }
    expected_times = [
        expected_times_by_id.get(item['id'])
        for item in train_schedule['path']
    ]

    # Find the first path item whose simulated time is later than the
    # scheduled arrival by more than `delay_threshold`.
    delayed_at_index = None
    delay_duration = 0
    for i, got in enumerate(simulation['final_output']['path_item_times']):
        want = expected_times[i]
        if want is None:
            continue
        delay_duration = got - parse_iso8601_duration(want)
        if delay_duration > delay_threshold:
            delayed_at_index = i
            break

    if delayed_at_index is None:
        continue
    delayed_train_schedules.append({
        'train_schedule': train_schedule,
        'at_index': delayed_at_index,
        'duration': delay_duration,
    })

print(f'Found {invalid} invalid, {len(delayed_train_schedules)} delayed out of {len(train_schedules)} train schedules')
# Maps a track section id to the path ranges on it that were travelled while a
# train accumulated its delay, and to the delayed schedules concerned.
delayed_track_sections = {}
for i, delayed_train_schedule in enumerate(delayed_train_schedules):
    train_schedule = delayed_train_schedule['train_schedule']
    delayed_at_index = delayed_train_schedule['at_index']

    if i > 0:
        clear_line()
    print(f'Performing path finding ({i + 1}/{len(delayed_train_schedules)})...')

    path = get_train_schedule_path(train_schedule['id'], infra_id)
    assert(path['status'] == 'success')

    # The delay is attributed to the stretch between the previous path item
    # (or the start of the path) and the item where the delay was detected.
    item_positions = path['path_item_positions']
    delayed_at_position = item_positions[delayed_at_index]
    delayed_from_position = item_positions[delayed_at_index - 1] if delayed_at_index > 0 else 0

    # `pos` is the offset along the path at which the current range starts.
    # NOTE(review): only that starting offset is tested, so a range beginning
    # before `delayed_from_position` but extending past it is not recorded;
    # confirm this is intended.
    pos = 0
    for track_section_range in path['track_section_ranges']:
        if delayed_from_position <= pos <= delayed_at_position:
            entry = delayed_track_sections.setdefault(track_section_range['track_section'], {
                'track_section_ranges': [],
                'delayed_train_schedules': {},
            })
            entry['track_section_ranges'].append(track_section_range)
            entry['delayed_train_schedules'][train_schedule['id']] = delayed_train_schedule
        pos += track_section_range['end'] - track_section_range['begin']

print(f'Found {len(delayed_track_sections)} potential track sections with delays')
print('Loading infrastructure...')
with open(railjson_path, 'r') as f:
    infra = json.load(f)

# Track sections indexed by id.
track_sections = {
    section['id']: section
    for section in infra['track_sections']
}

# Switches indexed by the (track, endpoint) pair of each of their ports, so
# the switches attached to one end of a track section can be looked up
# directly.
switches_by_track_section = {}
for switch in infra['switches']:
    for port in switch['ports'].values():
        attached = switches_by_track_section.setdefault((port['track'], port['endpoint']), [])
        attached.append(switch)
def expand_applicable_directions(dirs):
    """Return the list of concrete directions denoted by `dirs`.

    `'BOTH'` expands to the two directions; any other value is returned
    unchanged as a single-element list.
    """
    if dirs == 'BOTH':
        return ['START_TO_STOP', 'STOP_TO_START']
    return [dirs]
def opposite_endpoint(endpoint):
    """Return the other endpoint: `'BEGIN'` for `'END'` and vice versa.

    Raises KeyError for any other value.
    """
    other_end = {
        'BEGIN': 'END',
        'END': 'BEGIN',
    }
    return other_end[endpoint]
def origin_from_direction(dir):
    """Return the endpoint a direction starts from.

    `'START_TO_STOP'` starts at `'BEGIN'`, `'STOP_TO_START'` at `'END'`.
    Raises KeyError for any other value.
    """
    starting_endpoint = {
        'START_TO_STOP': 'BEGIN',
        'STOP_TO_START': 'END',
    }
    return starting_endpoint[dir]
def destination_from_direction(dir):
    """Return the endpoint a direction leads to (the opposite of its origin)."""
    origin = origin_from_direction(dir)
    return opposite_endpoint(origin)
def direction_from_origin(endpoint):
    """Return the direction of travel that starts at `endpoint`.

    `'BEGIN'` gives `'START_TO_STOP'`, `'END'` gives `'STOP_TO_START'`.
    Raises KeyError for any other value.
    """
    leaving_direction = {
        'BEGIN': 'START_TO_STOP',
        'END': 'STOP_TO_START',
    }
    return leaving_direction[endpoint]
def direction_from_destination(endpoint):
    """Return the direction of travel that ends at `endpoint`."""
    origin = opposite_endpoint(endpoint)
    return direction_from_origin(origin)
def switch_has_ingress(switch, track_section_set):
    """Tell whether some track of the set arrives at `switch`.

    For each port of the switch, the direction that ends at the port's
    endpoint is computed; the switch has an ingress when the
    `(track, direction)` pair of at least one port is in `track_section_set`.
    """
    return any(
        (port['track'], direction_from_destination(port['endpoint'])) in track_section_set
        for port in switch['ports'].values()
    )
def _max_speed_section_length(track_range, dir, speed_section, l=0, seen=None):
    """Return the longest cumulated length reachable from `track_range`.

    Starting on `track_range` travelled in direction `dir`, follow the
    switches attached to its exit endpoint onto other track ranges of the
    same speed section, recursively, and return the largest total length
    found.

    Args:
        track_range: the track range to start from (`track`, `begin`, `end`).
        dir: direction of travel on `track_range`.
        speed_section: the speed section whose `track_ranges` may be followed.
        l: length already accumulated before entering `track_range`.
        seen: set of track ids already visited by this traversal. A fresh
            set is created when omitted, so independent calls never share
            visited state.

    Returns:
        The accumulated length of the longest chain found, including `l`.
    """
    # A mutable default (`seen=set()`) would be shared by every call that
    # omits the argument, leaking visited tracks from one traversal to the
    # next.
    if seen is None:
        seen = set()

    l += track_range['end'] - track_range['begin']
    seen.add(track_range['track'])

    exit_endpoint = destination_from_direction(dir)
    switches = switches_by_track_section.get((track_range['track'], exit_endpoint), [])

    result = l
    for switch in switches:
        for port in switch['ports'].values():
            # Direction of travel when entering the port's track at that port.
            d = direction_from_origin(port['endpoint'])
            for tr in speed_section['track_ranges']:
                if tr['track'] in seen:  # TODO: too loose
                    continue
                if port['track'] != tr['track']:
                    continue
                if d not in expand_applicable_directions(tr['applicable_directions']):
                    continue
                result = max(result, _max_speed_section_length(tr, d, speed_section, l, seen))
                # The matching range for this port has been followed.
                break
    return result
def max_speed_section_length(speed_section):
    """Return the length of the longest chain of track ranges in a speed section.

    Every (track range, direction) pair of the section that no other track of
    the section leads into is used as a starting point ("root"); the result
    is the largest length `_max_speed_section_length` finds from any root.

    Args:
        speed_section: a speed section with a `track_ranges` list.

    Returns:
        The maximum length found, or 0 when the section has no root.
    """
    # All (track, direction) pairs covered by the speed section.
    track_section_set = {
        (track_range['track'], direction)
        for track_range in speed_section['track_ranges']
        for direction in expand_applicable_directions(track_range['applicable_directions'])
    }

    max_length = 0
    for track_range in speed_section['track_ranges']:
        for direction in expand_applicable_directions(track_range['applicable_directions']):
            entry_switches = switches_by_track_section.get(
                (track_range['track'], origin_from_direction(direction)), [])
            # Another track of the section feeds into this one: not a root.
            if any(switch_has_ingress(switch, track_section_set) for switch in entry_switches):
                continue
            # Each root gets its own visited set so that the measured length
            # does not depend on previously explored roots or speed sections.
            length = _max_speed_section_length(track_range, direction, speed_section, seen=set())
            max_length = max(max_length, length)
    return max_length
def track_section_ranges_overlap(a, b):
    """Tell whether the `begin`/`end` intervals of `a` and `b` overlap.

    The comparison is strict: ranges that merely touch at one end do not
    overlap.
    """
    if not a['begin'] < b['end']:
        return False
    return a['end'] > b['begin']
delayed_speed_sections = []
for speed_section in infra['speed_sections']:
    # Selects LPVs: only speed sections carrying a `psl_sncf` extension with a
    # `z` entry are considered.
    if speed_section.get('extensions', {}).get('psl_sncf', {}).get('z', None) is None:
        continue

    # Delayed train schedules (keyed by schedule id) that travelled over at
    # least one track range of this speed section, in a matching direction,
    # during the stretch where they were delayed.
    affected_train_schedules = {}
    for track_range in speed_section['track_ranges']:
        delayed = delayed_track_sections.get(track_range['track'])
        if delayed is None:
            continue
        directions = expand_applicable_directions(track_range['applicable_directions'])
        if any(
            travelled['direction'] in directions and track_section_ranges_overlap(travelled, track_range)
            for travelled in delayed['track_section_ranges']
        ):
            affected_train_schedules.update(delayed['delayed_train_schedules'])

    if affected_train_schedules:
        delayed_speed_sections.append({
            'speed_section': speed_section,
            'delayed_train_schedules': affected_train_schedules,
            'max_length': max_speed_section_length(speed_section),
        })

    # Disabled alternative selection, kept for reference:
    #max_length = max_speed_section_length(speed_section)
    #limit = speed_section.get('speed_limit', 0)
    #if max_length >= 3000 and limit < 60 / 3.6:
    #    if len(speed_section.get('speed_limit_by_tag', {})) > 0:
    #        continue
    #    reasonable_ones.append((speed_section, max_length))

print(f'Found {len(delayed_speed_sections)} potential speed sections with delays')
# Order by total delay, smallest first, so the most impacted speed sections
# are printed last.
delayed_speed_sections.sort(
    key=lambda dss: sum(dts['duration'] for dts in dss['delayed_train_schedules'].values()))

for delayed_speed_section in delayed_speed_sections:
    speed_section = delayed_speed_section['speed_section']
    speed_section_id = speed_section['id']
    # presumably `speed_limit` is in m/s (x 3.6 gives km/h) and `max_length`
    # in metres (/ 1000 gives km), as the printed units suggest; TODO confirm
    limit = round(speed_section.get('speed_limit', 0) * 3.6)
    max_length = round(delayed_speed_section['max_length'] / 1000)

    # Delay durations are in ms; a speed section is only recorded with at
    # least one delayed train, so `delays` is never empty.
    delays = [dts['duration'] for dts in delayed_speed_section['delayed_train_schedules'].values()]
    n = len(delays)
    total_delay = round(sum(delays) / 1000 / 60)
    max_delay = round(max(delays) / 1000 / 60)
    # Average the raw durations, not the already-rounded total, so rounding
    # is applied only once.
    avg_delay = round(sum(delays) / n / 1000 / 60)
    print(f"{speed_section_id} {limit} km/h, {max_length} km, {n} trains, total {total_delay} min, max {max_delay} min, avg {avg_delay} min")
# Dump one entry per delayed speed section: its id, its maximum length and the
# id and delay duration of each delayed train schedule attached to it.
with open('output.json', 'w') as f:
    summary = []
    for dss in delayed_speed_sections:
        delayed_trains = [
            {
                'id': dts['train_schedule']['id'],
                'duration': dts['duration'],
            }
            for dts in dss['delayed_train_schedules'].values()
        ]
        summary.append({
            'id': dss['speed_section']['id'],
            'max_length': dss['max_length'],
            'train_schedules': delayed_trains,
        })
    json.dump(summary, f, indent=4)

print('Wrote summary to output.json')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment