NVIDIA / nccl

Optimized primitives for collective multi-GPU communication

The time of alltoall is unstable on two machines #1037

Open kaihaoma opened 1 year ago

kaihaoma commented 1 year ago

I am using PyTorch 2.0.1 with NCCL as the communication backend. I ran a small alltoall test on two machines with 4 GPUs and found that the alltoall time has a large variance.

Here is the code for my test; basically, each GPU creates a tensor and then performs an alltoall operation:

import torch
import torch.distributed as dist
import numpy as np
import time

# Assumes the process group (dist.init_process_group) has already been set up by the caller.
def run_bidirection_mixed(rank, world_size, backend, data_size):
    def run_once():
        input = torch.randn(data_size, dtype=torch.float32, device=f"cuda:{rank}")
        output = torch.randn(data_size, dtype=torch.float32, device=f"cuda:{rank}")
        t1_time = time.time()
        dist.all_to_all_single(output, input)
        # Wait for all data to be sent
        torch.cuda.synchronize(device=f"cuda:{rank}")
        return time.time() - t1_time
    # execute a few rounds of warmup
    warmup_time = 0.0
    for _ in range(2):
        warmup_time += run_once()
    # measure runtime
    benchmark_time = []
    for _ in range(10):
        benchmark_time.append(run_once())
    print(
        f"Rank: {rank} | Backend: {backend} | Data Vol.: {(data_size * 4) / (1000 * 1000)} MB | Warmup: {(warmup_time):.3f} s | Max: {np.max(benchmark_time):.5f} s | Min: {np.min(benchmark_time):.5f} s | Avg: {np.mean(benchmark_time):.5f} s"
    )
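
For comparison, host-side skew between ranks can be excluded from the measurement by aligning the ranks with dist.barrier() and timing the collective with CUDA events. The sketch below only illustrates that idea (the helper name time_alltoall_with_events is made up; it assumes the process group has already been initialized):

import torch
import torch.distributed as dist

def time_alltoall_with_events(rank, data_size, iters=10):
    # Assumes dist.init_process_group("nccl", ...) has already been called.
    torch.cuda.set_device(rank)
    input_tensor = torch.randn(data_size, dtype=torch.float32, device=f"cuda:{rank}")
    output_tensor = torch.empty_like(input_tensor)
    times_ms = []
    for _ in range(iters):
        dist.barrier()  # align all ranks so host-side launch skew is not measured
        start = torch.cuda.Event(enable_timing=True)
        end = torch.cuda.Event(enable_timing=True)
        start.record()
        dist.all_to_all_single(output_tensor, input_tensor)
        end.record()
        end.synchronize()  # wait for the collective to finish on this GPU
        times_ms.append(start.elapsed_time(end))  # GPU time in milliseconds
    return times_ms

If the event timings are stable while the time.time() measurements are not, the variance is likely coming from scheduling and launch skew on the host rather than from NCCL itself.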
ljluestc commented 3 days ago

import torch
import torch.distributed as dist
import numpy as np
import time

def run_bidirection_mixed(rank, world_size, backend, data_size):
    # Initialize the process group
    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)  # make this rank's GPU the current device for NCCL and synchronize()

    def run_once():
        # Create input and output tensors
        input_tensor = torch.randn(data_size, dtype=torch.float32, device=f"cuda:{rank}")
        output_tensor = torch.zeros_like(input_tensor)  # Allocate output with zeroes
        t1_time = time.perf_counter()  # Use high-resolution timer
        dist.all_to_all_single(output_tensor, input_tensor)
        # Block until the collective has completed on this rank's GPU
        torch.cuda.synchronize()
        return time.perf_counter() - t1_time  # Calculate elapsed time

    # Execute a few rounds of warmup
    warmup_time = 0.0
    for _ in range(5):  # Increased number of warmup iterations
        warmup_time += run_once()

    # Measure runtime
    benchmark_time = []
    for _ in range(10):  # Number of benchmark runs
        benchmark_time.append(run_once())

    print(
        f"Rank: {rank} | Backend: {backend} | Data Vol.: {(data_size * 4) / (1000 * 1000)} MB | "
        f"Warmup: {warmup_time:.3f} s | Max: {np.max(benchmark_time):.5f} s | "
        f"Min: {np.min(benchmark_time):.5f} s | Avg: {np.mean(benchmark_time):.5f} s"
    )

    # Finalize the process group
    dist.destroy_process_group()

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--world_size", type=int, default=2, help="Number of processes participating in the job")
    parser.add_argument("--data_size", type=int, default=1024 * 1024, help="Size of data per GPU")
    parser.add_argument("--backend", type=str, default="nccl", help="Backend for distributed communication")

    args = parser.parse_args()

    # Run the mixed bidirectional benchmark
    run_bidirection_mixed(rank=args.rank, world_size=args.world_size, backend=args.backend, data_size=args.data_size)
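
Since init_process_group is called with an explicit rank and world_size and no init_method, it uses the default env:// rendezvous, so MASTER_ADDR and MASTER_PORT must be set in every process's environment. For a quick single-node sanity check, the ranks could also be spawned locally; a minimal sketch (the launcher spawn_local is made up and assumes run_bidirection_mixed above is in the same module, one process per local GPU):

import os
import torch.multiprocessing as mp

def spawn_local(world_size=4, data_size=1024 * 1024):
    # Hypothetical single-node launcher; mp.spawn passes the rank as the first argument.
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    mp.spawn(
        run_bidirection_mixed,
        args=(world_size, "nccl", data_size),
        nprocs=world_size,
        join=True,
    )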
kiskra-nvidia commented 3 days ago

Do you observe that with alltoall_perf from nccl-tests as well?