dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

Resource allocation on SLURM cluster #616

Open SamTov opened 8 months ago

SamTov commented 8 months ago

Describe the issue: This is likely a misunderstanding of how to correctly use Dask to deploy cluster jobs. However, the terminology in the documentation suggests the behaviour I expected, so I also see this as a kind of bug: the functionality is very different from what one might expect.

I am trying to train a large number of machine learning models on a SLURM cluster. Each node has 64 cores and 4 GPUs. I want to run each of my models with 1 GPU and 16 cores so that I can, theoretically, fit 4 models on each node and maximise my resources.
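The packing arithmetic behind that target can be checked with a short sketch. `jobs_per_node` is a hypothetical helper written for this illustration and is not part of Dask or dask-jobqueue; it simply takes whichever resource runs out first.

```python
def jobs_per_node(node_cores: int, node_gpus: int, job_cores: int, job_gpus: int) -> int:
    """Return how many identical jobs fit on one node.

    The limit is set by whichever resource (cores or GPUs) is exhausted first.
    """
    return min(node_cores // job_cores, node_gpus // job_gpus)


# Node and job sizes taken from the description above.
print(jobs_per_node(node_cores=64, node_gpus=4, job_cores=16, job_gpus=1))  # 4
```

With these numbers both limits agree at four jobs per node, so neither cores nor GPUs would be left idle if the packing works as intended.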

My input script is summarised as follows:

def train(index: int):
    """
    Run the experiment.
    """

    class Network(nn.Module):
        """
        Perceptron network.
        """

        @nn.compact
        def __call__(self, x):
            """
            Call method for the network
            """
            x = nn.Dense(2, use_bias=False)(x)
            return nn.sigmoid(x)

    generator = DecisionBoundaryGenerator(ds_size, discriminator="line", one_hot=True)
    model = nl.models.FlaxModel(
        flax_module=Network(), 
        optimizer=optax.adam(0.01), 
        input_shape=(1, 2)
    )
    # Prepare the recorders
    train_recorder = nl.training_recording.JaxRecorder(
        name=f"ce-perceptron/train_recorder_{index}",
        loss=True,
        entropy=True,
        trace=True,
        accuracy=True,
        magnitude_variance=True,
        update_rate=1,
    )
    test_recorder = nl.training_recording.JaxRecorder(
        name=f"ce-perceptron/test_recorder_{index}", loss=True, accuracy=True, update_rate=1,
    )
    train_recorder.instantiate_recorder(data_set=generator.train_ds)
    test_recorder.instantiate_recorder(data_set=generator.test_ds)

    trainer = nl.training_strategies.SimpleTraining(
        model=model,
        loss_fn=nl.loss_functions.CrossEntropyLoss(),
        accuracy_fn=nl.accuracy_functions.LabelAccuracy(),
        recorders=[train_recorder, test_recorder],
    )

    _ = trainer.train_model(
        train_ds=generator.train_ds,
        test_ds=generator.test_ds,
        batch_size=128,
        epochs=5000,
    )

indices = np.linspace(1, 20, 20, dtype=int)

cluster = SLURMCluster(
    cores=16,
    processes=1,
    memory="64GB",
    queue="Anonymised",
    walltime="01:00:00",
    death_timeout="15s",
    worker_extra_args=["--resources GPU=1"],
    log_directory=f'./ce-perceptron/dask-logs',
    job_script_prologue=["module load devel/cuda/12.1"],
    job_extra_directives=["--gres=gpu:1"]
)

cluster.scale(5)

client = Client(cluster)

results = [client.submit(train, index, resources={"GPU": 1}) for index in indices]

My expected behaviour is that Dask submits five workers to the queue. Each worker takes a network to train with a given index, trains it on 16 cores and 1 GPU, and starts the next one when the training is finished. What happens, however, is that four workers are submitted to the queue, and only one of them starts to take networks and train them sequentially. The other workers are just idle.

I have tried increasing the number of processes, which I would think means running multiple network trainings on a single worker and splitting the resources. But this is also not correct as, in this case, it gives each process its own GPU despite the worker theoretically only having access to one. It also only runs on a single worker; the others are left idling.
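The observation about `processes` fits a simple model of how the cluster arguments are usually described as mapping onto worker processes. The sketch below is a back-of-the-envelope model, not dask-jobqueue code. It assumes that each worker process gets roughly `cores // processes` threads, that the `--resources` flag is applied separately to every worker process, and that a task requesting a resource needs both a free thread and a free unit of that resource. `JobLayout` and `job_layout` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class JobLayout:
    worker_processes: int       # Dask worker processes started inside one SLURM job
    threads_per_process: int    # concurrent task slots per worker process
    gpu_tasks_per_process: int  # GPU-tagged tasks one process can run at once

    @property
    def gpu_tasks_per_job(self) -> int:
        return self.worker_processes * self.gpu_tasks_per_process


def job_layout(cores: int, processes: int,
               gpu_resource_per_process: int = 1, gpu_per_task: int = 1) -> JobLayout:
    # Assumption: threads per worker process is cores divided by processes.
    threads = max(1, cores // processes)
    # Assumption: the declared resource is counted per worker process.
    by_resource = gpu_resource_per_process // gpu_per_task
    return JobLayout(processes, threads, min(threads, by_resource))


print(job_layout(cores=16, processes=1).gpu_tasks_per_job)  # 1
print(job_layout(cores=16, processes=4).gpu_tasks_per_job)  # 4
```

Under these assumptions, `processes=4` lets four GPU-tagged tasks run inside a job that only booked `--gres=gpu:1`, which would match the behaviour described above. The `GPU` resource is a label used for scheduling and does not itself bind a task to a physical device.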

I have also tried using map instead of submit. In this case, the workers die, or they try to run as many network trainings as possible on a single worker.

Finally, I have also tried using adapt, which is preferable for my workflow. However, when I do so, all of my workers keep dying in an endless cycle, with no logs produced.

Even though I am reasonably familiar with clusters, especially SLURM clusters, as I mentioned above, I think I am missing something about how the API is supposed to work.

SamTov commented 8 months ago

My best workaround at the moment, to force it to do what I want, is a function like the following:

def deploy_jobs(index):
    cluster = SLURMCluster(
        cores=16,
        processes=1,
        memory="64GB",
        queue="single",
        walltime="03:00:00",
        death_timeout="15s",
        worker_extra_args=["--resources GPU=1"],
        log_directory=f'./ce-two-layer-10/dask-logs',
        job_script_prologue=["module load devel/cuda/12.1"],
        job_extra_directives=["--gres=gpu:1"]
    )
    cluster.scale(1)
    client = Client(cluster)

    return client.submit(train, index, resources={"GPU": 1})

I then loop over my parameters, create a cluster, scale it, and submit jobs to it. This essentially wraps Dask's submit function in a short cluster setup step. Not ideal, but it works perfectly.

The obvious downside is that I can't collect the resource information into a single dashboard to monitor.

KonstiNik commented 8 months ago

That's a nice comment, thank you for bringing that up @SamTov. I stumbled across the same issue and would be happy to learn how to solve it.

guillaumeeb commented 8 months ago

> however, is that four workers are submitted to the queue

This is definitely weird, and I don't see how it would happen. Do you have enough resources or quota on your cluster to book 5 GPUs?

> only one of them starts to take networks and train them sequentially

Maybe the resources mechanism is not working as expected. I will try to build a reproducer. But could you try not using resources, e.g. something like:

cluster = SLURMCluster(
    cores=1, # Force only one task per worker at a time
    processes=1,
    job_cpu=16, # But still book 16 cores with Slurm
    memory="64GB",
    queue="Anonymised",
    walltime="01:00:00",
    death_timeout="15s",
    log_directory=f'./ce-perceptron/dask-logs',
    job_script_prologue=["module load devel/cuda/12.1"],
    job_extra_directives=["--gres=gpu:1"]
)

results = [client.submit(train, index) for index in indices]

SamTov commented 8 months ago

Hi @guillaumeeb, thanks for the answer. I have enough resources and permissions to access the nodes. I usually run ~30 jobs at a time with GPUs, but deployed from a bash script. For this other study, though, we really need to change a lot of parameters, so we are turning to Dask. In addition, when I use my hacky solution of just creating many clusters, I can run 20 at a time.

It is also odd that when I use adapt, for example, on top of it just killing workers and resubmitting them, it only ever submits one at a time.

guillaumeeb commented 8 months ago

> It is also odd, when I use adapt for example, on top of it just killing workers and resubmitting, it only ever submits one at a time.

Adapt and Dask resources do not work well together if I remember correctly. So please try without resources and see if it solves some problems.

SamTov commented 8 months ago

The configuration you put up earlier did not really work for me. Admittedly, the jobs stopped dying. However, the workers didn't take any network training tasks.

guillaumeeb commented 8 months ago

> However, it didn't take any network training tasks.

That sounds strange. Are you able to submit other tasks?

SamTov commented 8 months ago

With my current study, I ask for 20 workers in the adapt command. It will start 20 but at the moment only 5 of them will pick up jobs. The others fail. I removed the death_timeout="15s" part of the cluster and I now actually get an error from the worker:

2023-10-25 13:48:01,215 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.20.33.12:42733'. Reason: failure-to-start-<class 'OSError'>
2023-10-25 13:48:01,216 - distributed.dask_worker - INFO - End worker
OSError: [Errno 113] No route to host

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/comm/core.py", line 342, in connect
    comm = await wait_for(
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/utils.py", line 1910, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/comm/tcp.py", line 503, in connect
    convert_stream_closed_error(self, e)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/comm/tcp.py", line 141, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x148ba14b4a60>: OSError: [Errno 113] No route to host

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/core.py", line 616, in start
    await wait_for(self.start_unsafe(), timeout=timeout)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/utils.py", line 1910, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/nanny.py", line 351, in start_unsafe
    comm = await self.rpc.connect(saddr)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/core.py", line 1626, in connect
    return connect_attempt.result()
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/core.py", line 1516, in _connect
    comm = await connect(
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/comm/core.py", line 368, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://129.206.9.242:40101 after 30 s

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/cli/dask_worker.py", line 544, in <module>
    main()  # pragma: no cover
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/cli/dask_worker.py", line 450, in main
    asyncio_run(run(), loop_factory=get_loop_factory())
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/compatibility.py", line 236, in asyncio_run
    return loop.run_until_complete(main)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/cli/dask_worker.py", line 447, in run
    [task.result() for task in done]
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/cli/dask_worker.py", line 447, in <listcomp>
    [task.result() for task in done]
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/cli/dask_worker.py", line 420, in wait_for_nannies_to_finish
    await asyncio.gather(*nannies)
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/st/st_st/st_ac134186/miniconda3/envs/zincware/lib/python3.10/site-packages/distributed/core.py", line 624, in start
    raise RuntimeError(f"{type(self).__name__} failed to start.") from exc
RuntimeError: Nanny failed to start.

guillaumeeb commented 7 months ago

Either this is a network issue, or your scheduler is overloaded and cannot accept new workers.

Does this also happen if you haven't submitted any tasks yet, i.e. when just scaling up the cluster?
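The traceback shows a worker on 10.20.33.12 timing out while connecting to the scheduler at tcp://129.206.9.242:40101. One way to separate a routing problem from an overloaded scheduler is to test plain TCP reachability from a compute node. The sketch below uses only the Python standard library. `can_reach` is a hypothetical helper, not part of Dask, and the demo connects to a throwaway local listener rather than the real scheduler address.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) can be opened in time."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers "No route to host", connection refused, and timeouts.
        return False


# Demo: open a temporary listener on localhost and probe it.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    server.listen(1)
    demo_host, demo_port = server.getsockname()
    reachable_while_listening = can_reach(demo_host, demo_port)

print(reachable_while_listening)
```

To use it for this issue, run `can_reach` from an interactive job on a compute node, passing the host and port from `cluster.scheduler_address`. If it returns `False` while the scheduler is idle, routing is the more likely culprit. In that case the `interface` and `scheduler_options` arguments of `SLURMCluster` are the usual places to look, since they influence which address the scheduler advertises; the right value is site-specific.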