Created
September 20, 2024 05:30
-
-
Save daviddwlee84/04d21a2374b40f75cb5a9f8c795b88d4 to your computer and use it in GitHub Desktop.
Negative Cycle Detector (using NetworkX)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Python 3.11 | |
# https://peps.python.org/pep-0673/ | |
from typing import Literal, Self | |
import pandas as pd | |
import networkx as nx | |
import numpy as np | |
import matplotlib.pyplot as plt | |
class NegCycleDetector: | |
def __init__(self, graph: nx.DiGraph, commission: float = 0.0) -> None: | |
self.graph = graph | |
self.commission = commission | |
@classmethod | |
def from_weighted_edges( | |
cls, weighted_edges: list[tuple[str, str, float]], **kwargs | |
) -> Self: | |
graph = nx.DiGraph() | |
graph.add_weighted_edges_from(weighted_edges) | |
return cls(graph, **kwargs) | |
@classmethod | |
def from_adj_matrix(cls, adj_matrix_df: pd.DataFrame, **kwargs) -> Self: | |
graph = nx.DiGraph(-np.log(adj_matrix_df.fillna(1)).T) | |
return cls(graph, **kwargs) | |
def _has_negative_cycle(self) -> bool: | |
return nx.negative_edge_cycle(self.graph) | |
@staticmethod | |
def _bellman_ford_find_negative_cycles( | |
graph: nx.DiGraph, source: str | |
) -> list[list[str]]: | |
""" | |
Bellman-Ford Algorithm Overview: | |
The Bellman-Ford algorithm is commonly used for finding the shortest paths | |
from a single source node to all other nodes in a graph. | |
It can also detect if there are negative weight cycles in the graph, | |
which is the focus of this implementation. | |
Key Concepts: | |
- Relaxation: For each edge (u, v) in the graph, the algorithm checks | |
if the distance to v can be minimized by passing through u. | |
If so, the distance to v is updated (relaxed). | |
- Negative Cycles: If after n-1 relaxations (where n is the number of nodes), | |
you can still relax an edge, then a negative weight cycle exists. | |
This is because a negative weight cycle would allow the distance to keep decreasing indefinitely. | |
""" | |
# Initialize distance and predecessor dictionaries | |
distance = {node: float("inf") for node in graph.nodes} | |
distance[source] = 0 | |
predecessor = {node: None for node in graph.nodes} | |
# Relax edges |V| - 1 times | |
for _ in range(len(graph.nodes) - 1): | |
for u, v, data in graph.edges(data=True): | |
weight = data["weight"] | |
if distance[u] + weight < distance[v]: | |
distance[v] = distance[u] + weight | |
predecessor[v] = u | |
# Check for negative-weight cycles and backtrack to find cycles | |
negative_cycles = [] | |
for u, v, data in graph.edges(data=True): | |
weight = data["weight"] | |
if distance[u] + weight < distance[v]: | |
# Negative cycle detected, now backtrack to find the full cycle | |
cycle = [] | |
visited = set() | |
node = v | |
# Backtrack until we revisit the same node to form a cycle | |
while node not in visited: | |
visited.add(node) | |
cycle.append(node) | |
node = predecessor[node] | |
cycle.append(node) # Close the cycle by appending the start node again | |
# Reorder the cycle to start with the first node in the cycle | |
cycle_start = node | |
cycle = cycle[ | |
cycle.index(cycle_start) : | |
] # Remove any prefix before the cycle starts | |
if set(cycle) not in [ | |
set(c) for c in negative_cycles | |
]: # Avoid duplicate cycles | |
negative_cycles.append( | |
cycle[::-1] | |
) # Reverse to have the cycle in the correct order | |
return negative_cycles | |
@staticmethod | |
def _floyd_warshall_find_negative_cycles(graph: nx.DiGraph) -> list[list[str]]: | |
V = list(graph.nodes()) | |
dist = {v: {u: float("inf") for u in graph} for v in graph} | |
pred = {v: {u: None for u in graph} for v in graph} | |
# Initialize the distance and predecessor | |
for v in V: | |
dist[v][v] = 0 | |
for u, v, data in graph.edges(data=True): | |
dist[u][v] = data["weight"] | |
pred[u][v] = u | |
# Floyd-Warshall with path reconstruction | |
for k in V: | |
for i in V: | |
for j in V: | |
if dist[i][j] > dist[i][k] + dist[k][j]: | |
dist[i][j] = dist[i][k] + dist[k][j] | |
pred[i][j] = pred[k][j] | |
# Detect and extract negative cycles | |
negative_cycles = [] | |
for v in V: | |
if dist[v][v] < 0: # A negative cycle is reachable from v | |
# Reconstruct the cycle | |
cycle = [v] | |
while True: | |
v = pred[v][cycle[-1]] | |
if v in cycle: | |
cycle.append(v) | |
cycle = cycle[cycle.index(v) :] | |
break | |
cycle.append(v) | |
if cycle not in negative_cycles: | |
negative_cycles.append(cycle) | |
return negative_cycles | |
def find_all_negative_cycles( | |
self, | |
algorithm: Literal[ | |
"bellman_ford", "floyd_warshall", "networkx_bellman_ford" | |
] = "networkx_bellman_ford", | |
) -> list[list[str]]: | |
negative_cycles = [] | |
for node in self.graph.nodes: | |
if algorithm == "bellman_ford": | |
cycles = self._bellman_ford_find_negative_cycles( | |
self.graph, source=node | |
) | |
elif algorithm == "floyd_warshall": | |
cycles = self._floyd_warshall_find_negative_cycles(self.graph) | |
elif algorithm == "networkx_bellman_ford": | |
# https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.find_negative_cycle.html | |
cycles = ( | |
[nx.find_negative_cycle(self.graph, node)] | |
if self._has_negative_cycle() | |
else [] | |
) | |
negative_cycles.extend(cycles) | |
# Remove duplicates | |
unique_negative_cycles = [] | |
for cycle in negative_cycles: | |
if set(cycle) not in [set(c) for c in unique_negative_cycles]: | |
unique_negative_cycles.append(cycle) | |
return unique_negative_cycles | |
def draw_graph(self): | |
# Define a layout for the graph | |
pos = nx.circular_layout(self.graph) | |
# Draw nodes with a specific color and size | |
nx.draw_networkx_nodes(self.graph, pos, node_color="skyblue", node_size=1000) | |
# Draw edges with arrows and widths based on the weight | |
weights = [self.graph[u][v]["weight"] for u, v in self.graph.edges()] | |
nx.draw_networkx_edges( | |
self.graph, | |
pos, | |
edgelist=self.graph.edges(), | |
width=[abs(w) for w in weights], | |
arrowstyle="->", | |
arrowsize=15, | |
edge_color="gray", | |
) | |
# Draw edge labels (the weights) | |
nx.draw_networkx_edge_labels( | |
self.graph, | |
pos, | |
edge_labels={ | |
(u, v): d["weight"] for u, v, d in self.graph.edges(data=True) | |
}, | |
) | |
# Draw the node labels | |
nx.draw_networkx_labels(self.graph, pos, font_size=12, font_color="black") | |
# Display the plot | |
plt.title("DiGraph with Weighted Edges") | |
plt.axis("off") # Turn off the axis | |
plt.show() | |
if __name__ == "__main__": | |
# Add weighted edges (source, target, weight) | |
# Example: G.add_weighted_edges_from([(u, v, weight), (v, w, weight), ...]) | |
detector = NegCycleDetector.from_weighted_edges( | |
[ | |
("A", "B", 1), | |
("B", "C", 1), | |
("C", "A", -3), # This creates a negative cycle: A -> B -> C -> A | |
("C", "D", 2), | |
("D", "E", 1), | |
("E", "C", -4), # This creates another negative cycle: C -> D -> E -> C | |
] | |
) | |
# BUG: failed to find C -> D -> E -> C (only capture A -> B -> C -> A) using Bellman Ford | |
print("Bellman-Ford:", detector.find_all_negative_cycles("bellman_ford")) | |
# BUG: failed to find A -> B -> C -> A (only capture C -> D -> E -> C) using Floyd Warshall | |
print("Floyd Warshall:", detector.find_all_negative_cycles("floyd_warshall")) | |
# BUG: failed to find A -> B -> C -> A (only capture C -> D -> E -> C) using NetworkX's Bellman Ford | |
print( | |
"NetworkX Bellman-Ford:", | |
detector.find_all_negative_cycles("networkx_bellman_ford"), | |
) | |
detector.draw_graph() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment