fractal-analytics-platform / fractal-server

Fractal backend
https://fractal-analytics-platform.github.io/fractal-server/
BSD 3-Clause "New" or "Revised" License
11 stars 3 forks source link

Handling `AssocMaxSubmitJobLimit` #377

Closed jluethi closed 1 year ago

jluethi commented 1 year ago

I've started to test large datasets at FMI. One of them being a full 384 well plate (=> up to 16 images per well for 384 wells, with a few Z slices). Processing runs smooth overall, but we run into the following error (uncaught, on the server side):

sbatch: error: AssocMaxSubmitJobLimit
sbatch: error: Batch job submission failed: Job violates accounting/QOS policy (job submit limit, user's size and/or time limits)

I'll need to investigate what those limits are concretely and how far we are from them. It looks like we can submit a max of 100 jobs at the moment and, if Fractal runs into this, it fails not so gracefully. The 100 jobs are actually being run until they all complete. Then, the workflow just stops (neither submitting more jobs nor the next task). Each job ran for about 20 minutes, so that seems like decent usage of a slurm cluster.

While there are some options to get this limit increased for us (I think), we'll likely anyway have limited gains by running many more jobs in parallel. The infrastructure seems to hold up better than at UZH, but we get the following numbers: Processing 1 well individually: ~ 7 min for Yokogawa to OME-Zarr task Processing ~30 wells at once: ~20 min for each Yokogawa to OME-Zarr task

Thus, even if the cluster had many more nodes available, we likely wouldn't want to scale jobs much further. But scaling to run ~30-50 parallel OME-Zarr parsing jobs appears quite reasonable. And it's still ~10x faster than running the parsing sequentially per well.

Would it be easy to limit the number of jobs that are submitted at any given time? Are we currently limiting how many jobs should be run in parallel or how many jobs can be submitted?

I'll anyway inquire more about those specification after the holidays, given that some of those limits are enforced hard.

jluethi commented 1 year ago

Also, the job status remain on "running", even after the job has failed

tcompa commented 1 year ago

following error (uncaught, on the server side):

Also, the job status remain on "running", even after the job has failed

These are all consequences of #343, and will only change after that issue is closed.


In the stress test at UZH we submitted 400 jobs and they would just appear in the queue. Then SLURM would start them whenever possible.

Would it be easy to limit the number of jobs that are submitted at any given time? Are we currently limiting how many jobs should be run in parallel or how many jobs can be submitted?

Quick answers: no and no.

Let's re-discuss about this (we already did it a few times, but we only tested the UZH SLURM configuration). Our opinion has always been that we would rather not make fractal-server responsible for job scheduling, since this is exactly what SLURM is designed for. If we hit a hard wall because FMI has some SLURM config that cannot be modified and that is blocking for large processing, let's reconsider this.

On how to proceed:

  1. Let's not test these limits with actual fractal tasks, since we already have a stress-test folder (we should just update it, as it was running with fractal-server 1.0.0b1).
  2. Better: you could test this by hand - by submitting a large number of jobs with just a sleep 100 task.
  3. Even better: you could ask the sysadmin what are the current limits in SLURM configuration ;)
tcompa commented 1 year ago

For the record, with @mfranzon we found at least one hypothesis on how to address this. Briefly: in call_parallel_task replace the block

        map_iter = executor.map(partial_call_task, component_list, **extra_setup)
        # Wait for execution of all parallel (this explicitly calls .result()
        # on each parallel task)
        for _ in map_iter:
            pass  # noqa: 701

with a list of blocks like this, each one applied to a subset of component_list.

Complexity ladder:

  1. This means introducing a global FRACTAL_RUNNER_MAX_JOBS variable, which is not specific for SLURM but will be there for all backends (of course it can be set to "infinite").
  2. This only limits the number of submitted jobs for a given task of a given workflow execution. If you have another workflow being executed, this will not be taken into account, and you'll hit the same error as above.
  3. Handling info about the global user's jobs (fractal or non-fractal related) should be done within the FractalSlurmExecutor (as part of the SLURM backend). We would poll squeue at the beginning of a workflow execution, and fail immediately if CURRENT_JOBS+FRACTAL_RUNNER_MAX_JOBS>SOME_LIMIT. This also means we need to know (and let fractal know) the site-specific SLURM configuration.
  4. Even reaching point 3, we won't be fully safe: a user can always submit another job from outside fractal, and then fractal will fail. Or, even simpler, there may appear some jobs from a different workflow - which had not been scheduled yet (e.g. when passing from a global to a parallel task, the number of SLURM jobs greatly
jluethi commented 1 year ago

with a list of blocks like this, each one applied to a subset of component_list.

A list of blocks like what? What would the effect of this be? Would jobs be grouped and executed in the same slurm jobs sequentially? Or would Fractal wait with submitting more jobs and submit them once earlier jobs have finished?

