motional / nuplan-devkit

The devkit of the nuPlan dataset.
https://www.nuplan.org

Why does training always fail on the full dataset? #253

Open yaqlee opened 1 year ago

yaqlee commented 1 year ago

When training with the full dataset, I always encounter one of the following two issues:

  1. When training with a single worker, I get this error message:

(raylet) [2023-04-01 15:58:04,464 E 1339 1339] (raylet) node_manager.cc:3040: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 73a5fa71ed2792f3449870753501e568369cce1730dc911c2c9328b6, IP: XX.254.XX.127) over the last time period. To see more information about the Workers killed on this node, use ray logs raylet.out -ip XX.254.XX.127
(raylet)
(raylet) Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable RAY_memory_usage_threshold when starting Ray. To disable worker killing, set the environment variable RAY_memory_monitor_refresh_ms to zero.

  2. When training with multiple workers, I get error messages like these:

  File "/opt/conda/envs/nuplan/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py", line 219, in _store_based_barrier
    raise RuntimeError(
RuntimeError: Timed out initializing process group in store based barrier on rank: 2, for key: store_based_barrier_key:1 (world_size=32, worker_count=123, timeout=0:30:00)
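For the first (Ray OOM) error, the log itself names two environment variables. Here is a minimal sketch of setting them from Python before Ray is started. The helper name ray_memory_env and the value 0.98 are illustrative assumptions, and raising the threshold only postpones the kill; it does not reduce memory use.

```python
import os


def ray_memory_env(usage_threshold=None, disable_worker_killing=False):
    """Return the environment overrides named in the Ray OOM message."""
    env = {}
    if usage_threshold is not None:
        # The threshold is a fraction of node memory.
        if not 0.0 < usage_threshold <= 1.0:
            raise ValueError("usage_threshold must be in (0, 1]")
        env["RAY_memory_usage_threshold"] = str(usage_threshold)
    if disable_worker_killing:
        # Per the log message, a refresh interval of zero disables worker killing.
        env["RAY_memory_monitor_refresh_ms"] = "0"
    return env


overrides = ray_memory_env(usage_threshold=0.98)
# The log says the variables must be set when starting Ray,
# so update the environment before Ray is launched.
os.environ.update(overrides)
print(overrides)
```

The same variables can be exported in the shell that launches the training script instead.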

Each worker used for training has 352GB of memory and 8 GPUs, and the training command is like this:

conda run -n nuplan --no-capture-output python -m torch.distributed.launch --nproc_per_node $MLP_WORKER_GPU --master_addr $MLP_WORKER_0_HOST --node_rank $MLP_ROLE_INDEX --master_port $MLP_WORKER_0_PORT --nnodes $MLP_WORKER_NUM /root/code/nuplan-devkit/nuplan/planning/script/run_training.py
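As a rough sketch, the launch command above can be assembled per node in Python. The flag names are the ones used in the command; the helper names, host name, and port are hypothetical. Note that 8 processes per node on 4 nodes would give the world_size=32 reported in the traceback. That node count is an inference from the numbers in this post, not something the post states.

```python
import shlex


def launch_argv(script, nproc_per_node, nnodes, node_rank, master_addr, master_port):
    """Build the torch.distributed.launch argument list for one node."""
    if not 0 <= node_rank < nnodes:
        raise ValueError("node_rank must be in [0, nnodes)")
    return [
        "python", "-m", "torch.distributed.launch",
        "--nproc_per_node", str(nproc_per_node),
        "--nnodes", str(nnodes),
        "--node_rank", str(node_rank),
        "--master_addr", master_addr,
        "--master_port", str(master_port),
        script,
    ]


def world_size(nproc_per_node, nnodes):
    """Total number of processes expected at the barrier."""
    return nproc_per_node * nnodes


argv = launch_argv(
    script="nuplan/planning/script/run_training.py",
    nproc_per_node=8,
    nnodes=4,
    node_rank=0,
    master_addr="worker-0.example",  # hypothetical host name
    master_port=29500,               # hypothetical port
)
print(shlex.join(argv))
print("world size:", world_size(8, 4))
```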

The configuration I composed is like this:

    # Compose the configuration
    cfg = hydra.compose(config_name=CONFIG_NAME, overrides=[
        f'group={str(SAVE_DIR)}',
        f'cache.cache_path={str(SAVE_DIR)}/cache',
        f'experiment_name={EXPERIMENT}',
        f'job_name={JOB_NAME}',
        'py_func=train',
        '+training=training_vector_model',  # raster model that consumes ego, agents and map raster layers and regresses the ego's trajectory
        'scenario_builder=nuplan',  # use nuplan trainval database
        'scenario_filter.limit_total_scenarios=0.01',  # Choose 500 scenarios to train with, int; float should be in (0,1)
        'lightning.trainer.params.accelerator=ddp_spawn',
        # ddp is not allowed in interactive environment, using ddp_spawn instead - this can bottleneck the data pipeline, it is recommended to run training outside the notebook
        'lightning.trainer.params.max_epochs=10',
        'data_loader.params.batch_size=16',
        'data_loader.params.num_workers=8',
    ])

Could you please advise on how to resolve the above issues? What adjustments should be made to the training configuration or command?

patk-motional commented 1 year ago

Hi @yaqlee,

If I understand correctly, worker here means an instance/machine with 352GB of memory and 8 GPUs.

However, we do not use Ray for distributed training but rather for multi-processing. Can you first try to train on one machine using worker=sequential to isolate the issue? This will be very slow, but it should help get to the root cause.

yaqlee commented 1 year ago

As described in advanced_model_training.ipynb,

Some common pitfalls:

If preprocessing takes a large amount of time, it can cause training to fail (especially in a distributed setting). It may be beneficial to create the feature cache by first running the caching with the argument py_func=cache cache.force_feature_computation=True. This will generate the features using CPU, which should speed up training dramatically. Once caching is complete, supply the overrides cache.force_feature_computation=False cache.cache_path=/path/to/cache cache.use_cache_without_dataset=True to avoid re-computing the features.

So I should run caching first. But when I run caching distributed across 4 CPU instances, it takes too long (one third of the full dataset would take almost 9 days), which makes me wonder whether my configuration is right. Could you please show how to run caching on multiple machines? Thanks a lot.
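The two-stage workflow quoted from the notebook (cache first, then train from the cache) can be sketched as two sets of Hydra overrides. The override strings are the ones quoted in this thread; the helper names and the cache path are hypothetical.

```python
def cache_stage_overrides(cache_path):
    """Overrides for the feature-caching run."""
    return [
        "py_func=cache",
        "cache.force_feature_computation=True",
        f"cache.cache_path={cache_path}",
    ]


def train_stage_overrides(cache_path):
    """Overrides for the training run that reuses the finished cache."""
    return [
        "py_func=train",
        "cache.force_feature_computation=False",
        f"cache.cache_path={cache_path}",
        "cache.use_cache_without_dataset=True",
    ]


cache_path = "/path/to/cache"  # placeholder, as in the notebook text
print(" ".join(cache_stage_overrides(cache_path)))
print(" ".join(train_stage_overrides(cache_path)))
```

Both stages must point at the same cache.cache_path, otherwise the training run will not find the cached features.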

yaqlee commented 1 year ago

Anyway, what is the normal duration for caching? It would help me estimate whether the preprocessing is running normally.

bhyang commented 1 year ago

> Anyway, what is the normal duration for caching? It would help me estimate whether the preprocessing is running normally.

@patk-motional Bump on this? For me caching 10% of the dataset takes nearly 3 days and I'm wondering if this is expected. I've tried both worker=ray_distributed and worker=single_machine_thread_pool (with both ProcessPoolExecutor and ThreadPoolExecutor), and this is the fastest I can get it to run. I suspect the computation is I/O bound since the CPU utilization is not that high.

Maybe it's just a hardware diff?

patk-motional commented 1 year ago

Hi @bhyang,

Can you share what machines and command you are using? Internally we use 4 CPUs with 70 cores. That takes about an hour.

edit: on the training set with scenario_filter.timestamp_threshold_s=8

bAmpT commented 1 year ago

@patk-motional Is it possible for you to share the full scenario_filter/scenario_builder hydra config that you are using for baseline training? Are you using the tagged scenarios or the complete logs?

bhyang commented 1 year ago

> Hi @bhyang,
>
> Can you share what machines and command you are using? Internally we use 4 CPUs with 70 cores. That takes about an hour.
>
> edit: on the training set with scenario_filter.timestamp_threshold_s=8

Here's my CPU info (from lscpu): [image]

Here's the exact command:

python nuplan/planning/script/run_training.py \
    experiment_name=test_exp \
    py_func=cache \
    cache.cache_path=[my_cache_path] \
    cache.force_feature_computation=True \
    +training=training_urban_driver_open_loop_model \
    scenario_builder=nuplan \
    scenario_filter.limit_total_scenarios=0.1 \
    lightning.trainer.params.max_epochs=10 \
    data_loader.params.batch_size=32 \
    data_loader.params.num_workers=8 \
    worker=single_machine_thread_pool \
    worker.use_process_pool=True

bhyang commented 1 year ago

@patk-motional Any update on this? Thanks!

kanikel commented 1 year ago

@patk-motional Same here. My lanegcn training task keeps getting killed because memory fills up. I already set limit_total_scenarios=0.01 but still run out of memory. I only have several machines, each with one or two GPUs. How am I supposed to train on my machines with your dataset? PyTorch DDP can work across distributed machines, so there is no need for the Ray distributed system. Your code is encapsulated too well. How can we remove the Ray framework and use PyTorch DDP for multi-node training? Do you have any internal worker code example for this purpose? If not, can you tell us how to proceed without using the Ray distributed system?

bhyang commented 1 year ago

> @patk-motional Same here. My lanegcn training task keeps getting killed because memory fills up. I already set limit_total_scenarios=0.01 but still run out of memory. I only have several machines, each with one or two GPUs. How am I supposed to train on my machines with your dataset? PyTorch DDP can work across distributed machines, so there is no need for the Ray distributed system. Your code is encapsulated too well. How can we remove the Ray framework and use PyTorch DDP for multi-node training? Do you have any internal worker code example for this purpose? If not, can you tell us how to proceed without using the Ray distributed system?

I found that setting worker.threads_per_node (with worker=ray_distributed) to a value much smaller than the number of cores on my machine considerably reduced the memory consumption without any slowdown during training. On a machine with 88 cores, the memory consumption was over 200GB with the default worker config. Setting threads_per_node to 20 reduced it to ~40GB, but you could probably reduce it even more if there are still memory issues.
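The two data points above (over 200GB on 88 cores, about 40GB with 20 threads) work out to roughly 2GB per thread. A back-of-the-envelope sketch for picking threads_per_node from a memory budget follows. It assumes memory grows linearly with thread count and that the default config uses one thread per core; neither is confirmed here, and the helper names and the 64GB budget are hypothetical.

```python
import math


def gb_per_thread(total_gb, threads):
    """Average memory per thread for one reported measurement."""
    return total_gb / threads


def threads_for_budget(budget_gb, per_thread_gb):
    """Largest thread count that fits the budget under the linear assumption."""
    return max(1, math.floor(budget_gb / per_thread_gb))


default_run = gb_per_thread(200, 88)  # about 2.3 GB per thread
reduced_run = gb_per_thread(40, 20)   # 2.0 GB per thread
# Use the larger of the two estimates to stay on the safe side.
per_thread = max(default_run, reduced_run)

budget_gb = 64  # hypothetical memory budget for caching
print(f"worker.threads_per_node={threads_for_budget(budget_gb, per_thread)}")
```

Treat the result as a starting point and watch actual memory use, since per-thread usage likely depends on the model and feature builders.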

patk-motional commented 1 year ago

Hi all,

Here is the full config to cache the features for the urban driver model. You will see that this is nothing special.

python nuplan_devkit/nuplan/planning/script/run_training.py \
+training=training_urban_driver_open_loop_model \
py_func=cache cache.force_feature_computation=true \
cache.cache_path=path/to/cache \
scenario_builder.db_files=path/to/train_set/ \
scenario_filter.limit_total_scenarios=0.1 \
data_augmentation="[]"

We have a system to dispatch jobs for distributed caching and training internally. There are a few environment variables to set:

Caching job

There is no node-to-node communication required. The code reads the environment variables NUM_NODES and NODE_RANK to chunk the work and distribute it to each worker.
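To illustrate the idea only, here is a minimal sketch of splitting a work list by node rank with no communication between nodes. It uses strided slicing, which is an assumption: the devkit's own chunking may differ (for example, contiguous chunks). The function names and the file names are hypothetical; only NUM_NODES and NODE_RANK come from the comment above.

```python
import os


def node_layout_from_env():
    """Read NUM_NODES and NODE_RANK, defaulting to a single-node run."""
    num_nodes = int(os.environ.get("NUM_NODES", "1"))
    node_rank = int(os.environ.get("NODE_RANK", "0"))
    return num_nodes, node_rank


def chunk_for_node(items, num_nodes, node_rank):
    """Return the items this node should process (every num_nodes-th item)."""
    if num_nodes < 1 or not 0 <= node_rank < num_nodes:
        raise ValueError("need num_nodes >= 1 and 0 <= node_rank < num_nodes")
    return items[node_rank::num_nodes]


db_files = [f"log_{i:03d}.db" for i in range(10)]  # hypothetical file names
print(chunk_for_node(db_files, num_nodes=4, node_rank=1))
```

On a real node the two arguments would come from node_layout_from_env(). Because every node derives its share from the same list and its own rank, the shares are disjoint and together cover the whole list.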

Lastly, there is an assumption that your data (db files) are stored in AWS somewhere for multi-node caching. Otherwise, you will need to load all of the data you need to each machine prior to running the job. You will also need to modify the code in this function here if you are not using AWS S3.

Training job

You can follow https://pytorch-lightning.readthedocs.io/en/1.6.5/clouds/cluster.html as a general guide. We are using pl.plugins.DDPPlugin, not Ray, for node-to-node communication.

The same note about requiring AWS S3 applies to training as well. Otherwise, make the changes stated in the caching section.

kanikel commented 1 year ago

Thank you for your information. I will follow your instructions to give it a try.

polcomicepute commented 1 year ago

Hi @patk-motional, I've encountered a failure while attempting to train on the full dataset using the DDPPlugin after caching locally. Loading the complete dataset from the cache files takes more than 30 minutes, which results in an error originating from torch/distributed/distributed_c10d.py:460:

INFO {/opt/conda/envs/nuplan/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py:460} Waiting in store based barrier to initialize process group for rank: 0, key: store_based_barrier_key:1 (world_size=4, worker_count=1, timeout=0:30:00)
Timed out initializing process group in store-based barrier on rank: 0, for key: store_based_barrier_key:1 (world_size=4, worker_count=1, timeout=0:30:00)
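When comparing such logs across runs, it can help to pull the numbers out of the barrier message. The sketch below parses the two message shapes that appear in this thread. The regular expression and function name are my own, and the reading of worker_count as "processes that have reached the barrier so far" is an assumption based on the message text.

```python
import re
from datetime import timedelta

BARRIER_RE = re.compile(
    r"rank: (\d+), (?:for )?key: (\S+) "
    r"\(world_size=(\d+), worker_count=(\d+), timeout=(\d+):(\d+):(\d+)\)"
)


def parse_barrier_message(line):
    """Extract rank, key, world_size, worker_count and timeout from a log line."""
    match = BARRIER_RE.search(line)
    if match is None:
        return None
    rank, key, world, count, hours, minutes, seconds = match.groups()
    return {
        "rank": int(rank),
        "key": key,
        "world_size": int(world),
        "worker_count": int(count),
        "timeout": timedelta(
            hours=int(hours), minutes=int(minutes), seconds=int(seconds)
        ),
    }


line = (
    "Timed out initializing process group in store-based barrier on rank: 0, "
    "for key: store_based_barrier_key:1 "
    "(world_size=4, worker_count=1, timeout=0:30:00)"
)
info = parse_barrier_message(line)
print(info)
```

Here worker_count=1 against world_size=4 suggests that only one process had arrived when the 30-minute timeout expired.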

I measured the data loading time (approximately 19 million datasets) in different sections of the extract_scenarios_from_cache function in scenario_builder.py, and the results were as follows:

[image] [image] [image]

While the ranks wait at the barrier for processing on all other GPUs to finish, reading the candidate_scenario_dirs path seems to take a long time. This extended time causes the timeout and the failure during the preprocessing stage of training.

I suspect that the large dataset is the underlying reason for this issue. It might be resolved by adjusting the timeout argument introduced in DDPStrategy in PyTorch Lightning version 2.0.7. However, nuPlan currently uses PyTorch Lightning 1.4.9, which employs the DDPPlugin.
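If a longer timeout is the route taken, its value can be derived from the measured loading time instead of guessed. The sketch below only computes the value. The 45-minute measurement, the safety factor, and the function name are hypothetical, and how to hand the value to the DDPPlugin in PyTorch Lightning 1.4.9 is not covered here.

```python
from datetime import timedelta

# The timeout=0:30:00 shown in the log message.
DEFAULT_TIMEOUT = timedelta(minutes=30)


def suggested_timeout(measured_load_seconds, safety_factor=2.0):
    """Pick a barrier timeout with headroom over the measured loading time."""
    needed = timedelta(seconds=measured_load_seconds * safety_factor)
    # Never go below the 30-minute value seen in the log.
    return max(needed, DEFAULT_TIMEOUT)


timeout = suggested_timeout(measured_load_seconds=45 * 60)  # hypothetical measurement
print(timeout)

# With plain PyTorch, a timedelta like this can be passed as the `timeout`
# argument of torch.distributed.init_process_group.
```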

Could you give me some advice? Thanks for your assistance with the excellent dataset!

CristianGariboldi commented 7 months ago

> I found that setting worker.threads_per_node (with worker=ray_distributed) to a value much smaller than the number of cores on my machine considerably reduced the memory consumption without any slowdown during training. On a machine with 88 cores, the memory consumption was over 200GB with the default worker config. Setting threads_per_node to 20 reduced it to ~40GB, but you could probably reduce it even more if there are still memory issues.

Hi @bhyang , I am facing the same memory problem during cache generation, and I found your solution interesting. Where do you set worker.threads_per_node to a smaller value in order to reduce memory usage? I'd like to give it a try, thanks!

bhyang commented 7 months ago

> Hi @bhyang, I am facing the same memory problem during cache generation, and I found your solution interesting. Where do you set worker.threads_per_node to a smaller value in order to reduce memory usage? I'd like to give it a try, thanks!

@CristianGariboldi You can see my earlier comment for the exact command I used, except you can replace the worker arguments with worker=ray_distributed and worker.threads_per_node=20.
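Spelled out, that amounts to the earlier caching command with the two worker overrides swapped. A small sketch that assembles it follows; the cache path is a placeholder, and the remaining overrides are copied from the earlier command in this thread.

```python
import shlex

overrides = [
    "experiment_name=test_exp",
    "py_func=cache",
    "cache.cache_path=/path/to/cache",  # placeholder path
    "cache.force_feature_computation=True",
    "+training=training_urban_driver_open_loop_model",
    "scenario_builder=nuplan",
    "scenario_filter.limit_total_scenarios=0.1",
    "lightning.trainer.params.max_epochs=10",
    "data_loader.params.batch_size=32",
    "data_loader.params.num_workers=8",
    # These two replace worker=single_machine_thread_pool
    # and worker.use_process_pool=True.
    "worker=ray_distributed",
    "worker.threads_per_node=20",
]

command = ["python", "nuplan/planning/script/run_training.py", *overrides]
print(shlex.join(command))
```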

lijiekiff commented 1 month ago

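> Hi @patk-motional, I've encountered a failure while attempting to train on the full dataset using the DDPPlugin after caching locally. Loading the complete dataset from the cache files takes more than 30 minutes, which results in an error originating from torch/distributed/distributed_c10d.py:460:
>
> INFO {/opt/conda/envs/nuplan/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py:460} Waiting in store based barrier to initialize process group for rank: 0, key: store_based_barrier_key:1 (world_size=4, worker_count=1, timeout=0:30:00)
> Timed out initializing process group in store-based barrier on rank: 0, for key: store_based_barrier_key:1 (world_size=4, worker_count=1, timeout=0:30:00)
>
> I measured the data loading time (approximately 19 million datasets) in different sections of the extract_scenarios_from_cache function in scenario_builder.py, and the results were as follows:
>
> [image] [image] [image]
>
> While the ranks wait at the barrier for processing on all other GPUs to finish, reading the candidate_scenario_dirs path seems to take a long time. This extended time causes the timeout and the failure during the preprocessing stage of training.
>
> I suspect that the large dataset is the underlying reason for this issue. It might be resolved by adjusting the timeout argument introduced in DDPStrategy in PyTorch Lightning version 2.0.7. However, nuPlan currently uses PyTorch Lightning 1.4.9, which employs the DDPPlugin.
>
> Could you give me some advice? Thanks for your assistance with the excellent dataset!

Hello, I ran into the same problem. How did you solve it? Thanks.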

lijiekiff commented 1 month ago

Hello @yaqlee, I ran into the same error as in your original post:

RuntimeError: Timed out initializing process group in store based barrier on rank: 0, for key: store_based_barrier_key:1 (world_size=8, worker_count=1, timeout=0:30:00)

How did you deal with it? Thanks.