PyTorch code for visualising the training dynamics of a leaky-ReLU MLP
import os
import sys
import ast
import base64
import random

import cv2
import imageio
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
### Neural Network Definition ###
class ReLUNetwork(nn.Module):
    def __init__(self, input_dim, layer_sizes, debug=False):
        super(ReLUNetwork, self).__init__()
        layers = []
        self.activation_layers = []
        self.debug = debug
        prev_dim = input_dim
        for size in layer_sizes:
            # Create linear layer
            linear_layer = nn.Linear(prev_dim, size)
            # Initialize weights using He initialization
            nn.init.kaiming_normal_(linear_layer.weight, mode='fan_in', nonlinearity='leaky_relu')
            # Initialize biases to small positive values
            nn.init.constant_(linear_layer.bias, 0.1)
            layers.append(linear_layer)
            # Use LeakyReLU with a small negative slope (0.01 is the default)
            leaky_relu_layer = nn.LeakyReLU(negative_slope=0.01)
            layers.append(leaky_relu_layer)
            self.activation_layers.append(leaky_relu_layer)
            prev_dim = size
        # Create and initialize output layer
        output_layer = nn.Linear(prev_dim, 1)
        nn.init.kaiming_normal_(output_layer.weight, mode='fan_in', nonlinearity='leaky_relu')
        nn.init.constant_(output_layer.bias, 0.1)
        layers.append(output_layer)
        self.network = nn.Sequential(*layers)
    def forward(self, x):
        activations = []
        # Debug: track the range of values at each layer
        if self.debug:
            print("\nLayer activations:")
        current = x
        for i, layer in enumerate(self.network):
            current = layer(current)
            if isinstance(layer, nn.LeakyReLU):
                # Count inactive neurons (non-positive output). With LeakyReLU
                # the output is almost never exactly zero, so testing == 0
                # would report ~0% regardless of the network state.
                dead_neurons = (current <= 0).float().mean().item()
                if self.debug:
                    print(f"Layer {i//2} LeakyReLU: inactive neurons = {dead_neurons:.2%}, range = [{current.min():.6f}, {current.max():.6f}]")
                activations.append((current > 0).int())  # Activation-pattern tracking
            elif isinstance(layer, nn.Linear) and self.debug:
                print(f"Layer {i//2} Linear: range = [{current.min():.6f}, {current.max():.6f}]")
        activation_pattern = torch.cat(activations, dim=1) if activations else None
        return current, activation_pattern
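# Hypothetical usage sketch (not part of the original gist): the activation
# pattern returned by forward() is one binary vector per input; inputs that
# share a pattern lie in the same linear region of the piecewise-linear
# function the MLP computes, so counting distinct patterns over random inputs
# estimates how many linear regions the sample touches.
#
#   net = ReLUNetwork(2, [10, 10])
#   xs = torch.rand(10000, 2)
#   with torch.no_grad():
#       _, patterns = net(xs)
#   n_regions = len({tuple(p.tolist()) for p in patterns})
#   print(f"distinct linear regions sampled: {n_regions}")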
### Data Preprocessing ###
def preprocess_image(image_path):
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        raise ValueError(f"Could not open image: {image_path}")
    image = image.astype(np.float32) / 255.0
    height, width = image.shape
    data = [(j / width, i / height, image[i, j]) for i in range(height) for j in range(width)]
    return np.array(data)
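# Each row of the returned array is (x_norm, y_norm, intensity), with all three
# values in [0, 1]; e.g. the top-left pixel of the image maps to
# (0.0, 0.0, image[0, 0]). The visualisation code below relies on this layout.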
def preprocess_video(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video file: {video_path}")
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    data = []
    frame_idx = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
        t_norm = frame_idx / frame_count
        for i in range(height):
            for j in range(width):
                data.append((j / width, i / height, t_norm, gray_frame[i, j]))
        frame_idx += 1
    cap.release()
    return np.array(data)
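# Note: the returned array has width * height * frame_count rows, so even a
# modest clip gets large quickly (a 640x480 video with 300 frames yields
# roughly 92 million (x, y, t, value) tuples).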
### Sampling ###
def sample_data(data, train_size, val_size):
    if train_size + val_size > len(data):
        raise ValueError("Requested training + validation size exceeds dataset size.")
    # Note: np.random.shuffle permutes `data` in place; callers that reuse the
    # full array afterwards (as full_pipeline does) see the shuffled order.
    np.random.shuffle(data)
    return data[:train_size], data[train_size:train_size + val_size]
def train_network(network, train_data, val_data, epochs=10, batch_size=64, learning_rate=0.001):
    """
    Train the ReLU network using GPU acceleration where available.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    network.to(device)  # Move model to GPU
    # Convert numpy data to PyTorch tensors and move them to GPU
    train_inputs = torch.tensor(train_data[:, :-1], dtype=torch.float32).to(device)
    train_targets = torch.tensor(train_data[:, -1], dtype=torch.float32).unsqueeze(1).to(device)
    val_inputs = torch.tensor(val_data[:, :-1], dtype=torch.float32).to(device)
    val_targets = torch.tensor(val_data[:, -1], dtype=torch.float32).unsqueeze(1).to(device)
    train_loader = DataLoader(TensorDataset(train_inputs, train_targets), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(val_inputs, val_targets), batch_size=batch_size)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(network.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        network.train()
        epoch_loss = 0.0
        # Batches are already on the device, since the full tensors were moved above
        for batch_inputs, batch_targets in train_loader:
            optimizer.zero_grad()
            outputs, _ = network(batch_inputs)
            loss = criterion(outputs, batch_targets)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        # Evaluate on validation set
        network.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch_inputs, batch_targets in val_loader:
                val_outputs, _ = network(batch_inputs)
                val_loss += criterion(val_outputs, batch_targets).item()
        # Prevent memory buildup
        if epoch % 100 == 0:
            torch.cuda.empty_cache()
        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {epoch_loss/len(train_loader):.4f}, Val Loss: {val_loss/len(val_loader):.4f}")
    return epoch_loss/len(train_loader), val_loss/len(val_loader)
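# Hypothetical standalone usage of train_network (not in the original gist):
#   net = ReLUNetwork(2, [10, 10])
#   tr, va = sample_data(preprocess_image("img.png"), 5000, 1000)
#   train_network(net, tr, va, epochs=100, batch_size=1024)
# Note that full_pipeline below calls train_network with epochs=1 once per
# visualised epoch, which re-creates the Adam optimizer (and resets its moment
# estimates) on every call.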
def visualize_decision_boundary_with_predictions(network, data, train_data, val_data, image_shape, output_path, target_image_path, train_loss=None, val_loss=None, network_shape_str=None, random_seed=None):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    network.to(device)  # Ensure network is on GPU
    height, width = image_shape
    activation_map = np.zeros((height, width), dtype=np.uint64)
    prediction_map = np.zeros((height, width), dtype=np.float32)
    boundary_map = np.zeros((height, width), dtype=np.uint8)
    # Move input data to GPU
    inputs = torch.tensor(data[:, :-1], dtype=torch.float32).to(device)
    try:
        with torch.no_grad():
            outputs, activation_patterns = network(inputs)
    except Exception as e:
        print(f"Error during network forward pass: {str(e)}")
        print(f"Input shape: {inputs.shape}")
        print(f"Input range: min={inputs.min():.6f}, max={inputs.max():.6f}")
        print(f"Input dtype: {inputs.dtype}")
        raise ValueError(f"Network forward pass failed: {str(e)}")
    # Move back to CPU only when necessary
    outputs = outputs.detach().cpu().numpy().flatten()
    activation_patterns = activation_patterns.detach().cpu().numpy()
    # Debug network outputs
    print(f"Network outputs (len {len(outputs)}) range: min={outputs.min():.6f}, max={outputs.max():.6f}")
    if outputs.min() == outputs.max():
        print("ERROR: Network outputs are exactly constant (min == max). This indicates a serious problem with the network.")
        dump_network_weights(network, "weights.txt")
        sys.exit(1)
    # Fill the per-pixel maps. Note: this unpacking assumes image data, i.e.
    # rows of the form (x_norm, y_norm, value).
    points_processed = 0
    for i, (x_norm, y_norm, _) in enumerate(data):
        x, y = int(x_norm * width), int(y_norm * height)
        # Inputs sharing an activation pattern lie in the same linear region;
        # hash the pattern to a 64-bit id per pixel.
        activation_map[y, x] = hash(tuple(activation_patterns[i])) & 0xFFFFFFFFFFFFFFFF
        # Scale network output (0-1) back to 0-255 range and clamp values
        prediction_map[y, x] = np.clip(outputs[i] * 255, 0, 255)
        points_processed += 1
    if points_processed == 0:
        raise ValueError("No points were processed in the activation map loop. Check data format and dimensions.")
    print(f"Prediction map range before uint8: min={prediction_map.min():.6f}, max={prediction_map.max():.6f}")
    # Detect decision boundaries: mark pixels whose activation pattern differs
    # from any 4-neighbour
    for y in range(1, height - 1):
        for x in range(1, width - 1):
            neighbors = [
                activation_map[y - 1, x], activation_map[y + 1, x],
                activation_map[y, x - 1], activation_map[y, x + 1]
            ]
            if any(n != activation_map[y, x] for n in neighbors):
                boundary_map[y, x] = 255
    # Convert prediction map to RGB
    prediction_map = prediction_map.astype(np.uint8)
    print(f"Prediction map range after uint8: min={prediction_map.min()}, max={prediction_map.max()}")
    rgb_prediction = cv2.cvtColor(prediction_map, cv2.COLOR_GRAY2RGB)
    # Create version without boundaries
    rgb_prediction_no_boundaries = rgb_prediction.copy()
    # Overlay decision boundaries in red
    rgb_prediction[boundary_map == 255] = [255, 0, 0]
    target_image = cv2.imread(target_image_path, cv2.IMREAD_GRAYSCALE)
    target_image = cv2.resize(target_image, (width, height), interpolation=cv2.INTER_NEAREST)  # Preserve sharp edges
    target_image_rgb = cv2.cvtColor(target_image, cv2.COLOR_GRAY2RGB).astype(np.uint8)  # Ensure correct format
    # Mark training points in pure red (the assignment overwrites the full
    # pixel, so no separate zeroing is needed)
    for (x_norm, y_norm, _) in train_data:
        x = int(np.round(x_norm * width))
        y = int(np.round(y_norm * height))
        target_image_rgb[y, x] = (255, 0, 0)
    # Mark validation points in pure blue
    for (x_norm, y_norm, _) in val_data:
        x = int(np.round(x_norm * width))
        y = int(np.round(y_norm * height))
        target_image_rgb[y, x] = (0, 0, 255)
    # Concatenate three images side by side
    combined_image = np.hstack((rgb_prediction, rgb_prediction_no_boundaries, target_image_rgb))
    # Recover the epoch number from the filename (relies on the
    # "..._epoch_NNNN.png" naming used by full_pipeline)
    epoch_num = int(output_path.split('_epoch_')[1].split('.')[0])
    # Create figure and display image (vmin/vmax are ignored for RGB input)
    plt.figure(figsize=(18, 6))
    plt.imshow(combined_image, interpolation='nearest')
    # Add text using matplotlib with increased vertical spacing.
    # Position text at the start of the third image (2*width pixels from the left).
    text_x = 2 * width + 10  # Start 10 pixels into the third image
    text_y = 30  # Initial y position
    vertical_spacing = 30  # Spacing between lines
    plt.text(text_x, text_y, f'Epoch: {epoch_num}', color='red', fontsize=10)
    if train_loss is not None:
        plt.text(text_x, text_y + vertical_spacing, f'Train Loss: {train_loss:.4f}', color='red', fontsize=10)
    if val_loss is not None:
        plt.text(text_x, text_y + 2*vertical_spacing, f'Val Loss: {val_loss:.4f}', color='red', fontsize=10)
    if network_shape_str is not None:
        plt.text(text_x, text_y + 3*vertical_spacing, f'Shape: {network_shape_str}', color='red', fontsize=10)
    if random_seed is not None:
        plt.text(text_x, text_y + 4*vertical_spacing, f'Seed: {random_seed}', color='red', fontsize=10)
    plt.axis("off")
    plt.savefig(output_path, bbox_inches='tight', pad_inches=0)
    plt.show()
    plt.close()
def dump_network_weights(network, filename):
    """Dump network weights to a file in a human-readable format."""
    with open(filename, 'w') as f:
        # Count the number of linear layers to determine network architecture
        linear_layers = [m for m in network.network if isinstance(m, nn.Linear)]
        f.write(f"Network architecture: {[l.in_features for l in linear_layers] + [linear_layers[-1].out_features]}\n")
        f.write(f"Number of layers: {len(linear_layers)}\n\n")
        # Iterate through the sequential layers
        for i, module in enumerate(network.network):
            if isinstance(module, nn.Linear):
                f.write(f"Layer {i} (Linear):\n")
                f.write(f"  Shape: {module.weight.shape}\n")
                f.write(f"  Weight range: min={module.weight.min():.6f}, max={module.weight.max():.6f}\n")
                f.write(f"  Weight mean: {module.weight.mean():.6f}, std: {module.weight.std():.6f}\n")
                f.write(f"  Bias range: min={module.bias.min():.6f}, max={module.bias.max():.6f}\n")
                f.write(f"  Bias mean: {module.bias.mean():.6f}, std: {module.bias.std():.6f}\n\n")
                # Write weight matrix
                f.write("  Weight matrix:\n")
                weight_matrix = module.weight.detach().cpu().numpy()
                for row in weight_matrix:
                    f.write("    " + " ".join(f"{x:.6f}" for x in row) + "\n")
                # Write bias vector
                f.write("\n  Bias vector:\n")
                bias_vector = module.bias.detach().cpu().numpy()
                f.write("    " + " ".join(f"{x:.6f}" for x in bias_vector) + "\n\n")
            elif isinstance(module, nn.LeakyReLU):
                f.write(f"Layer {i} (LeakyReLU activation)\n\n")
### Full Training and Visualization Pipeline ###
def full_pipeline(
    input_path,
    is_video=False,
    train_size=5000,
    val_size=1000,
    layer_sizes=[10, 10, 10, 10, 10, 10, 10, 10],
    epochs=10,
    batch_size=1024,
    learning_rate=0.001,
    output_dir="results",
    random_seed=None,
    network_shape_b64=None,
    network_shape_str=None,
    debug=False
):
    os.makedirs(output_dir, exist_ok=True)
    # Step 1: Preprocess data
    if is_video:
        data = preprocess_video(input_path)
        height, width = 64, 64  # Assuming a fixed resolution for visualization
    else:
        data = preprocess_image(input_path)
        img = cv2.imread(input_path, cv2.IMREAD_GRAYSCALE)
        height, width = img.shape
    # Step 2: Sample training and validation data
    train_data, val_data = sample_data(data, train_size, val_size)
    # Step 3: Initialize the network
    input_dim = 3 if is_video else 2
    network = ReLUNetwork(input_dim, layer_sizes, debug=debug)
    trainable_params = sum(p.numel() for p in network.parameters() if p.requires_grad)
    print(f"Trainable parameters: {trainable_params}")
    from torchinfo import summary
    summary(network, input_size=(1024, input_dim))  # was hardcoded to 2, which breaks for video inputs
    # Step 4: Train and visualize decision boundaries
    boundary_frames = []
    seed_str = str(random_seed) if random_seed is not None else "none"
    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")
        train_loss, val_loss = train_network(network, train_data, val_data, epochs=1, batch_size=batch_size, learning_rate=learning_rate)
        epoch_str = "%04d" % (epoch + 1)
        output_path = os.path.join(output_dir, f"{os.path.basename(input_path)}_{network_shape_b64}_{seed_str}_epoch_{epoch_str}.png")
        visualize_decision_boundary_with_predictions(
            network, data, train_data, val_data,
            (height, width), output_path, input_path,
            train_loss=train_loss, val_loss=val_loss,
            network_shape_str=network_shape_str,
            random_seed=random_seed
        )
        if is_video:
            # Collect frames so Step 5 has something to save (this append was
            # previously commented out, leaving boundary_frames empty)
            boundary_frames.append(imageio.imread(output_path))
    # Step 5: Save video if processing a video input
    if is_video:
        video_output_path = os.path.join(output_dir, "boundary_evolution.mp4")
        imageio.mimsave(video_output_path, boundary_frames, fps=5)
        print(f"Boundary evolution video saved at: {video_output_path}")
    # Step 6: Dump network weights
    weights_filename = os.path.join(output_dir, f"weights_{network_shape_b64}_{seed_str}.txt")
    dump_network_weights(network, weights_filename)
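# Hypothetical direct invocation, bypassing the CLI below (filename and
# hyperparameters are placeholders):
#   shape = [16, 16, 16]
#   full_pipeline("lena.png", is_video=False, layer_sizes=shape, epochs=50,
#                 random_seed=0, network_shape_str=str(shape),
#                 network_shape_b64=base64.b64encode(str(shape).encode()).decode())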
#print(torch.cuda.get_device_name(0))
if len(sys.argv) < 4:  # script name plus three required arguments
    print("Usage: python experiments2.py <input_image> <NN_shape> <epochs> [random_seed] [--debug]")
    sys.exit(1)
input_image = sys.argv[1]
network_shape_str = sys.argv[2]  # Get the raw string
network_shape_b64 = base64.b64encode(network_shape_str.encode()).decode()  # Encode it for use in filenames
# Parse the shape safely: ast.literal_eval accepts list literals like
# "[10, 10, 10]" without the arbitrary-code-execution risk of eval()
NN_shape = ast.literal_eval(network_shape_str)
nepochs = int(sys.argv[3])
random_seed = int(sys.argv[4]) if len(sys.argv) > 4 else None
debug = "--debug" in sys.argv
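# Example invocation (hypothetical filename and values):
#   python experiments2.py cat.png "[16, 16, 16, 16]" 200 42 --debug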
# Set random seed if provided
if random_seed is not None:
    random.seed(random_seed)
    np.random.seed(random_seed)
    torch.manual_seed(random_seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(random_seed)
        torch.cuda.manual_seed_all(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
# A number of different ReLU network shapes, all with the same number of
# parameters, but with depth varying from 2 layers to 10.
#shapes_to_try = []
#for i in range(2, 14):
#    shape = generate_network_shape(1024, i)
#    shapes_to_try.append(shape)
#print(shapes_to_try)
#for shape in shapes_to_try:
#    # Spawn a new Python interpreter process to deal with various memory leaks
#    # and run the full pipeline for that shape:
#    print(f"Trying shape: {shape}")
full_pipeline(input_image, is_video=False, epochs=nepochs, layer_sizes=NN_shape, random_seed=random_seed, network_shape_b64=network_shape_b64, network_shape_str=network_shape_str, debug=debug)