FRACTAL_RUNNER_MAX_JOBS: This would be a useful thing to have potentially for different architectures or even to limit larger tests in local runs. This would limit how many jobs are run at once, right?

2-4. This only limits the number of submitted jobs for a given task of a given workflow execution.

I think that's a fair line to draw. Running workflows with 100s of wells is not the typical case we'll see in the labs. We can tell users not to submit multiple such workflows if the infrastructure is actually limiting there. If we'd want to be super flexible here: Is FRACTAL_RUNNER_MAX_JOBS something we could set/overwrite per user or even per workflow (not that this is something users would normally want to mess with, just an optional overwrite)? In that case, a user could control how many jobs Fractal is submitting for them and if they don't follow the limitations, they'll get an error in Fractal that will be caught with #343 .

jluethi commented 1 year ago

If the result of using FRACTAL_RUNNER_MAX_JOBS above would be having jobs combined into fewer slurm jobs (not sure this is the idea here): Could FRACTAL_RUNNER_MAX_JOBS even be overwritten per task? The reason this may be interesting: Yokogawa to OME-Zarr conversion takes a while per well, so we do want to run some of it in parallel. Some of our other tasks run fairly fast though, e.g. MIP can be quite quick if there aren't too many Z slices, some measurements are very fast and we may eventually have some tasks that e.g. just combine some AnnData tables. That could run in a few seconds per well, thus the overhead of starting new slurm jobs starts to become a bit annoying. Given that we're anyway having some of those discussions at the moment, let's also keep such use cases in mind :)

tcompa commented 1 year ago

Let me clarify what I just sketched above, and then at some point let's switch to a in-person discussion.

In call_parallel_task we currently have a block like:

    map_iter = executor.map(partial_call_task, component_list, **extra_setup)
    # Wait for execution of all parallel (this explicitly calls .result()
    # on each parallel task)
    for _ in map_iter:
        pass  # noqa: 701

that we could replace (not tested!) with

    def split_list_into_chunks(mylist: Sequence, chunksize: int):
        num_chunks = len(mylist) // chunksize
        if len(mylist) % chunksize > 0:
            num_chunks += 1
        chunks = [mylist[ind_chunk*chunksize:(ind_chunk + 1) * chunksize]
                for ind_chunk in range(num_chunks)]
        return chunks

    component_list_batches = split_list_into_chunks(component_list, config.FRACTAL_RUNNER_MAX_JOBS)
    for component_list_batch in component_list_batches:
        map_iter = executor.map(partial_call_task, component_list_batch, **extra_setup)
        for _ in map_iter:
            pass  # noqa: 701

What would the effect of this be?

Out of the 384 jobs, only a batch with the first FRACTAL_RUNNER_MAX_JOBS wells is scheduled. When they are all complete, a new batch of FRACTAL_RUNNER_MAX_JOBS wells are processed, and so on.

This obviously leads to a suboptimal usage of SLURM, here is a (somewhat worst-case scenario) example:

Take-home message: replacing SLURM in its main job (scheduling jobs) is inefficient. Still, if the on-site SLURM config cannot be changed we don't have many ways out.

Would jobs be grouped and executed in the same slurm jobs sequentially? Or would Fractal wait with submitting more jobs and submit them once earlier jobs have finished?

The second one, indeed.

FRACTAL_RUNNER_MAX_JOBS: This would be a useful thing to have potentially for different architectures or even to limit larger tests in local runs. This would limit how many jobs are run at once, right?

This would limit how many jobs are run at once for a given task of a given workflow execution.

If we'd want to be super flexible here: Is FRACTAL_RUNNER_MAX_JOBS something we could set/overwrite per user or even per workflow (not that this is something users would normally want to mess with, just an optional overwrite)? In that case, a user could control how many jobs Fractal is submitting for them and if they don't follow the limitations, they'll get an error in Fractal that will be caught with https://github.com/fractal-analytics-platform/fractal-server/issues/343 .

