wenet-e2e / wenet

Production First and Production Ready End-to-End Speech Recognition Toolkit
https://wenet-e2e.github.io/wenet/
Apache License 2.0

DDP model.join warning? #2266

Closed kobenaxie closed 5 months ago

kobenaxie commented 10 months ago

Describe the bug: I hit this warning while training on AISHELL2 + librispeech + cstal:

~/miniconda/envs/wenet/lib/python3.8/site-packages/torch/distributed/algorithms/join.py:258: UserWarning: Detected uneven input skew of greater than 1000. This means that rank 1 has at least 1000 fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.

I assume this is caused by uneven data near the end of an epoch? Will this warning affect the final recognition accuracy? Compared with the strategy of ending the epoch early when uneven data is encountered (see espnet), I see a pretty large difference in cv_loss after one epoch of training.

torch==1.13.0
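
For context, the warning comes from DDP's join() context manager, which lets ranks that run out of data early keep shadowing the collective ops of ranks that still have batches. A minimal, self-contained sketch of that pattern (synthetic data and model, launched with torchrun; not the wenet training loop):

# Minimal sketch of the DDP join() pattern that emits the "uneven input skew"
# warning above; synthetic data and model, not the wenet training loop.
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def main():
    dist.init_process_group("gloo")  # CPU-friendly backend for the sketch
    rank = dist.get_rank()
    model = DDP(torch.nn.Linear(8, 1))
    opt = torch.optim.SGD(model.parameters(), lr=0.1)
    # Deliberately uneven workload: rank 0 gets >1000 more batches than the
    # others, which is the level of skew the warning reports.
    num_batches = 1200 if rank == 0 else 10
    with model.join():  # exhausted ranks keep participating in collectives
        for _ in range(num_batches):
            loss = model(torch.randn(4, 8)).sum()
            loss.backward()
            opt.step()
            opt.zero_grad()
    dist.destroy_process_group()


if __name__ == "__main__":
    main()

Run with e.g. torchrun --nproc_per_node=2 join_sketch.py. Note that the warning text itself only mentions possible performance degradation from ranks sitting idle while others finish their extra batches.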

xingchensong commented 10 months ago

You can refer to the train_utils.py::wenet_join function. At the moment it is only enabled for DeepSpeed, not for DDP; you can turn it on for DDP as well.
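
For readers following along, the gist of train_utils.py::wenet_join (a rough sketch, not the exact wenet code) is a monitored_barrier on a dedicated gloo group: if some rank has already exhausted its data and stops calling the barrier, the call times out on the remaining ranks and they break out of the epoch, which is what the logs further down in this thread show.

import logging

import torch.distributed as dist


def wenet_join_sketch(group_join):
    """Rough sketch of the wenet_join idea: return True when the current rank
    should stop its training loop because the workload is uneven."""
    world_size = dist.get_world_size()
    rank = dist.get_rank()
    try:
        # monitored_barrier (gloo only) raises RuntimeError if any rank fails
        # to reach the barrier within the group's timeout.
        dist.monitored_barrier(group=group_join)
    except RuntimeError as e:
        logging.info("Detected uneven workload distribution: {}\n"
                     "Break current worker to manually join all workers, "
                     "world_size {}, current rank {}".format(
                         e, world_size, rank))
        return True
    return False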

kobenaxie commented 10 months ago

OK, I will try it later. My understanding is that wenet_join plays the same role as this espnet code, right?

kobenaxie commented 10 months ago

> You can refer to the train_utils.py::wenet_join function. At the moment it is only enabled for DeepSpeed, not for DDP; you can turn it on for DDP as well.

After enabling wenet_join for DDP, training got stuck (AISHELL):

[E ProcessGroupGloo.cpp:137] Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
2023-12-30 18:39:41,609 INFO Detected uneven workload distribution: Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
 Original exception:
[../third_party/gloo/gloo/transport/tcp/unbound_buffer.cc:81] Timed out waiting 30000ms for recv operation to complete
Break current worker to manually join all workers, world_size 7, current rank 1, current local_rank 1
xingchensong commented 10 months ago

I suggest just using DeepSpeed. Small models train about 10% faster, and large models simply cannot be trained with DDP. For the DeepSpeed config, see examples/aishell/whisper/conf/ds_stage1.json.

xingchensong commented 10 months ago

> OK, I will try it later. My understanding is that wenet_join plays the same role as this espnet code, right?

The functionality is similar; the espnet implementation is more bare-bones.

kobenaxie commented 10 months ago

> I suggest just using DeepSpeed. Small models train about 10% faster, and large models simply cannot be trained with DDP. For the DeepSpeed config, see examples/aishell/whisper/conf/ds_stage1.json.

The thing is, I tried training the Zipformer from icefall with DeepSpeed, but it got stuck at loss.backward(). Training a Conformer works fine, so I have been sticking with DDP.

xingchensong commented 10 months ago

> I suggest just using DeepSpeed. Small models train about 10% faster, and large models simply cannot be trained with DDP. For the DeepSpeed config, see examples/aishell/whisper/conf/ds_stage1.json.
>
> The thing is, I tried training the Zipformer from icefall with DeepSpeed, but it got stuck at loss.backward(). Training a Conformer works fine, so I have been sticking with DDP.

Contribute your Zipformer implementation and I will get DeepSpeed working for it 😁

kobenaxie commented 10 months ago

> I suggest just using DeepSpeed. Small models train about 10% faster, and large models simply cannot be trained with DDP. For the DeepSpeed config, see examples/aishell/whisper/conf/ds_stage1.json.
>
> The thing is, I tried training the Zipformer from icefall with DeepSpeed, but it got stuck at loss.backward(). Training a Conformer works fine, so I have been sticking with DDP.
>
> Contribute your Zipformer implementation and I will get DeepSpeed working for it 😁

Sure, sounds good! But I only did a quick port from icefall, and because of company restrictions I can only debug it at home when I have spare time.

xiyue961 commented 10 months ago

> You can refer to the train_utils.py::wenet_join function. At the moment it is only enabled for DeepSpeed, not for DDP; you can turn it on for DDP as well.
>
> After enabling wenet_join for DDP, training got stuck (AISHELL):
>
> [E ProcessGroupGloo.cpp:137] Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
> 2023-12-30 18:39:41,609 INFO Detected uneven workload distribution: Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
>  Original exception:
> [../third_party/gloo/gloo/transport/tcp/unbound_buffer.cc:81] Timed out waiting 30000ms for recv operation to complete
> Break current worker to manually join all workers, world_size 7, current rank 1, current local_rank 1

@kobenaxie I hit the same problem training Whisper with DeepSpeed using the examples/aishell/whisper recipe. How did you solve it?

xingchensong commented 10 months ago

> You can refer to the train_utils.py::wenet_join function. At the moment it is only enabled for DeepSpeed, not for DDP; you can turn it on for DDP as well.
>
> After enabling wenet_join for DDP, training got stuck (AISHELL):
>
> [E ProcessGroupGloo.cpp:137] Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
> 2023-12-30 18:39:41,609 INFO Detected uneven workload distribution: Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
>  Original exception:
> [../third_party/gloo/gloo/transport/tcp/unbound_buffer.cc:81] Timed out waiting 30000ms for recv operation to complete
> Break current worker to manually join all workers, world_size 7, current rank 1, current local_rank 1
>
> @kobenaxie I hit the same problem training Whisper with DeepSpeed using the examples/aishell/whisper recipe. How did you solve it?

If this log appears but training continues into the next epoch, that is normal.

If this log appears and training stays stuck there for a long time (say, more than an hour), that is not normal.

Which case are you seeing?

xiyue961 commented 10 months ago

I ran into several different situations:

  1. At the end of the train stage the error is printed, then it proceeds to the evaluate stage, saves the model normally, and moves on to the next epoch, repeating like this. That case was fine, but when I reran the experiment today it could not train at all.
  2. The error is printed right at the start of the train stage, then it proceeds to the evaluate stage and on to the next epoch, repeating like this, which amounts to no training at all. After switching to a cloud environment the behavior changed again:
  3. The error is printed right at the start of the train stage, and after waiting about half an hour the job simply exits.

How can this be solved?

xingchensong commented 10 months ago

> I ran into several different situations:
>
> 1. At the end of the train stage the error is printed, then it proceeds to the evaluate stage, saves the model normally, and moves on to the next epoch, repeating like this. That case was fine, but when I reran the experiment today it could not train at all.
> 2. The error is printed right at the start of the train stage, then it proceeds to the evaluate stage and on to the next epoch, repeating like this, which amounts to no training at all. After switching to a cloud environment the behavior changed again:
> 3. The error is printed right at the start of the train stage, and after waiting about half an hour the job simply exits.
>
> How can this be solved?

  1. This is normal, expected behavior.

2 and 3: possible causes include:

xiyue961 commented 10 months ago

The environment for case 3 uses shared storage with large-scale data, and other programs access it at the same time, so that is probably the cause you mentioned. Also, in the case-3 environment I tried training with transformers, and that worked. In the case-2 environment I tried reducing num_workers, which helped. I have not modified the source code; I will try the latest code.

xingchensong commented 10 months ago

Were you training Whisper with transformers? On the same large-scale speech data?

xiyue961 commented 10 months ago

Yes, the same large-scale data, with the dataloader rewritten in wenet's style.

xingchensong commented 10 months ago

Compare the speed of the first 100 batches with the speed later in training. Suppose your first few batches take 100 s on average to load while the timeout is currently 30 s; the timeout will inevitably be triggered right at the start of training.

transformers effectively never explicitly sets a timeout for the synchronization ops, so the timeout falls back to the default value of roughly one hour. It can therefore keep waiting for data that has not arrived yet, which from the user's perspective looks like training proceeding normally.

If you raise the timeout when training on large data, it should be just as "invisible" to you.

[screenshot attached]
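
Concretely, raising that timeout comes down to the timedelta passed when the gloo join group is created (the same pattern as the train.py snippet quoted later in this thread). A minimal sketch:

import datetime

import torch.distributed as dist

# Sketch: create the auxiliary gloo group used for the join barrier with a
# larger timeout (e.g. 3600 s instead of the 30 s seen in the logs above), so
# slow initial data loading does not trip the monitored barrier.
group_join = dist.new_group(
    backend="gloo", timeout=datetime.timedelta(seconds=3600))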

xiyue961 commented 10 months ago

Got it, thanks a lot for the pointers.

xingchensong commented 9 months ago

@kobenaxie A quicker workaround is to repeat data.list epoch times and then save checkpoints by step (set save_interval in the yaml). That way the join is only hit once, when training finally finishes.
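
A minimal sketch of that manual duplication (hypothetical paths and helper name; later in the thread it is noted that newer wenet versions do this automatically via cycle):

# Hypothetical helper: write data.list repeated max_epoch times so a single
# training "epoch" covers what used to be max_epoch epochs, and checkpoints
# are saved by step via save_interval instead of at epoch boundaries.
def repeat_data_list(src="data/train/data.list",
                     dst="data/train/data.list.repeated",
                     max_epoch=100):
    with open(src) as fin:
        lines = fin.readlines()
    with open(dst, "w") as fout:
        fout.writelines(lines * max_epoch)


if __name__ == "__main__":
    repeat_data_list()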

kobenaxie commented 9 months ago

> @kobenaxie A quicker workaround is to repeat data.list epoch times and then save checkpoints by step (set save_interval in the yaml). That way the join is only hit once, when training finally finishes.

Yeah, that is another option.

YueAn329 commented 8 months ago

I tried distributed multi-GPU training with accelerate.notebook_launcher. When training an image-classification model, if I construct an IterableDataset, training fails with the error below:

[E ProcessGroupGloo.cpp:138] Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
Traceback (most recent call last):
  File "/root/anaconda3/envs/py3.10.12/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/root/anaconda3/envs/py3.10.12/lib/python3.10/site-packages/accelerate/utils/other.py", line 244, in patch_environment
    yield
  File "/root/anaconda3/envs/py3.10.12/lib/python3.10/site-packages/accelerate/launchers.py", line 200, in notebook_launcher
    start_processes(launcher, args=args, nprocs=num_processes, start_method="fork")
  File "/root/anaconda3/envs/py3.10.12/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 202, in start_processes
    while not context.join():
  File "/root/anaconda3/envs/py3.10.12/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 145, in join
    raise ProcessExitedException(
torch.multiprocessing.spawn.ProcessExitedException: process 0 terminated with signal SIGSEGV

However, with a map-style Dataset, distributed training runs normally. How can this problem be solved?

Mddct commented 7 months ago

> @kobenaxie A quicker workaround is to repeat data.list epoch times and then save checkpoints by step (set save_interval in the yaml). That way the join is only hit once, when training finally finishes.
>
> Yeah, that is another option.

The latest code already supports this mode.

lsrami commented 3 months ago

> @kobenaxie A quicker workaround is to repeat data.list epoch times and then save checkpoints by step (set save_interval in the yaml). That way the join is only hit once, when training finally finishes.
>
> Yeah, that is another option.
>
> The latest code already supports this mode.

@xingchensong After reading the discussion above I fully understand why save_interval was added, but line 136 of train.py is problematic:

  1. Line 136 changes end_epoch, which is equivalent to changing max_epoch, but the change is not written back to the config file, which is misleading.
  2. To use xcsong's approach of duplicating data.list epoch times, which avoids the long hang at the end of every epoch (caused by uneven data distribution) that blocks the next epoch from starting, it is enough to set max_epoch to 1; line 136 is not needed.
  3. I think save_interval should also take effect in a normal multi-epoch training run, instead of only when training a single epoch.

I suggest changing line 136 of train.py to the following:

end_epoch = configs.get('max_epoch', 100)

I also suggest adding three extra parameters to the config file to control saving a checkpoint every N epochs. In addition, the CV stage is time-consuming and, in my view, not strictly necessary when training large models, so whether validation is run should be freely controllable:

save_interval: 1000       # save a checkpoint every N steps
save_interval_epoch: 1    # save a checkpoint every N epochs
use_cv_step: true         # whether to run CV at step-based saves (false: only save the model, skip validation)
use_cv_epoch: true        # whether to run CV at epoch-based saves (false: only save the model, skip validation)

An example of the modified code is shown below. Snippet from train.py:

    for epoch in range(start_epoch, end_epoch):
        configs['epoch'] = epoch

        lrs = [group['lr'] for group in optimizer.param_groups]
        logging.info('Begin of Epoch {} Step {} TRAIN info lr {} rank {}'.format(
            epoch, executor.step, lrs_to_str(lrs), rank))

        dist.barrier(
        )  # NOTE(xcsong): Ensure all ranks start Train at the same time.
        # NOTE(xcsong): Why we need a new group? see `train_utils.py::wenet_join`
        group_join = dist.new_group(
            backend="gloo", timeout=datetime.timedelta(seconds=args.timeout))
        executor.train(model, optimizer, scheduler, train_data_loader,
                       cv_data_loader, writer, configs, scaler, group_join)
        dist.destroy_process_group(group_join)

        dist.barrier(
        )  # NOTE(xcsong): Ensure all ranks start CV at the same time.
        if (epoch + 1) % configs.get('save_interval_epoch', 1) == 0:
            if configs.get('use_cv_epoch', True):
                loss_dict = executor.cv(model, cv_data_loader, configs)
            else:
                logging.info(f"Note: use_cv_epoch set to False, using fake_loss instead of cv_loss")
                loss_dict = {
                    'acc': -100.0,
                    'loss': -100.0,
                    'loss_att': -100.0,
                    'loss_ctc': -100.0,
                    'th_accuracy': -100.0
                }
            info_dict = {
                'epoch': epoch,
                'lrs': [group['lr'] for group in optimizer.param_groups],
                'step': executor.step,
                'save_time': datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
                'tag': "epoch_{}".format(epoch),
                'loss_dict': loss_dict,
                **configs
            }
            # epoch cv: tensorboard && log
            log_per_epoch(writer, info_dict=info_dict)
            save_model(model, info_dict=info_dict)

        final_epoch = epoch
        lrs = [group['lr'] for group in optimizer.param_groups]
        logging.info('End of Epoch {} Step {} TRAIN info lr {} rank {}'.format(
            epoch, executor.step, lrs_to_str(lrs), rank))

Snippet from executor.py:

                if (self.step +
                        1) % save_interval == 0 and self.step != 0 and (
                            batch_idx + 1) % info_dict["accum_grad"] == 0:
                    import torch.distributed as dist
                    # Ensure all ranks start CV at the same time in step mode
                    dist.barrier()

                    if info_dict.get('use_cv_step', True):
                        loss_dict = self.cv(model, cv_data_loader, configs)
                    else:
                        logging.info(f"Note: use_cv_step set to False, using train_loss instead of cv_loss")
                        loss_dict = info_dict['loss_dict']
                        loss_dict.update({'acc': loss_dict['th_accuracy']})

                    model.train()
                    info_dict.update({
                        "tag":
                        "step_{}".format(self.step),
                        "loss_dict":
                        loss_dict,
                        "save_time":
                        datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
                        "lrs":
                        [group['lr'] for group in optimizer.param_groups]
                    })
                    save_model(model, info_dict)
                    # write final cv: tensorboard
                    log_per_step(writer, info_dict)
                    # Ensure all ranks start Train at the same time in step mode
                    dist.barrier()
xingchensong commented 3 months ago

max_epoch has already been converted into cycle.

https://github.com/wenet-e2e/wenet/blob/main/wenet%2Futils%2Ftrain_utils.py#L332-L332

In other words, as long as max_epoch is set, training really does run for max_epoch passes over the data; it is just that in save_interval mode the list is repeated max_epoch times (i.e., cycle). Suppose max_epoch is 100: changing max_epoch to 1 here means one pass over 100 copies of the list (compared with the original per-epoch saving, which is 100 passes over 1 copy of the list). The 1 does not mean a single pass over a single copy of the list.
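
An illustrative toy example of the cycle semantics described above (not the wenet dataset code itself):

import itertools

# Toy illustration: with cycle = max_epoch, the data list is iterated
# max_epoch times inside one training pass, rather than restarting the
# dataloader (and hitting the join barrier) at every epoch boundary.
data_list = ["shard_0.tar", "shard_1.tar", "shard_2.tar"]  # stand-in entries
max_epoch = 3
one_pass = itertools.chain.from_iterable(itertools.repeat(data_list, max_epoch))
print(list(one_pass))
# ['shard_0.tar', 'shard_1.tar', 'shard_2.tar',
#  'shard_0.tar', 'shard_1.tar', 'shard_2.tar',
#  'shard_0.tar', 'shard_1.tar', 'shard_2.tar']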

lsrami commented 3 months ago

> max_epoch has already been converted into cycle.
>
> https://github.com/wenet-e2e/wenet/blob/main/wenet%2Futils%2Ftrain_utils.py#L332-L332
>
> In other words, as long as max_epoch is set, training really does run for max_epoch passes over the data; it is just that in save_interval mode the list is repeated max_epoch times (i.e., cycle). Suppose max_epoch is 100: changing max_epoch to 1 here means one pass over 100 copies of the list (compared with the original per-epoch saving, which is 100 passes over 1 copy of the list). The 1 does not mean a single pass over a single copy of the list.

Thanks for the reply, I understand now. I had not looked at the dataset-loading code for a while; the repetition of data.list is now handled automatically via cycle, so manual duplication is no longer needed.