@kouroshHakha
Last active January 30, 2025 23:15
Test peer-to-peer GPU bandwidth
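The script below starts one Ray actor per GPU, joins them in an NCCL collective group via ray.util.collective, and reports the average per-GPU bandwidth measured for AllReduce and AllGather on a configurable tensor size (64 MB of float32 by default). It measures collective bandwidth across the whole group rather than bandwidth between a single pair of devices.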
import time

import numpy as np
import ray
import ray.util.collective as collective
import torch
from ray.util.collective.types import Backend
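# NOTE: this script assumes the NCCL backend of ray.util.collective, which
# typically requires a CUDA build of cupy to be installed alongside Ray.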
@ray.remote(num_gpus=1)
class GPUActor:
    def __init__(self, rank, world_size, tensor_size):
        self.rank = rank
        self.world_size = world_size
        self.tensor_size = tensor_size
        # Ray sets CUDA_VISIBLE_DEVICES to the assigned GPU by default, so torch sees
        # it as local device 0 even though ray.get_gpu_ids() reports the physical id.
        self.device = torch.device("cuda:0")
        torch.cuda.set_device(self.device)
        # Join the collective group; every actor calls this with the same
        # group_name and world_size and its own rank.
        collective.init_collective_group(
            world_size=world_size,
            rank=rank,
            backend=Backend.NCCL,
            group_name="bandwidth_test",
        )
        print(f"Initialized actor {rank} on GPU {ray.get_gpu_ids()[0]}")
    def warmup(self, num_warmup=5):
        # A few untimed all-reduces let NCCL set up communicators before measurement
        tensor = torch.randn(self.tensor_size, device=self.device)
        for _ in range(num_warmup):
            collective.allreduce(tensor, group_name="bandwidth_test")
        return True
def test_allreduce(self, num_iters=20):
tensor = torch.randn(self.tensor_size, device=self.device)
torch.cuda.synchronize()
# Time the all-reduce operations
start = time.time()
for _ in range(num_iters):
collective.allreduce(tensor, group_name="bandwidth_test")
torch.cuda.synchronize()
end = time.time()
# Calculate bandwidth
elapsed = end - start
bytes_transferred = tensor.element_size() * tensor.nelement() * 2 * num_iters # *2 for reduce
bandwidth = (bytes_transferred / elapsed) / (1024**3) # GB/s
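        # Worked example with the defaults below (size_mb=64, num_iters=20):
        # bytes_transferred = 64 MiB * 2 * 20 = 2560 MiB, so an elapsed time of 0.1 s
        # reports roughly 25 GB/s per GPU. The exact ring all-reduce factor is
        # 2 * (n - 1) / n rather than 2, so the figure slightly overstates the
        # traffic for small groups.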
        return bandwidth
    def test_allgather(self, num_iters=20):
        # Each rank contributes a 1/world_size shard; results are gathered into a
        # list of per-rank tensors (ray.util.collective.allgather takes
        # (tensor_list, tensor), mirroring torch.distributed.all_gather).
        tensor = torch.randn(self.tensor_size // self.world_size, device=self.device)
        output_list = [torch.empty_like(tensor) for _ in range(self.world_size)]
        torch.cuda.synchronize()
        # Time the all-gather operations
        start = time.time()
        for _ in range(num_iters):
            collective.allgather(output_list, tensor, group_name="bandwidth_test")
        torch.cuda.synchronize()
        end = time.time()
        # Calculate bandwidth
        elapsed = end - start
        bytes_transferred = tensor.element_size() * tensor.nelement() * self.world_size * num_iters
        bandwidth = (bytes_transferred / elapsed) / (1024**3)  # GB/s
        return bandwidth
def run_bandwidth_test(num_gpus, size_mb=64, iters=20, warmup=5):
    # Initialize Ray with the specified number of GPUs
    ray.init(num_gpus=num_gpus)
    # Convert MB to elements (assuming float32)
    elements = size_mb * 1024 * 1024 // 4
    # Create actors for each GPU
    actors = [GPUActor.remote(i, num_gpus, elements) for i in range(num_gpus)]
    # Warmup
    print("\nWarming up...")
    ray.get([actor.warmup.remote(warmup) for actor in actors])
    # Run all-reduce test
    print("\nRunning AllReduce test...")
    allreduce_bw = ray.get([actor.test_allreduce.remote(iters) for actor in actors])
    # Run all-gather test
    print("\nRunning AllGather test...")
    allgather_bw = ray.get([actor.test_allgather.remote(iters) for actor in actors])
    # Print results
    print(f"\nResults (tensor size: {size_mb}MB, iterations: {iters}):")
    print(f"AllReduce average bandwidth: {np.mean(allreduce_bw):.2f} GB/s")
    print(f"AllGather average bandwidth: {np.mean(allgather_bw):.2f} GB/s")
    # Cleanup
    ray.shutdown()
if __name__ == "__main__":
    # For 4 nodes with 8 GPUs each (32 total GPUs)
    TOTAL_GPUS = 32
    run_bandwidth_test(
        num_gpus=TOTAL_GPUS,
        size_mb=64,
        iters=20,
        warmup=5,
    )
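As written, ray.init(num_gpus=num_gpus) starts a local Ray instance, so it assumes all GPUs are visible on one machine. For the multi-node case mentioned in the comment above (e.g. 4 nodes with 8 GPUs each), a minimal sketch of the change, assuming the cluster has already been launched with the ray CLI, is to connect to it instead of starting a local instance:

# Hypothetical multi-node variant: connect to an existing Ray cluster rather than
# starting a local one. Assumes `ray start --head` was run on the head node and
# `ray start --address=<head-ip>:6379` on each worker node; resource arguments such
# as num_gpus should not be passed when connecting to an existing cluster.
ray.init(address="auto")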