kagami-tsukimura / pytorch-deeplearning

0 stars 0 forks source link

ddp #80

Open kagami-tsukimura opened 1 week ago

kagami-tsukimura commented 1 week ago

PyTorchのDistributed Data Parallel (DDP) を使って、2台のマシンで学習を行うためのサンプルコードを提供します。このコードは、torch.distributed パッケージを使用して、マルチマシン・マルチGPUでのトレーニングを実現します。

前提として、以下の点を確認してください:

各マシン上でPyTorchと必要なライブラリがインストールされていること。 各マシンのIPアドレスとポート番号を知っていること。 各マシン間でパスワードなしでSSH接続ができるように設定されていること。 メインのマシンには2枚のGPU、サブのマシンには1枚のGPUがあるため、全体で3つのプロセスを起動します。

スクリプト全体の流れ 各マシンでPyTorchのDDPを設定。 モデル、データセット、オプティマイザの設定。 トレーニングループの実装。

kagami-tsukimura commented 1 week ago

スクリプト train.py (メインスクリプト) python コードをコピーする import os import torch import torch.distributed as dist import torch.multiprocessing as mp from torch.nn.parallel import DistributedDataParallel as DDP from torch.utils.data import DataLoader, DistributedSampler from torchvision import datasets, transforms

def setup(rank, world_size, master_addr, master_port): os.environ['MASTER_ADDR'] = master_addr os.environ['MASTER_PORT'] = master_port dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup(): dist.destroy_process_group()

def train(rank, world_size, master_addr, master_port): setup(rank, world_size, master_addr, master_port)

# モデルの定義
model = MyModel().to(rank)
model = DDP(model, device_ids=[rank])

# データローダーの定義
transform = transforms.Compose([transforms.ToTensor()])
dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

# オプティマイザの定義
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# トレーニングループ
for epoch in range(10):
    sampler.set_epoch(epoch)
    for batch_idx, (data, target) in enumerate(dataloader):
        data, target = data.to(rank), target.to(rank)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print(f'Rank {rank}, Epoch {epoch}, Batch {batch_idx}, Loss {loss.item()}')

cleanup()

if name == "main": world_size = 3 # 全体のGPU数 master_addr = 'メインマシンのIPアドレス' master_port = 'メインマシンのポート番号'

mp.spawn(train, args=(world_size, master_addr, master_port), nprocs=world_size, join=True)
kagami-tsukimura commented 1 week ago

各マシンでの実行方法 メインマシン (2 GPUs) bash コードをコピーする python -m torch.distributed.launch --nproc_per_node=2 --nnodes=2 --node_rank=0 --master_addr="メインマシンのIPアドレス" --master_port="メインマシンのポート番号" train.py サブマシン (1 GPU) bash コードをコピーする python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr="メインマシンのIPアドレス" --master_port="メインマシンのポート番号" train.py このスクリプトは、各マシン上で分散トレーニングを設定し、train.py スクリプトを各プロセスに対して起動します。--nproc_per_node は各マシンのGPU数を指定し、--nnodes は全体のノード数を指定します。--node_rank は各マシンのランクを指定し、--master_addr と --master_port はメインマシンのIPアドレスとポート番号を指定します。

kagami-tsukimura commented 1 week ago

パスワードなしでSSH接続を設定するための手順を以下に示します。この設定は、公開鍵認証を使用して行います。

  1. SSHキーの生成 まず、メインマシンでSSHキーを生成します。

bash コードをコピーする ssh-keygen -t rsa コマンドを実行すると、デフォルトの保存場所(通常は ~/.ssh/id_rsa)を聞かれます。エンターキーを押してデフォルトの場所を使用します。 パスフレーズは空のままにしておくか、必要に応じて設定します。

  1. 公開鍵をサブマシンにコピー 次に、生成された公開鍵をサブマシンにコピーします。

bash コードをコピーする ssh-copy-id username@サブマシンのIPアドレス username にはサブマシンのユーザー名を、サブマシンのIPアドレス にはサブマシンのIPアドレスを入力します。 初回接続時にサブマシンのパスワードを入力します。

  1. 設定の確認 公開鍵がサブマシンにコピーされると、~/.ssh/authorized_keys ファイルに追記されます。これにより、メインマシンからサブマシンへのSSH接続がパスワードなしで行えるようになります。

確認のため、メインマシンからサブマシンにSSH接続を試みます。

bash コードをコピーする ssh username@サブマシンのIPアドレス パスワードの入力を求められない場合、設定は正しく行われています。 補足情報 サブマシンのSSH設定ファイル (/etc/ssh/sshd_config) において、PubkeyAuthentication が yes になっていることを確認します。この設定が有効になっていない場合、公開鍵認証が動作しません。 サービスを再起動して設定を反映させます。 bash コードをコピーする sudo systemctl restart sshd これで、メインマシンからサブマシンへのパスワードなしでのSSH接続が設定されました。次に、分散トレーニングのスクリプトを実行できます。

kagami-tsukimura commented 1 week ago

DETRのmain.pyに分散データパラレル(DDP)を実装するためのコードを以下に示します。以下の手順で進めます。

必要なインポートを追加する。 DDPセットアップおよびクリーンアップ関数を追加する。 main関数を修正してDDPを導入する。 データローダーに分散サンプラーを導入する。

追加

import os
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size, master_addr, master_port):
    os.environ['MASTER_ADDR'] = master_addr
    os.environ['MASTER_PORT'] = master_port
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

修正

def main(rank, world_size, args):
    setup(rank, world_size, args.master_addr, args.master_port)

    utils.init_distributed_mode(args)
    print("git:\n  {}\n".format(utils.get_sha()))

    if args.frozen_weights is not None:
        assert args.masks, "Frozen training is meant for segmentation only"
    print(args)

    device = torch.device(rank)
    torch.manual_seed(args.seed)
    np.random.seed(args.seed)
    random.seed(args.seed)

    model, criterion, postprocessors = build_model(args)
    model.to(device)

    model_without_ddp = model
    if args.distributed:
        model = DDP(model, device_ids=[rank])
        model_without_ddp = model.module
    n_parameters = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print('number of params:', n_parameters)

    param_dicts = [
        {"params": [p for n, p in model_without_ddp.named_parameters() if "backbone" not in n and p.requires_grad]},
        {
            "params": [p for n, p in model_without_ddp.named_parameters() if "backbone" in n and p.requires_grad],
            "lr": args.lr_backbone,
        },
    ]
    optimizer = torch.optim.AdamW(param_dicts, lr=args.lr, weight_decay=args.weight_decay)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, args.lr_drop)

    dataset_train = build_dataset(image_set='train', args=args)
    dataset_val = build_dataset(image_set='val', args=args)

    if args.distributed:
        sampler_train = DistributedSampler(dataset_train, num_replicas=world_size, rank=rank)
        sampler_val = DistributedSampler(dataset_val, num_replicas=world_size, rank=rank, shuffle=False)
    else:
        sampler_train = torch.utils.data.RandomSampler(dataset_train)
        sampler_val = torch.utils.data.SequentialSampler(dataset_val)

    batch_sampler_train = torch.utils.data.BatchSampler(sampler_train, args.batch_size, drop_last=True)
    data_loader_train = DataLoader(dataset_train, batch_sampler=batch_sampler_train,
                                   collate_fn=utils.collate_fn, num_workers=args.num_workers)
    data_loader_val = DataLoader(dataset_val, args.batch_size, sampler=sampler_val,
                                 drop_last=False, collate_fn=utils.collate_fn, num_workers=args.num_workers)

    if args.dataset_file == "coco_panoptic":
        coco_val = datasets.coco.build("val", args)
        base_ds = get_coco_api_from_dataset(coco_val)
    else:
        base_ds = get_coco_api_from_dataset(dataset_val)

    if args.frozen_weights is not None:
        checkpoint = torch.load(args.frozen_weights, map_location='cpu')
        model_without_ddp.detr.load_state_dict(checkpoint['model'])

    output_dir = Path(args.output_dir)
    if args.resume:
        if args.resume.startswith('https'):
            checkpoint = torch.hub.load_state_dict_from_url(args.resume, map_location='cpu', check_hash=True)
        else:
            checkpoint = torch.load(args.resume, map_location='cpu')
        model_without_ddp.load_state_dict(checkpoint['model'])
        if not args.eval and 'optimizer' in checkpoint and 'lr_scheduler' in checkpoint and 'epoch' in checkpoint:
            optimizer.load_state_dict(checkpoint['optimizer'])
            lr_scheduler.load_state_dict(checkpoint['lr_scheduler'])
            args.start_epoch = checkpoint['epoch'] + 1

    if args.eval:
        test_stats, coco_evaluator = evaluate(model, criterion, postprocessors, data_loader_val, base_ds, device, args.output_dir)
        if args.output_dir:
            utils.save_on_master(coco_evaluator.coco_eval["bbox"].eval, output_dir / "eval.pth")
        return

    print("Start training")
    start_time = time.time()
    for epoch in range(args.start_epoch, args.epochs):
        if args.distributed:
            sampler_train.set_epoch(epoch)
        train_stats = train_one_epoch(model, criterion, data_loader_train, optimizer, device, epoch, args.clip_max_norm)
        lr_scheduler.step()
        if args.output_dir:
            checkpoint_paths = [output_dir / 'checkpoint.pth']
            if (epoch + 1) % args.lr_drop == 0 or (epoch + 1) % 100 == 0:
                checkpoint_paths.append(output_dir / f'checkpoint{epoch:04}.pth')
            for checkpoint_path in checkpoint_paths:
                utils.save_on_master({
                    'model': model_without_ddp.state_dict(),
                    'optimizer': optimizer.state_dict(),
                    'lr_scheduler': lr_scheduler.state_dict(),
                    'epoch': epoch,
                    'args': args,
                }, checkpoint_path)

        test_stats, coco_evaluator = evaluate(model, criterion, postprocessors, data_loader_val, base_ds, device, args.output_dir)

        log_stats = {**{f'train_{k}': v for k, v in train_stats.items()},
                     **{f'test_{k}': v for k, v in test_stats.items()},
                     'epoch': epoch,
                     'n_parameters': n_parameters}

        if args.output_dir and utils.is_main_process():
            with (output_dir / "log.txt").open("a") as f:
                f.write(json.dumps(log_stats) + "\n")

            if coco_evaluator is not None:
                (output_dir / 'eval').mkdir(exist_ok=True)
                if "bbox" in coco_evaluator.coco_eval:
                    filenames = ['latest.pth']
                    if epoch % 50 == 0:
                        filenames.append(f'{epoch:03}.pth')
                    for name in filenames:
                        torch.save(coco_evaluator.coco_eval["bbox"].eval, output_dir / "eval" / name)

    total_time = time.time() - start_time
    total_time_str = str(datetime.timedelta(seconds=int(total_time)))
    print('Training time {}'.format(total_time_str))

    cleanup()

if __name__ == '__main__':
    parser = argparse.ArgumentParser('DETR training and evaluation script', parents=[get_args_parser()])
    args = parser.parse_args()
    if args.output_dir:
        Path(args.output_dir).mkdir(parents=True, exist_ok=True)

    args.distributed = True
    world_size = 3  # 全体のGPU数(2枚と1枚の合計)
    args.master_addr = 'メインマシンのIPアドレス'
    args.master_port = 'メインマシンのポート番号'

    mp.spawn(main, args=(world_size, args), nprocs=world_size, join=True)

説明 セットアップ関数: setupとcleanup関数を追加して、プロセスグループの初期化と終了を行います。 main関数の修正: DDPに対応するように、main関数を修正しました。各プロセスが同期され、モデルがDDPとして初期化されます。 分散データローダー: DistributedSamplerを使用してデータローダーを分散トレーニングに対応させます。 これにより、DETRのmain.pyは複数マシンでの分散トレーニングを行うことができます。

kagami-tsukimura commented 1 week ago

処理の待ち合わせ 複数マシンでのDDPを使用する場合、各プロセスは torch.distributed.init_process_group によって初期化されます。この初期化が完了すると、全てのプロセスが揃ったことになります。その後、学習を開始します。

setup(rank, world_size, master_addr, master_port) 関数の中で、dist.init_process_group が呼ばれ、各プロセスが同期されます。 各プロセスが揃うまで、init_process_group はブロックされます。 これにより、全てのプロセスが揃った後に学習が開始されるようになります。