Created
June 21, 2023 20:07
-
-
Save CyberAstronaut101/7ad6d99e9638fe632c43e212b4ee588d to your computer and use it in GitHub Desktop.
osage house
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Node: | |
def __init__(self, name, capacity, flow=0, is_arrow_head=False): | |
self.name = name | |
self.capacity = capacity | |
self.flow = flow | |
self.is_arrow_head = is_arrow_head | |
class Edge: | |
def __init__(self, start_node, end_node, capacity, is_bidirectional=False): | |
self.start_node = start_node | |
self.end_node = end_node | |
self.capacity = capacity | |
self.is_bidirectional = is_bidirectional | |
class Osage: | |
def __init__(self, nodes={}, edges={}): | |
self.nodes = {} | |
self.edges = {} | |
def print_state(self): | |
print('Nodes:') | |
print(self.nodes) | |
print('Edges:') | |
print(self.edges) | |
def add_node(self, node): | |
self.nodes[node.name] = node | |
def add_edge(self, start_node_name, end_node_name, capacity, is_bidirectional=False): | |
if start_node_name not in self.nodes or end_node_name not in self.nodes: | |
raise ValueError("Both nodes need to exist in graph!") | |
edge = Edge(self.nodes[start_node_name], | |
self.nodes[end_node_name], capacity, is_bidirectional) | |
self.edges[(start_node_name, end_node_name)] = edge | |
if is_bidirectional: | |
rev_edge = Edge( | |
self.nodes[end_node_name], self.nodes[start_node_name], capacity, is_bidirectional) | |
self.edges[(end_node_name, start_node_name)] = rev_edge | |
def bfs(self, source, sink, parent): | |
visited = {node: False for node in self.nodes} | |
queue = [source] | |
visited[source] = True | |
while queue: | |
current_node = queue.pop(0) | |
for (start, end), edge in self.edges.items(): | |
if visited[end] is False and edge.capacity > 0 and start == current_node: | |
queue.append(end) | |
visited[end] = True | |
parent[end] = current_node | |
if end == sink: | |
return True | |
return False | |
def ford_fulkerson(self, source, sink): | |
parent = {node: -1 for node in self.nodes} | |
max_flow = 0 | |
while self.bfs(source, sink, parent): | |
path_flow = float("Inf") | |
s = sink | |
while s != source: | |
path_flow = min(path_flow, self.edges[(parent[s], s)].capacity) | |
s = parent[s] | |
max_flow += path_flow | |
v = sink | |
while v != source: | |
u = parent[v] | |
self.edges[(u, v)].capacity -= path_flow | |
if self.edges[(u, v)].is_bidirectional: | |
self.edges[(v, u)].capacity += path_flow | |
v = parent[v] | |
return max_flow | |
def check_arrow_heads(self): | |
""" | |
Check if any of the sink nodes, or generators, are constricted | |
constricted == requested flow is more than their capacity | |
""" | |
constricted_arrow_heads = [] | |
for node in self.nodes.values(): | |
if node.is_arrow_head and node.flow > node.capacity: | |
constricted_arrow_heads.append(node.name) | |
return constricted_arrow_heads | |
# # Define nodes | |
# nodes = [Node('A', 100, 40, False), Node( | |
# 'B', 60, 40, False), Node('C', 20, 40, False)] | |
# # Define edges | |
# edges = [ | |
# Edge(nodes[0], nodes[1], 16), | |
# Edge(nodes[0], nodes[2], 13), | |
# Edge(nodes[1], nodes[2], 10) | |
# ] | |
# # Create network | |
# network = Osage(nodes, edges) | |
# network.print_state() | |
# # Define source and sink | |
# source = nodes[0] | |
# sink = nodes[2] | |
# # Compute max flow | |
# max_flow = network.ford_fulkerson(source, sink) | |
# print('Max flow:', max_flow) | |
# Create Osage object | |
osage = Osage() | |
# Add nodes | |
osage.add_node(Node('A', 20)) | |
osage.add_node(Node('B', 15)) | |
osage.add_node(Node('C', 10)) | |
osage.add_node(Node('D', 5, is_arrow_head=True)) | |
# Add edges | |
osage.add_edge('A', 'B', 10) | |
osage.add_edge('B', 'C', 5) | |
osage.add_edge('C', 'D', 10) | |
osage.add_edge('A', 'D', 5) | |
# Run Ford-Fulkerson algorithm | |
max_flow = osage.ford_fulkerson('A', 'D') | |
print(f"Maximum Flow: {max_flow}") | |
# Check for constricted arrow heads | |
constricted_arrow_heads = osage.check_arrow_heads() | |
if constricted_arrow_heads: | |
print("Constricted Arrow Heads:") | |
for node in constricted_arrow_heads: | |
print(node) | |
else: | |
print("No constricted Arrow Heads") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment