Skip to content

Instantly share code, notes, and snippets.

@drheinheimer
Last active October 22, 2019 19:46
Show Gist options
  • Save drheinheimer/8a18d8465a345eec18f4a6793a221c08 to your computer and use it in GitHub Desktop.
Save drheinheimer/8a18d8465a345eec18f4a6793a221c08 to your computer and use it in GitHub Desktop.
Simplify a Pywr JSON file
def simplify_network(m, delete_gauges=False):
    """Remove pass-through nodes from a network dict and reconnect its edges.

    A node is a removal candidate when any of the following holds:

    * its only keys are ``'name'`` and ``'type'``;
    * ``delete_gauges`` is true and its lower-cased type is ``'rivergauge'``;
    * its lower-cased type is ``'storage'`` and ``max_volume`` is missing
      or falsy.

    A candidate is actually removed when it has no incoming edges (it is
    dropped together with all of its outgoing edges) or when it has exactly
    one outgoing edge (every upstream neighbour is then connected directly
    to that single downstream neighbour). Candidates that fit neither case
    are left in place. One node is removed per pass and the adjacency is
    rebuilt afterwards, until a pass removes nothing.

    When gauges were removed, parameters whose name has the removed gauge
    as its second ``'/'``-separated segment and recorders whose ``'node'``
    equals the removed gauge are deleted as well. Missing ``'parameters'``
    or ``'recorders'`` sections are tolerated.

    Args:
        m: Model dict with a ``'nodes'`` list (dicts carrying ``'name'`` and
            ``'type'``) and an ``'edges'`` list of two-item ``[from, to]``
            sequences. NOTE(review): edges with extra slot items are not
            supported by the two-value unpacking below -- confirm callers
            never pass them.
        delete_gauges: Also treat river gauge nodes as removable.

    Returns:
        The same dict ``m``, modified in place.
    """
    removed_gauges = []

    while True:
        # Rebuild adjacency from the current edge list on every pass, since
        # the previous pass may have rewired the network.
        incoming_by_node = {}
        outgoing_by_node = {}
        for edge in m['edges']:
            a, b = edge
            outgoing_by_node.setdefault(a, []).append(edge)
            incoming_by_node.setdefault(b, []).append(edge)

        target = None
        dropped_edges = []
        new_edges = []
        for node in m['nodes']:
            node_name = node['name']
            node_type = node['type'].lower()
            is_gauge = delete_gauges and node_type == 'rivergauge'
            is_candidate = (
                set(node) == {'name', 'type'}
                or is_gauge
                or (node_type == 'storage' and not node.get('max_volume'))
            )
            if not is_candidate:
                continue

            incoming = incoming_by_node.get(node_name, [])
            outgoing = outgoing_by_node.get(node_name, [])
            if not incoming:
                # Upstream-most (or isolated) node: drop it with every edge
                # leaving it so no edge is left pointing at a missing node.
                dropped_edges = list(outgoing)
            elif len(outgoing) == 1:
                # Simple pass-through: bridge each upstream neighbour to the
                # single downstream neighbour.
                downstream = outgoing[0][1]
                dropped_edges = incoming + outgoing
                new_edges = [[up_edge[0], downstream] for up_edge in incoming]
            else:
                # Terminal or branching node: cannot be collapsed safely.
                continue

            target = node_name
            if is_gauge:
                # Record the gauge only once it is really removed, so that
                # data attached to gauges that stay is kept.
                removed_gauges.append(node_name)
            break

        if target is None:
            break

        m['nodes'] = [node for node in m['nodes'] if node['name'] != target]
        m['edges'] = [
            edge for edge in m['edges'] if edge not in dropped_edges
        ] + new_edges

    # Collapsing nodes can create duplicate edges; keep the first occurrence.
    seen = set()
    unique_edges = []
    for edge in m['edges']:
        key = tuple(edge)
        if key not in seen:
            seen.add(key)
            unique_edges.append(edge)
    m['edges'] = unique_edges

    if removed_gauges:
        parameters = m.get('parameters') or {}
        recorders = m.get('recorders') or {}
        # Parameter names are presumably of the form
        # '<prefix>/<node name>/...' -- verify against the model producer.
        for param_name in list(parameters):
            parts = param_name.split('/')
            if len(parts) > 1 and parts[1] in removed_gauges:
                parameters.pop(param_name, None)
        for recorder_name in list(recorders):
            if recorders[recorder_name].get('node') in removed_gauges:
                recorders.pop(recorder_name, None)

    return m
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment