Test peer-to-peer GPU bandwidth with Ray collective (NCCL) allreduce and allgather benchmarks
import time

import numpy as np
import ray
import ray.util.collective as collective
import torch
from ray.util.collective.types import Backend


@ray.remote(num_gpus=1)
class GPUActor:
    def __init__(self, rank, world_size, tensor_size):
        self.rank = rank
        self.world_size = world_size
        self.tensor_size = tensor_size

        # Ray sets CUDA_VISIBLE_DEVICES to the assigned GPU, so each
        # actor always sees its GPU as device 0 (indexing by the global
        # ID from ray.get_gpu_ids() would fail on multi-GPU nodes)
        self.device = torch.device("cuda:0")
        torch.cuda.set_device(self.device)

        # Join the NCCL collective group shared by all actors
        collective.init_collective_group(
            world_size=world_size,
            rank=rank,
            backend=Backend.NCCL,
            group_name="bandwidth_test",
        )
        print(f"Initialized actor {rank} on GPU {ray.get_gpu_ids()[0]}")

    def warmup(self, num_warmup=5):
        # Untimed iterations to set up the NCCL communicator and warm up
        # CUDA kernels before measuring
        tensor = torch.randn(self.tensor_size, device=self.device)
        for _ in range(num_warmup):
            collective.allreduce(tensor, group_name="bandwidth_test")
        return True

    def test_allreduce(self, num_iters=20):
        tensor = torch.randn(self.tensor_size, device=self.device)
        torch.cuda.synchronize()

        # Time the all-reduce operations
        start = time.time()
        for _ in range(num_iters):
            collective.allreduce(tensor, group_name="bandwidth_test")
        torch.cuda.synchronize()
        end = time.time()

        # Approximate per-GPU traffic: an allreduce both sends and
        # receives each element, hence the factor of 2
        elapsed = end - start
        bytes_transferred = tensor.element_size() * tensor.nelement() * 2 * num_iters
        bandwidth = (bytes_transferred / elapsed) / (1024**3)  # GB/s
        return bandwidth
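
    # Note on the 2x factor above: a ring allreduce moves roughly
    # 2 * (N - 1) / N times the tensor size per GPU (a reduce-scatter
    # phase followed by an all-gather phase), so the flat 2x used here
    # is the large-N approximation that NCCL's benchmarks report as
    # "bus bandwidth".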

    def test_allgather(self, num_iters=20):
        # Each rank contributes one shard; ray.util.collective.allgather
        # takes the output list first and the input tensor second,
        # mirroring torch.distributed.all_gather
        tensor = torch.randn(self.tensor_size // self.world_size, device=self.device)
        output_list = [torch.empty_like(tensor) for _ in range(self.world_size)]
        torch.cuda.synchronize()

        # Time the all-gather operations
        start = time.time()
        for _ in range(num_iters):
            collective.allgather(output_list, tensor, group_name="bandwidth_test")
        torch.cuda.synchronize()
        end = time.time()

        # Per-GPU traffic: each iteration gathers world_size shards
        elapsed = end - start
        bytes_transferred = tensor.element_size() * tensor.nelement() * self.world_size * num_iters
        bandwidth = (bytes_transferred / elapsed) / (1024**3)  # GB/s
        return bandwidth
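
    # Note on the byte count: world_size shards per iteration equals the
    # full gathered output; a ring all-gather actually moves about
    # (N - 1) / N of that per GPU, so this slightly overcounts at small N.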


def run_bandwidth_test(num_gpus, size_mb=64, iters=20, warmup=5):
    # Start a local Ray instance claiming num_gpus GPUs. On a multi-node
    # cluster, attach to the already-running cluster instead (e.g.
    # ray.init(address="auto")) rather than overriding the GPU count.
    ray.init(num_gpus=num_gpus)

    # Convert MB to element count (float32 = 4 bytes per element)
    elements = size_mb * 1024 * 1024 // 4

    # Create one actor per GPU
    actors = [GPUActor.remote(i, num_gpus, elements) for i in range(num_gpus)]

    # Warmup
    print("\nWarming up...")
    ray.get([actor.warmup.remote(warmup) for actor in actors])

    # Run all-reduce test
    print("\nRunning AllReduce test...")
    allreduce_bw = ray.get([actor.test_allreduce.remote(iters) for actor in actors])

    # Run all-gather test
    print("\nRunning AllGather test...")
    allgather_bw = ray.get([actor.test_allgather.remote(iters) for actor in actors])

    # Print results
    print(f"\nResults (tensor size: {size_mb} MB, iterations: {iters}):")
    print(f"AllReduce average bandwidth: {np.mean(allreduce_bw):.2f} GB/s")
    print(f"AllGather average bandwidth: {np.mean(allgather_bw):.2f} GB/s")

    # Cleanup
    ray.shutdown()


if __name__ == "__main__":
    # For 4 nodes with 8 GPUs each (32 total GPUs)
    TOTAL_GPUS = 32
    run_bandwidth_test(
        num_gpus=TOTAL_GPUS,
        size_mb=64,
        iters=20,
        warmup=5,
    )
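
For the 4-node, 8-GPUs-per-node setup the `__main__` block describes, `ray.init(num_gpus=32)` only starts a single local Ray instance, so one natural variation is to attach to an already-running cluster. The sketch below assumes the cluster was brought up separately (`ray start --head` on one node, `ray start --address=<head-ip>:6379` on the others) and that this file is importable for `GPUActor`; `connect_and_run` is a hypothetical helper, not part of the original script.

# Hypothetical multi-node variant: attach to an existing Ray cluster
# instead of starting a local instance. `connect_and_run` is
# illustrative only and not part of the original gist.
import ray

def connect_and_run(total_gpus=32, size_mb=64, iters=20, warmup=5):
    ray.init(address="auto")  # connect to the running cluster
    available = int(ray.cluster_resources().get("GPU", 0))
    assert available >= total_gpus, f"need {total_gpus} GPUs, found {available}"

    elements = size_mb * 1024 * 1024 // 4  # float32 elements
    actors = [GPUActor.remote(i, total_gpus, elements) for i in range(total_gpus)]
    ray.get([a.warmup.remote(warmup) for a in actors])
    bw = ray.get([a.test_allreduce.remote(iters) for a in actors])
    print(f"AllReduce average bandwidth: {sum(bw) / len(bw):.2f} GB/s")
    ray.shutdown()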