I rather think of this as "the only way to overcome a restrictive site-specific SLURM config", more than "Fractal lets the user decide how to use SLURM". In my view, having the user (not even the server) handle the scheduling is not ideal. Do we then need to instruct the users about the on-site SLURM config, by telling them how to check from the terminal which are the SLURM-config limits? What will they do when they hit Job violates accounting/QOS policy (job submit limit, user's size and/or time limits)? That appears to me like a very likely source of confusion, at best, or inefficiency/errors at worst.


If the result of using FRACTAL_RUNNER_MAX_JOBS above would be having jobs combined into fewer slurm jobs (not sure this is the idea here): Could FRACTAL_RUNNER_MAX_JOBS even be overwritten per task? The reason this may be interesting: Yokogawa to OME-Zarr conversion takes a while per well, so we do want to run some of it in parallel. Some of our other tasks run fairly fast though, e.g. MIP can be quite quick if there aren't too many Z slices, some measurements are very fast and we may eventually have some tasks that e.g. just combine some AnnData tables. That could run in a few seconds per well, thus the overhead of starting new slurm jobs starts to become a bit annoying. Given that we're anyway having some of those discussions at the moment, let's also keep such use cases in mind :)

This one (combining tasks in the same SLRUM job) was not the intended use of FRACTAL_RUNNER_MAX_JOBS, sorry if it was not clear. Let's open a different issue for that discussion. Spoiler: I will have similar opinions also over there ;) (do we really want the user to assess which tasks are fast/slow and how to combine them in SLURM jobs? This requires a lot of effort on their side, or on our side to provide sensible defaults for free parameters. And what's the problem with some short jobs here and there? The overhead for SLURM submission does not depend on how long the job takes).

jluethi commented 1 year ago

Catching up on this issue: Yes, the most efficient way of scheduling this without adding complexity is certainly leaving this question to slurm. But current FMI setup and maybe even some future UZH setups (e.g. if we start using the main ScienceCloud cluster instead of the custom Pelkmans lab cluster) enforce QOS policies that won't allow us to do this. Many projects are small enough to never run into this though, so this is handling a scaling issue, not a general submission issue.

The proposed FRACTAL_RUNNER_MAX_JOBS is not the most efficient way of handling orchestration, but it avoids workflows failing in a manner users can't work around at least for simple cases (i.e. not running multiple workflows at the same time).

My proposal: Let's implement this "simple" version so that workflows run through. And then let's see what the pain points are. As discussed here, any additional things we add also increase the complexity (both to maintain and to communicate to users).


If the suboptimal scheduling becomes the major pain point, let's revisit this implementation and figure out something more complex.

If multiple parallel submissions become the problem, then let's address this then (I don't currently foresee this for our current users & use-cases, but who knows).

If having many of short-ish jobs (i.e. 100s of jobs that take less than a minute each) mixed in becomes a major pain point, let's discuss ideas around combining jobs

tcompa commented 1 year ago

The new FRACTAL_RUNNER_MAX_TASKS_PER_WORKFLOW in #469 should do (up to more careful testing) what was discussed above:

For tasks executed through a ThreadPoolExecutor, this is already tested - see test_call_parallel_task_max_tasks in https://github.com/fractal-analytics-platform/fractal-server/blob/377-handling-assocmaxsubmitjoblimit/tests/test_backend_local.py. I don't foresee an automated test with the SLURM backend, but let's think about it a bit further.


On a higher level:

We know that there are somewhat compelling reasons for the current change:

  1. It enables some workflows to run at FMI which would otherwise always fail, and these workflows are common among users.
  2. It enables processing many wells (for instance) on a single machine, without hitting memory issues very quickly.

Nevertheless, I would be extremely cautious about any further step in the "let-us-reimplement-slurm" direction - at least because they introduce

Briefly, I think that we'll need very strong reasons spelled out (like: this crucial use case for Fractal is otherwise 100% impossible) before we even think about the other options mentioned by @jluethi above (1. optimizing scheduling, 2. being aware of the total number of running tasks, 3. combining jobs together).

jluethi commented 1 year ago

Great, thanks @tcompa ! And I'm fine with setting the barrier high for additional changes in that direction regarding optimizations & job combinations on the Fractal server level :)

tcompa commented 1 year ago

The FRACTAL_RUNNER_MAX_TASKS_PER_WORKFLOW variable is now available with 1.0.2. Relevant docs are here. Let's close this issue as soon as we have a first real-life test.

(also: feedback on the docs is always welcome.. if something is not clear, let's improve it)

jluethi commented 1 year ago

Ok, tested this now at FMI. Unfortunately, while the concrete issue here is solved, the broader problem was not.

This workaround gets us around the accounting/QOS policy on max number of jobs being submitted. Fractal successfully runs the jobs in batches. But after a while, we run into the following new QOS policy error:

ERROR:slurm_runner:sbatch: error: Reached jobs per hour limit
sbatch: error: Batch job submission failed: Job violates accounting/QOS policy (job submit limit, user's size and/or time limits)

INFO:WF1_job1:FAILED workflow "Workflow 211123_HCA_1-2_Take2_D8-20230130-2", unknown error.

So apparently, FMI limits not only jobs running at once, but also the number of jobs that can be run per hour.

jluethi commented 1 year ago

The core work on handling AssocMaxSubmitJobLimit is done here. Discussion on broader strategy to deal with the new limitations of jobs per hour and batching jobs together will continue here: https://github.com/fractal-analytics-platform/fractal-server/issues/493