ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] Trainers/Dataloaders from separate tasks impede each other #42135

Open chunweiyuan opened 9 months ago

chunweiyuan commented 9 months ago

What happened + What you expected to happen

To collect statistics, I'd like to run many TemporalFusionTransformer pipelines with different random number seeds. Using Ray to parallelize these tasks leads to a situation where the remote dataloaders seem to impede each other, resulting in very slow training. I only have access to CPUs.

Running the code snippet below (the first 80 lines copied from here) with n_tasks = 1 shows training proceeding at more than 3 it/s:

Epoch 1:   9%|▉         | 15/161 [00:04<00:39,  3.71it/s, v_num=5.97e+7, train_loss_step=111.0, val_loss=158.0, train_loss_epoch=161.0]

But once n_tasks > 1, the training rate drops to <= 0.1 it/s. Here's what it looks like for n_tasks = 2:

Epoch 0:   1%|          | 2/161 [01:57<2:35:59,  0.02it/s, v_num=5.97e+7, train_loss_step=394.0]
Epoch 0:  10%|▉         | 16/161 [01:59<18:00,  0.13it/s, v_num=5.97e+7, train_loss_step=184.0]

Completely removing Ray from the code and running it serially returns training to > 3 it/s.

Might the concurrent dataloaders all be competing with each other for resources on the driver process, instead of using their own worker processes? I've also tried using Actors instead of Tasks, but the problem persists. The closest issue I've found thus far is this one, which is still open. Adding multiprocessing_context="fork", as indicated here (with num_workers=1), has no effect.

Versions / Dependencies

ubuntu 20.04.6
slurm 23.02.5

lightning 2.1.2
python 3.10.13
pytorch-forecasting 1.0.0
pytorch-lightning 2.1.2
pytorch-optimizer 2.12.0
ray 2.9.0

Reproduction script

import lightning.pytorch as pl
import numpy as np
import ray
from pytorch_forecasting import (TemporalFusionTransformer,
                                 TimeSeriesDataSet)
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.data.examples import get_stallion_data
from pytorch_forecasting.metrics import QuantileLoss

data = get_stallion_data()

# add time index
data["time_idx"] = data["date"].dt.year * 12 + data["date"].dt.month
data["time_idx"] -= data["time_idx"].min()

# add additional features
data["month"] = data.date.dt.month.astype(str).astype("category")  # categories have be strings
data["log_volume"] = np.log(data.volume + 1e-8)
data["avg_volume_by_sku"] = data.groupby(["time_idx", "sku"], observed=True).volume.transform("mean")
data["avg_volume_by_agency"] = data.groupby(["time_idx", "agency"], observed=True).volume.transform("mean")

# we want to encode special days as one variable and thus need to first reverse one-hot encoding
special_days = [
    "easter_day",
    "good_friday",
    "new_year",
    "christmas",
    "labor_day",
    "independence_day",
    "revolution_day_memorial",
    "regional_games",
    "fifa_u_17_world_cup",
    "football_gold_cup",
    "beer_capital",
    "music_fest",
]
data[special_days] = data[special_days].apply(lambda x: x.map({0: "-", 1: x.name})).astype("category")

max_prediction_length = 6
max_encoder_length = 24
training_cutoff = data["time_idx"].max() - max_prediction_length

training = TimeSeriesDataSet(
    data[lambda x: x.time_idx <= training_cutoff],
    time_idx="time_idx",
    target="volume",
    group_ids=["agency", "sku"],
    min_encoder_length=max_encoder_length // 2,  # keep encoder length long (as it is in the validation set)
    max_encoder_length=max_encoder_length,
    min_prediction_length=1,
    max_prediction_length=max_prediction_length,
    static_categoricals=["agency", "sku"],
    static_reals=["avg_population_2017", "avg_yearly_household_income_2017"],
    time_varying_known_categoricals=["special_days", "month"],
    variable_groups={"special_days": special_days},  # group of categorical variables can be treated as one variable
    time_varying_known_reals=["time_idx", "price_regular", "discount_in_percent"],
    time_varying_unknown_categoricals=[],
    time_varying_unknown_reals=[
        "volume",
        "log_volume",
        "industry_volume",
        "soda_volume",
        "avg_max_temp",
        "avg_volume_by_agency",
        "avg_volume_by_sku",
    ],
    target_normalizer=GroupNormalizer(
        groups=["agency", "sku"], transformation="softplus"
    ),  # use softplus and normalize by group
    add_relative_time_idx=True,
    add_target_scales=True,
    add_encoder_length=True,
)

# create validation set (predict=True) which means to predict 
# the last max_prediction_length points in time for each series
validation = TimeSeriesDataSet.from_dataset(training, data, predict=True,
                                            stop_randomization=True)

# ----- below here pertains to Ray ----- #

n_tasks = 1  # number of concurrent training tasks (>1 leads to slow training)

runtime_env = {"env_vars": {"NCCL_SOCKET_IFNAME": "lo,docker0"}}

ray.init(include_dashboard=False,
         num_cpus=n_tasks + 1,  # just some number >= n_tasks
         object_store_memory=50 * 1e9,  # large enough
         runtime_env=runtime_env)

@ray.remote(num_cpus=1)
def train_task(seed: int,
               training: TimeSeriesDataSet,
               validation: TimeSeriesDataSet) -> TemporalFusionTransformer:
    # create dataloaders for model
    batch_size = 128  # set this between 32 to 128

    # changing to num_workers=1 shows no effect.
    # num_workers=1, multiprocessing_context="fork" shows no effect.
    # neither does num_workers=1, multiprocessing_context="spawn", persistent_workers=True
    train_dataloader = training.to_dataloader(train=True,
                                              batch_size=batch_size,
                                              num_workers=0) 
    val_dataloader = validation.to_dataloader(train=False,
                                              batch_size=batch_size * 10,
                                              num_workers=0)

    pl.seed_everything(seed)

    trainer = pl.Trainer(
        accelerator="cpu",
        gradient_clip_val=0.1,
        max_epochs=1000,  # just so that it runs for a while
    )

    tft = TemporalFusionTransformer.from_dataset(
        training,
        learning_rate=0.03,
        hidden_size=16,
        attention_head_size=2,
        dropout=0.1,
        hidden_continuous_size=8,
        loss=QuantileLoss(),
        optimizer="Ranger",
        reduce_on_plateau_patience=4)

    print(f"Number of parameters in network: {tft.size()/1e3:.1f}k")

    trainer.fit(tft,
                train_dataloaders=train_dataloader,
                val_dataloaders=val_dataloader)

    return tft

tfts = []

for seed in range(n_tasks):
    tft = train_task.remote(seed, training, validation)
    tfts.append(tft)

tfts = ray.get(tfts)

ray.shutdown()

Issue Severity

Medium: It is a significant difficulty but I can work around it.

stephanie-wang commented 9 months ago

Yes, it looks like this is probably an issue with the dataloaders competing for resources. Do you know if the data loading is using multithreading under the hood? If so, there can be contention between the two dataloaders, and you will need to either manually limit the dataloaders to different cores, or share the same dataloader between Ray tasks/actors.
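
For example, a minimal sketch of one way to limit per-task resource usage (hypothetical, not from the original script; it assumes the contention comes from OpenMP/MKL and PyTorch intra-op thread pools) would be to cap the thread pools inside each task:

import ray
import torch

# Hypothetical sketch: cap thread pools per task to test the contention theory.
# OMP_NUM_THREADS / MKL_NUM_THREADS limit OpenMP/MKL threads in each worker process;
# torch.set_num_threads(1) limits PyTorch's intra-op parallelism.
runtime_env = {"env_vars": {"OMP_NUM_THREADS": "1", "MKL_NUM_THREADS": "1"}}
ray.init(runtime_env=runtime_env)

@ray.remote(num_cpus=1)
def train_task(seed: int):
    torch.set_num_threads(1)  # keep each trainer on a single compute thread
    ...  # build the dataloaders and run trainer.fit() as in the reproduction script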

chunweiyuan commented 9 months ago

Hi @stephanie-wang,

  1. Yes, I believe DataLoader uses Python's multiprocessing package under the hood. The high-level instructions for num_workers are here.

  2. The default multiprocessing context on Unix seems to be fork (link). On my end I've experimented with various permutations of num_workers=0 or 1 and multiprocessing_context="fork", "spawn", or "forkserver", and none of them resolves the bottleneck for n_tasks > 1.

  3. I have played with os.sched_setaffinity() like this within my Ray task:

    import os

    cpu_ids = list(os.sched_getaffinity(0))
    
    print(seed, os.sched_getaffinity(0))
    print(f"setting seed {seed} to use {cpu_ids[seed]}")
    
    os.sched_setaffinity(0, [cpu_ids[seed]])
    print(f"now seed {seed} uses cpu_ids: ", os.sched_getaffinity(0))

    and obtained the following:

    (train_task pid=1380821) 1 {96, 98, 100, 102, 40, 42, 44, 46, 48, 50, 92, 94}
    (train_task pid=1380821) setting seed 1 to use 98
    (train_task pid=1380821) now seed 1 uses cpu_ids:  {98}
    (train_task pid=1380821) [rank: 0] Seed set to 1
    (train_task pid=1380820) [rank: 0] Seed set to 0
    (train_task pid=1380820) 0 {96, 98, 100, 102, 40, 42, 44, 46, 48, 50, 92, 94}
    (train_task pid=1380820) setting seed 0 to use 96
    (train_task pid=1380820) now seed 0 uses cpu_ids:  {96}

    but then everything moves even slower, whether I set num_workers=0 or 1:

    Epoch 0:   1%|          | 1/161 [01:10<3:08:37,  0.01it/s, v_num=5.98e+7, train_loss_step=301.0]
    Epoch 0:   1%|          | 1/161 [01:10<3:08:48,  0.01it/s, v_num=5.98e+7, train_loss_step=331.0]
    Epoch 0:   1%|          | 2/161 [02:21<3:07:35,  0.01it/s, v_num=5.98e+7, train_loss_step=394.0]
    Epoch 0:   1%|          | 2/161 [02:21<3:08:01,  0.01it/s, v_num=5.98e+7, train_loss_step=320.0]

Not sure if the root cause of the problem is exactly this, but I have tried some of their suggestions, to no avail.

stephanie-wang commented 9 months ago

Hi @chunweiyuan, I tried out your script on my laptop and wasn't able to reproduce with n_tasks=2 and 4. The only changes I made were to the ray.init call (my machine did not have enough memory, and setting num_cpus manually should not be necessary):

ray.init(include_dashboard=False,
         #num_cpus=n_tasks + 1,  # just some number >= n_tasks
         #object_store_memory=50 * 1e9,  # large enough
         runtime_env=runtime_env)    

Can you provide more details on the machine that you're running on? Perhaps there is just not enough physical compute.

chunweiyuan commented 9 months ago

Hi @stephanie-wang,

Thank you very much for your reply. I think you're onto something. Here's my long-winded follow-up; please bear with me:

When I test out the script (n_tasks>=2) on my work laptop (MacBook Pro 2023, Apple M2 Pro, 16 GB, Sonoma 14.2.1, osx-arm64), I get some nice >12 it/s training rates for each task:

Epoch 0:   3%|▎         | 5/161 [00:00<00:12, 12.41it/s, v_num=3, train_loss_step=384.0]
Epoch 0:  27%|██▋       | 43/161 [00:03<00:09, 11.97it/s, v_num=3, train_loss_step=130.0]

But when I run it on a cluster node (Intel Xeon Gold 6230, 768 GB, x86_64, GNU/Linux, ubuntu 20.04.6, slurm 23.02.5), I get ~0.1 it/s again:

Epoch 0:   1%|          | 2/161 [00:19<25:44,  0.10it/s, v_num=6.16e+7, train_loss_step=394.0]
Epoch 0:   1%|          | 2/161 [00:20<27:01,  0.10it/s, v_num=6.16e+7, train_loss_step=320.0]

In both cases, I use conda 23.11.0, and installed my environments with the following steps:

conda create -n tft python=3.10 numpy xarray pandas cython netcdf4 scipy 
conda activate tft
pip install pytorch-forecasting
pip install -U 'ray[default]'

I looked at my two environments (MacBook vs. cluster), and I notice that running conda list openmp shows that the MacBook env has

# Name                    Version                   Build  Channel
llvm-openmp               14.0.6               hc6e5704_0  

whereas the cluster env has

# Name                    Version                   Build  Channel
_openmp_mutex             4.5                  2_kmp_llvm    conda-forge
llvm-openmp               14.0.6               h9e868ea_0

I wonder if the discrepancy comes from the issue mentioned here? Following some of the suggestions within, I rebuilt my env on the cluster. Now conda list openmp shows

# Name                    Version                   Build  Channel
_openmp_mutex             5.1                       1_gnu  
intel-openmp              2023.1.0         hdb19cb5_46306  

and running the script with n_tasks=2 yields

Epoch 0:   2%|▏         | 3/161 [00:01<01:14,  2.11it/s, v_num=6.16e+7, train_loss_step=368.0]
Epoch 0:   1%|          | 2/161 [00:01<01:34,  1.68it/s, v_num=6.16e+7, train_loss_step=320.0]

whereas n_tasks=4 gives

Epoch 0:   2%|▏         | 4/161 [00:04<02:48,  0.93it/s, v_num=6.16e+7, train_loss_step=298.0]
Epoch 0:   2%|▏         | 3/161 [00:04<03:49,  0.69it/s, v_num=6.16e+7, train_loss_step=369.0]

While this is an improvement, it shows that the total throughput is still ~3.7 it/s (the n_tasks=1 rate), and that adding tasks just scales down each task's rate linearly. This is true whether I set num_workers=0 or 1, and it still indicates competition between workers.

Long story short, these are my findings thus far:

  1. On my MacBook, Ray parallelization works.
  2. On my Intel cluster, the workers seem to compete with each other for resources. The situation is slightly ameliorated by updating the openmp packages, but the impedance remains.

I wonder if you observe the same trends on your end?
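
If it helps, here is a small diagnostic I could run inside each Ray task (a sketch assuming the threadpoolctl package is installed; it is not part of the environment steps above) to see which OpenMP/BLAS runtimes each task actually loads and how many threads they are configured to use:

import ray
from threadpoolctl import threadpool_info  # pip install threadpoolctl

@ray.remote(num_cpus=1)
def report_threadpools():
    # Each dict describes a loaded runtime (e.g. libgomp, libiomp, MKL)
    # along with its version and configured num_threads.
    return threadpool_info()

print(ray.get(report_threadpools.remote()))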

stephanie-wang commented 9 months ago

Not sure if I can do much else here since I cannot reproduce, but I'd try checking a few things:

  1. Check what CPU affinity is set within Ray worker processes
  2. To avoid the issue of the raylet inheriting some CPU affinity, manually start Ray with ray start --head before running any Ray script; the script will then connect automatically to the running raylet (see the sketch after this list).
  3. Check if the problem is coming from Ray / the driver script's environment by running the "tasks" in parallel as separate bash scripts.
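
A sketch of the manual-start workflow from item 2 (ray.init(address="auto") attaches the driver to an already-running local cluster):

# From a clean shell first, so the raylet inherits no CPU affinity:
#   ray start --head
# Then have the driver script attach to the running cluster instead of
# starting its own raylet:
import ray

ray.init(address="auto")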

chunweiyuan commented 9 months ago

Hi @stephanie-wang,

Did you try it on a Mac ARM notebook as well? Were you able to test this on Intel CPUs, perhaps on a cluster?

I've tried step (2) as you suggested, to no avail. I think (1) and (3) require a little more time, which I will investigate later. I've also tried installing/removing different openmp/llvm packages from my environment, which seems to have some effect, but the impedance issue remains.

I don't expect to have anything meaningful to add to this ticket over the next few days. How should we handle its status?

stephanie-wang commented 9 months ago

Did you try it on a Mac ARM notebook as well? Were you able to test this on Intel CPUs, perhaps on a cluster?

I tested on Linux with Intel. Most likely it has something to do with the runtime env.

We can keep the ticket open for now if you find out more information, but there is not much we can do without a repro. Feel free to remove the needs-repro label once you have something.

chunweiyuan commented 9 months ago

Hi @stephanie-wang,

Sounds good. Do you mind sharing your environment information (python, unix, etc.) with me? I would like, if possible, to duplicate your environment as closely as I can. Thanks.

chunweiyuan commented 7 months ago

Hi @stephanie-wang, you mentioned that you tested the code on an Intel machine. Would it be possible to share the CPU info, and the versions of the openmp packages in your env? On my end I have different Xeon Golds to choose from, and I'd like to replicate your run as closely as I can. Thanks.

jjyao commented 7 months ago

@iycheng could you try to reproduce it on Mac as oncall?