@thomasdullien
Created April 4, 2025 06:28
Leaky ReLU MLP training dynamics visualisation PyTorch code
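"""
Trains a leaky-ReLU MLP to regress grayscale intensity from normalised (x, y)
pixel coordinates (plus a time coordinate for video input), then renders one
frame per epoch showing, side by side: the prediction with the boundaries of
the network's linear regions overlaid in red, the prediction alone, and the
target image with training points (red) and validation points (blue) marked.
"""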
import os
import imageio
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import math
import cv2
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import random
import base64
### Neural Network Definition ###
class ReLUNetwork(nn.Module):
    def __init__(self, input_dim, layer_sizes, debug=False):
        super(ReLUNetwork, self).__init__()
        layers = []
        self.activation_layers = []
        self.debug = debug
        prev_dim = input_dim
        for size in layer_sizes:
            # Create linear layer
            linear_layer = nn.Linear(prev_dim, size)
            # Initialize weights using He initialization
            nn.init.kaiming_normal_(linear_layer.weight, mode='fan_in', nonlinearity='leaky_relu')
            # Initialize biases to small positive values
            nn.init.constant_(linear_layer.bias, 0.1)
            layers.append(linear_layer)
            # Use LeakyReLU with a small negative slope (0.01 is the default)
            leaky_relu_layer = nn.LeakyReLU(negative_slope=0.01)
            layers.append(leaky_relu_layer)
            self.activation_layers.append(leaky_relu_layer)
            prev_dim = size
        # Create and initialize output layer
        output_layer = nn.Linear(prev_dim, 1)
        nn.init.kaiming_normal_(output_layer.weight, mode='fan_in', nonlinearity='leaky_relu')
        nn.init.constant_(output_layer.bias, 0.1)
        layers.append(output_layer)
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        activations = []
        # Debug: track the value range at each layer
        if self.debug:
            print("\nLayer activations:")
        current = x
        for i, layer in enumerate(self.network):
            current = layer(current)
            if isinstance(layer, nn.LeakyReLU):
                # Count neurons whose output is exactly 0; with LeakyReLU this
                # should stay near zero, so it serves as a sanity check rather
                # than a true dead-neuron count.
                dead_neurons = (current == 0).float().mean().item()
                if self.debug:
                    print(f"Layer {i//2} LeakyReLU: dead neurons = {dead_neurons:.2%}, range = [{current.min():.6f}, {current.max():.6f}]")
                activations.append((current > 0).int())  # Track the activation pattern
            elif isinstance(layer, nn.Linear) and self.debug:
                print(f"Layer {i//2} Linear: range = [{current.min():.6f}, {current.max():.6f}]")
        activation_pattern = torch.cat(activations, dim=1) if activations else None
        return current, activation_pattern
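# Illustrative helper, not part of the original pipeline and never called below:
# the activation_pattern returned by forward() identifies the linear region of
# the piecewise-linear function that each input falls into, so counting the
# distinct patterns counts the regions visited by a batch of inputs.
def count_activation_regions(network, inputs):
    """Return the number of distinct activation patterns over `inputs`."""
    with torch.no_grad():
        _, patterns = network(inputs)
    if patterns is None:
        return 0
    return len({tuple(p) for p in patterns.cpu().numpy().tolist()})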
### Data Preprocessing ###
def preprocess_image(image_path):
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        raise ValueError(f"Could not open image: {image_path}")
    image = image.astype(np.float32) / 255.0
    height, width = image.shape
    data = [(j / width, i / height, image[i, j]) for i in range(height) for j in range(width)]
    return np.array(data)

def preprocess_video(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video file: {video_path}")
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    data = []
    frame_idx = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
        t_norm = frame_idx / frame_count
        for i in range(height):
            for j in range(width):
                data.append((j / width, i / height, t_norm, gray_frame[i, j]))
        frame_idx += 1
    cap.release()
    return np.array(data)
### Sampling ###
def sample_data(data, train_size, val_size):
    if train_size + val_size > len(data):
        raise ValueError("Requested training + validation size exceeds dataset size.")
    np.random.shuffle(data)
    return data[:train_size], data[train_size:train_size + val_size]
def train_network(network, train_data, val_data, epochs=10, batch_size=64, learning_rate=0.001):
    """
    Train the ReLU network using GPU acceleration where available.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    network.to(device)  # Move model to the device
    # Convert numpy data to PyTorch tensors and move them to the device
    train_inputs = torch.tensor(train_data[:, :-1], dtype=torch.float32).to(device)
    train_targets = torch.tensor(train_data[:, -1], dtype=torch.float32).unsqueeze(1).to(device)
    val_inputs = torch.tensor(val_data[:, :-1], dtype=torch.float32).to(device)
    val_targets = torch.tensor(val_data[:, -1], dtype=torch.float32).unsqueeze(1).to(device)
    train_loader = DataLoader(TensorDataset(train_inputs, train_targets), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(val_inputs, val_targets), batch_size=batch_size)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(network.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        network.train()
        epoch_loss = 0.0
        for batch_inputs, batch_targets in train_loader:
            batch_inputs, batch_targets = batch_inputs.to(device), batch_targets.to(device)
            optimizer.zero_grad()
            outputs, _ = network(batch_inputs)
            loss = criterion(outputs, batch_targets)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        # Evaluate on the validation set
        network.eval()
        val_loss = 0.0
        with torch.no_grad():
            for val_inputs, val_targets in val_loader:
                val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                val_outputs, _ = network(val_inputs)
                val_loss += criterion(val_outputs, val_targets).item()
        # Prevent memory buildup
        if epoch % 100 == 0:
            torch.cuda.empty_cache()
        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {epoch_loss/len(train_loader):.4f}, Val Loss: {val_loss/len(val_loader):.4f}")
    return epoch_loss/len(train_loader), val_loss/len(val_loader)
def visualize_decision_boundary_with_predictions(network, data, train_data, val_data, image_shape,
                                                 output_path, target_image_path, train_loss=None,
                                                 val_loss=None, network_shape_str=None, random_seed=None):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    network.to(device)  # Ensure the network is on the device
    height, width = image_shape
    activation_map = np.zeros((height, width), dtype=np.uint64)
    prediction_map = np.zeros((height, width), dtype=np.float32)
    boundary_map = np.zeros((height, width), dtype=np.uint8)
    # Move input data to the device
    inputs = torch.tensor(data[:, :-1], dtype=torch.float32).to(device)
    try:
        with torch.no_grad():
            outputs, activation_patterns = network(inputs)
    except Exception as e:
        print(f"Error during network forward pass: {str(e)}")
        print(f"Input shape: {inputs.shape}")
        print(f"Input range: min={inputs.min():.6f}, max={inputs.max():.6f}")
        print(f"Input dtype: {inputs.dtype}")
        raise ValueError(f"Network forward pass failed: {str(e)}")
    # Move back to the CPU only when necessary
    outputs = outputs.detach().cpu().numpy().flatten()
    activation_patterns = activation_patterns.detach().cpu().numpy()
    # Debug network outputs
    print(f"Network outputs (len {len(outputs)}) range: min={outputs.min():.6f}, max={outputs.max():.6f}")
    if outputs.min() == outputs.max():
        print("ERROR: Network outputs are exactly constant (min == max). This indicates a serious problem with the network.")
        dump_network_weights(network, "weights.txt")
        exit(1)
    points_processed = 0
    for i, (x_norm, y_norm, _) in enumerate(data):
        x, y = int(x_norm * width), int(y_norm * height)
        activation_map[y, x] = hash(tuple(activation_patterns[i])) & 0xFFFFFFFFFFFFFFFF
        # Scale the network output (0-1) back to the 0-255 range and clamp
        prediction_map[y, x] = np.clip(outputs[i] * 255, 0, 255)
        points_processed += 1
    if points_processed == 0:
        raise ValueError("No points were processed in the activation map loop. Check data format and dimensions.")
    print(f"Prediction map range before uint8: min={prediction_map.min():.6f}, max={prediction_map.max():.6f}")
    # Detect decision boundaries: a pixel lies on a boundary if any of its
    # 4-neighbours has a different activation pattern
    for y in range(1, height - 1):
        for x in range(1, width - 1):
            neighbors = [
                activation_map[y - 1, x], activation_map[y + 1, x],
                activation_map[y, x - 1], activation_map[y, x + 1]
            ]
            if any(n != activation_map[y, x] for n in neighbors):
                boundary_map[y, x] = 255
    # Convert the prediction map to RGB
    prediction_map = prediction_map.astype(np.uint8)
    print(f"Prediction map range after uint8: min={prediction_map.min()}, max={prediction_map.max()}")
    rgb_prediction = cv2.cvtColor(prediction_map, cv2.COLOR_GRAY2RGB)
    # Keep a version without boundaries
    rgb_prediction_no_boundaries = rgb_prediction.copy()
    # Overlay decision boundaries in red
    rgb_prediction[boundary_map == 255] = [255, 0, 0]
    target_image = cv2.imread(target_image_path, cv2.IMREAD_GRAYSCALE)
    target_image = cv2.resize(target_image, (width, height), interpolation=cv2.INTER_NEAREST)  # Preserve sharp edges
    target_image_rgb = cv2.cvtColor(target_image, cv2.COLOR_GRAY2RGB).astype(np.uint8)  # Ensure correct format
    # Mark training points in pure red
    for (x_norm, y_norm, _) in train_data:
        x = int(np.round(x_norm * width))
        y = int(np.round(y_norm * height))
        target_image_rgb[y, x] = (255, 0, 0)
    # Mark validation points in pure blue
    for (x_norm, y_norm, _) in val_data:
        x = int(np.round(x_norm * width))
        y = int(np.round(y_norm * height))
        target_image_rgb[y, x] = (0, 0, 255)
    # Concatenate the three images side by side
    combined_image = np.hstack((rgb_prediction, rgb_prediction_no_boundaries, target_image_rgb))
    # Recover the epoch number from the filename
    epoch_num = int(output_path.split('_epoch_')[1].split('.')[0])
    # Create figure and display image
    plt.figure(figsize=(18, 6))
    plt.imshow(combined_image, interpolation='nearest', vmin=0, vmax=255)
    # Add text annotations, positioned at the start of the third image
    # (2 * width pixels from the left), with vertical spacing between lines
    text_x = 2 * width + 10  # Start 10 pixels into the third image
    text_y = 30  # Initial y position
    vertical_spacing = 30  # Spacing between lines
    plt.text(text_x, text_y, f'Epoch: {epoch_num}', color='red', fontsize=10)
    if train_loss is not None:
        plt.text(text_x, text_y + vertical_spacing, f'Train Loss: {train_loss:.4f}', color='red', fontsize=10)
    if val_loss is not None:
        plt.text(text_x, text_y + 2*vertical_spacing, f'Val Loss: {val_loss:.4f}', color='red', fontsize=10)
    if network_shape_str is not None:
        plt.text(text_x, text_y + 3*vertical_spacing, f'Shape: {network_shape_str}', color='red', fontsize=10)
    if random_seed is not None:
        plt.text(text_x, text_y + 4*vertical_spacing, f'Seed: {random_seed}', color='red', fontsize=10)
    plt.axis("off")
    plt.savefig(output_path, bbox_inches='tight', pad_inches=0)
    plt.show()
    plt.close()
def dump_network_weights(network, filename):
    """Dump network weights to a file in a human-readable format."""
    with open(filename, 'w') as f:
        # Collect the linear layers to determine the network architecture
        linear_layers = [m for m in network.network if isinstance(m, nn.Linear)]
        f.write(f"Network architecture: {[l.in_features for l in linear_layers] + [linear_layers[-1].out_features]}\n")
        f.write(f"Number of layers: {len(linear_layers)}\n\n")
        # Iterate through the sequential layers
        for i, module in enumerate(network.network):
            if isinstance(module, nn.Linear):
                f.write(f"Layer {i} (Linear):\n")
                f.write(f"  Shape: {module.weight.shape}\n")
                f.write(f"  Weight range: min={module.weight.min():.6f}, max={module.weight.max():.6f}\n")
                f.write(f"  Weight mean: {module.weight.mean():.6f}, std: {module.weight.std():.6f}\n")
                f.write(f"  Bias range: min={module.bias.min():.6f}, max={module.bias.max():.6f}\n")
                f.write(f"  Bias mean: {module.bias.mean():.6f}, std: {module.bias.std():.6f}\n\n")
                # Write weight matrix
                f.write("  Weight matrix:\n")
                weight_matrix = module.weight.detach().cpu().numpy()
                for row in weight_matrix:
                    f.write("    " + " ".join(f"{x:.6f}" for x in row) + "\n")
                # Write bias vector
                f.write("\n  Bias vector:\n")
                bias_vector = module.bias.detach().cpu().numpy()
                f.write("    " + " ".join(f"{x:.6f}" for x in bias_vector) + "\n\n")
            elif isinstance(module, nn.LeakyReLU):
                f.write(f"Layer {i} (LeakyReLU activation)\n\n")
### Full Training and Visualization Pipeline ###
def full_pipeline(
    input_path,
    is_video=False,
    train_size=5000,
    val_size=1000,
    layer_sizes=[10, 10, 10, 10, 10, 10, 10, 10],
    epochs=10,
    batch_size=1024,
    learning_rate=0.001,
    output_dir="results",
    random_seed=None,
    network_shape_b64=None,
    network_shape_str=None,
    debug=False
):
    os.makedirs(output_dir, exist_ok=True)
    # Step 1: Preprocess data
    if is_video:
        data = preprocess_video(input_path)
        height, width = 64, 64  # Assume a fixed resolution for visualization
    else:
        data = preprocess_image(input_path)
        img = cv2.imread(input_path, cv2.IMREAD_GRAYSCALE)
        height, width = img.shape
    # Step 2: Sample training and validation data
    train_data, val_data = sample_data(data, train_size, val_size)
    # Step 3: Initialize the network
    input_dim = 3 if is_video else 2
    network = ReLUNetwork(input_dim, layer_sizes, debug=debug)
    trainable_params = sum(p.numel() for p in network.parameters() if p.requires_grad)
    print(f"Trainable parameters: {trainable_params}")
    from torchinfo import summary
    summary(network, input_size=(1024, input_dim))
    # Step 4: Train one epoch at a time and visualize decision boundaries
    boundary_frames = []
    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")
        train_loss, val_loss = train_network(network, train_data, val_data, epochs=1, batch_size=batch_size, learning_rate=learning_rate)
        epoch_str = "%04d" % (epoch + 1)
        seed_str = str(random_seed) if random_seed is not None else "none"
        output_path = os.path.join(output_dir, f"{os.path.basename(input_path)}_{network_shape_b64}_{seed_str}_epoch_{epoch_str}.png")
        visualize_decision_boundary_with_predictions(
            network, data, train_data, val_data,
            (height, width), output_path, input_path,
            train_loss=train_loss, val_loss=val_loss,
            network_shape_str=network_shape_str,
            random_seed=random_seed
        )
        # Left disabled in this gist; Step 5 below would need these frames:
        #boundary_frames.append(imageio.imread(output_path))
    # Step 5: Save a video if processing video input
    if is_video:
        video_output_path = os.path.join(output_dir, "boundary_evolution.mp4")
        imageio.mimsave(video_output_path, boundary_frames, fps=5)
        print(f"Boundary evolution video saved at: {video_output_path}")
    # Step 6: Dump network weights
    weights_filename = os.path.join(output_dir, f"weights_{network_shape_b64}_{seed_str}.txt")
    dump_network_weights(network, weights_filename)
import sys

#print(torch.cuda.get_device_name(0))

if len(sys.argv) < 4:
    print("Usage: python experiments2.py <input_image> <NN_shape> <epochs> [random_seed] [--debug]")
    exit(1)
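# Example invocation (hypothetical filename and shape; any grayscale image works):
#   python experiments2.py input.png "[32, 32, 32, 32]" 200 42 --debug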
input_image = sys.argv[1]
network_shape_str = sys.argv[2]  # Get the raw string
network_shape_b64 = base64.b64encode(network_shape_str.encode()).decode()  # Encode it before evaluation
NN_shape = eval(network_shape_str)  # Now evaluate it, e.g. "[10, 10, 10]" -> [10, 10, 10]
nepochs = int(sys.argv[3])
random_seed = int(sys.argv[4]) if len(sys.argv) > 4 else None
debug = "--debug" in sys.argv

# Set random seed if provided
if random_seed is not None:
    random.seed(random_seed)
    np.random.seed(random_seed)
    torch.manual_seed(random_seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(random_seed)
        torch.cuda.manual_seed_all(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
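# The commented-out sweep below calls generate_network_shape(), which is not
# included in this gist. The following is a hypothetical sketch of such a
# helper, assuming the stated goal (same parameter budget, varying depth):
# choose a uniform hidden-layer width w so that `depth` hidden layers cost
# roughly `budget` parameters for a 2-D input and a single output, i.e.
# (depth - 1)*w^2 + (depth + 3)*w + 1 ~= budget.
def generate_network_shape(budget, depth):
    a, b, c = depth - 1, depth + 3, 1 - budget
    if a == 0:  # one hidden layer: solve b*w + c = 0
        w = -c / b
    else:  # take the positive root of the quadratic
        w = (-b + math.sqrt(b * b - 4 * a * c)) / (2 * a)
    return [max(1, round(w))] * depth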
# A number of different ReLU network shapes, all with the same number of
# parameters, but with depth varying from 2 layers to 10.
#shapes_to_try = []
#for i in range(2, 14):
#    shape = generate_network_shape(1024, i)
#    shapes_to_try.append(shape)
#print(shapes_to_try)
#for shape in shapes_to_try:
#    # Spawn a new Python interpreter process to deal with various memory leaks
#    # and run the full pipeline for that shape:
#    print(f"Trying shape: {shape}")
full_pipeline(input_image, is_video=False, epochs=nepochs, layer_sizes=NN_shape, random_seed=random_seed, network_shape_b64=network_shape_b64, network_shape_str=network_shape_str, debug=debug)