Skip to content

Instantly share code, notes, and snippets.

@daviddwlee84
Created September 20, 2024 05:30
Show Gist options
  • Save daviddwlee84/04d21a2374b40f75cb5a9f8c795b88d4 to your computer and use it in GitHub Desktop.
Save daviddwlee84/04d21a2374b40f75cb5a9f8c795b88d4 to your computer and use it in GitHub Desktop.
Negative Cycle Detector (using NetworkX)
# Python 3.11
# https://peps.python.org/pep-0673/
from typing import Literal, Self
import pandas as pd
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
class NegCycleDetector:
def __init__(self, graph: nx.DiGraph, commission: float = 0.0) -> None:
self.graph = graph
self.commission = commission
@classmethod
def from_weighted_edges(
cls, weighted_edges: list[tuple[str, str, float]], **kwargs
) -> Self:
graph = nx.DiGraph()
graph.add_weighted_edges_from(weighted_edges)
return cls(graph, **kwargs)
@classmethod
def from_adj_matrix(cls, adj_matrix_df: pd.DataFrame, **kwargs) -> Self:
graph = nx.DiGraph(-np.log(adj_matrix_df.fillna(1)).T)
return cls(graph, **kwargs)
def _has_negative_cycle(self) -> bool:
return nx.negative_edge_cycle(self.graph)
@staticmethod
def _bellman_ford_find_negative_cycles(
graph: nx.DiGraph, source: str
) -> list[list[str]]:
"""
Bellman-Ford Algorithm Overview:
The Bellman-Ford algorithm is commonly used for finding the shortest paths
from a single source node to all other nodes in a graph.
It can also detect if there are negative weight cycles in the graph,
which is the focus of this implementation.
Key Concepts:
- Relaxation: For each edge (u, v) in the graph, the algorithm checks
if the distance to v can be minimized by passing through u.
If so, the distance to v is updated (relaxed).
- Negative Cycles: If after n-1 relaxations (where n is the number of nodes),
you can still relax an edge, then a negative weight cycle exists.
This is because a negative weight cycle would allow the distance to keep decreasing indefinitely.
"""
# Initialize distance and predecessor dictionaries
distance = {node: float("inf") for node in graph.nodes}
distance[source] = 0
predecessor = {node: None for node in graph.nodes}
# Relax edges |V| - 1 times
for _ in range(len(graph.nodes) - 1):
for u, v, data in graph.edges(data=True):
weight = data["weight"]
if distance[u] + weight < distance[v]:
distance[v] = distance[u] + weight
predecessor[v] = u
# Check for negative-weight cycles and backtrack to find cycles
negative_cycles = []
for u, v, data in graph.edges(data=True):
weight = data["weight"]
if distance[u] + weight < distance[v]:
# Negative cycle detected, now backtrack to find the full cycle
cycle = []
visited = set()
node = v
# Backtrack until we revisit the same node to form a cycle
while node not in visited:
visited.add(node)
cycle.append(node)
node = predecessor[node]
cycle.append(node) # Close the cycle by appending the start node again
# Reorder the cycle to start with the first node in the cycle
cycle_start = node
cycle = cycle[
cycle.index(cycle_start) :
] # Remove any prefix before the cycle starts
if set(cycle) not in [
set(c) for c in negative_cycles
]: # Avoid duplicate cycles
negative_cycles.append(
cycle[::-1]
) # Reverse to have the cycle in the correct order
return negative_cycles
@staticmethod
def _floyd_warshall_find_negative_cycles(graph: nx.DiGraph) -> list[list[str]]:
V = list(graph.nodes())
dist = {v: {u: float("inf") for u in graph} for v in graph}
pred = {v: {u: None for u in graph} for v in graph}
# Initialize the distance and predecessor
for v in V:
dist[v][v] = 0
for u, v, data in graph.edges(data=True):
dist[u][v] = data["weight"]
pred[u][v] = u
# Floyd-Warshall with path reconstruction
for k in V:
for i in V:
for j in V:
if dist[i][j] > dist[i][k] + dist[k][j]:
dist[i][j] = dist[i][k] + dist[k][j]
pred[i][j] = pred[k][j]
# Detect and extract negative cycles
negative_cycles = []
for v in V:
if dist[v][v] < 0: # A negative cycle is reachable from v
# Reconstruct the cycle
cycle = [v]
while True:
v = pred[v][cycle[-1]]
if v in cycle:
cycle.append(v)
cycle = cycle[cycle.index(v) :]
break
cycle.append(v)
if cycle not in negative_cycles:
negative_cycles.append(cycle)
return negative_cycles
def find_all_negative_cycles(
self,
algorithm: Literal[
"bellman_ford", "floyd_warshall", "networkx_bellman_ford"
] = "networkx_bellman_ford",
) -> list[list[str]]:
negative_cycles = []
for node in self.graph.nodes:
if algorithm == "bellman_ford":
cycles = self._bellman_ford_find_negative_cycles(
self.graph, source=node
)
elif algorithm == "floyd_warshall":
cycles = self._floyd_warshall_find_negative_cycles(self.graph)
elif algorithm == "networkx_bellman_ford":
# https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.find_negative_cycle.html
cycles = (
[nx.find_negative_cycle(self.graph, node)]
if self._has_negative_cycle()
else []
)
negative_cycles.extend(cycles)
# Remove duplicates
unique_negative_cycles = []
for cycle in negative_cycles:
if set(cycle) not in [set(c) for c in unique_negative_cycles]:
unique_negative_cycles.append(cycle)
return unique_negative_cycles
def draw_graph(self):
# Define a layout for the graph
pos = nx.circular_layout(self.graph)
# Draw nodes with a specific color and size
nx.draw_networkx_nodes(self.graph, pos, node_color="skyblue", node_size=1000)
# Draw edges with arrows and widths based on the weight
weights = [self.graph[u][v]["weight"] for u, v in self.graph.edges()]
nx.draw_networkx_edges(
self.graph,
pos,
edgelist=self.graph.edges(),
width=[abs(w) for w in weights],
arrowstyle="->",
arrowsize=15,
edge_color="gray",
)
# Draw edge labels (the weights)
nx.draw_networkx_edge_labels(
self.graph,
pos,
edge_labels={
(u, v): d["weight"] for u, v, d in self.graph.edges(data=True)
},
)
# Draw the node labels
nx.draw_networkx_labels(self.graph, pos, font_size=12, font_color="black")
# Display the plot
plt.title("DiGraph with Weighted Edges")
plt.axis("off") # Turn off the axis
plt.show()
if __name__ == "__main__":
# Add weighted edges (source, target, weight)
# Example: G.add_weighted_edges_from([(u, v, weight), (v, w, weight), ...])
detector = NegCycleDetector.from_weighted_edges(
[
("A", "B", 1),
("B", "C", 1),
("C", "A", -3), # This creates a negative cycle: A -> B -> C -> A
("C", "D", 2),
("D", "E", 1),
("E", "C", -4), # This creates another negative cycle: C -> D -> E -> C
]
)
# BUG: failed to find C -> D -> E -> C (only capture A -> B -> C -> A) using Bellman Ford
print("Bellman-Ford:", detector.find_all_negative_cycles("bellman_ford"))
# BUG: failed to find A -> B -> C -> A (only capture C -> D -> E -> C) using Floyd Warshall
print("Floyd Warshall:", detector.find_all_negative_cycles("floyd_warshall"))
# BUG: failed to find A -> B -> C -> A (only capture C -> D -> E -> C) using NetworkX's Bellman Ford
print(
"NetworkX Bellman-Ford:",
detector.find_all_negative_cycles("networkx_bellman_ford"),
)
detector.draw_graph()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment