aiidateam / aiida-core

The official repository for the AiiDA code
https://aiida-core.readthedocs.io

Direct scheduler with one job at a time #3366

Open bonfus opened 4 years ago

bonfus commented 4 years ago

This issue stems from this discussion in the mailing list: https://groups.google.com/forum/#!topic/aiidausers/LHSRzcFfQvw

The idea is to provide an alternative way to use the direct scheduler on a workstation. The current implementation starts every calcjob as soon as it is submitted, which quickly becomes a problem when many jobs target the same machine.

A quick and dirty solution is provided below:

The direct scheduler should be modified as follows:

https://github.com/aiidateam/aiida-core/blob/1fdffdcca3d461c7ae1df949c39568807c5814f8/aiida/schedulers/plugins/direct.py#L58

should instead report a QUEUED state, i.e.

'T': JobState.QUEUED,

and the jobs should be started in the stopped state, with their process IDs collected so that an external scheduler can resume them later. This can be done like this:

submit_command = '( kill -SIGSTOP $BASHPID; exec bash -e {} ) > /dev/null 2>&1 & echo $! | tee -a /tmp/aiida_procs'.format(submit_script) 

This replaces the following line: https://github.com/aiidateam/aiida-core/blob/1fdffdcca3d461c7ae1df949c39568807c5814f8/aiida/schedulers/plugins/direct.py#L209
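Putting the two changes together, a minimal sketch (not a tested patch) could look as follows; the subclass name is made up, _get_submit_command is the real method containing the line linked above, and the 'T': JobState.QUEUED entry would additionally go into the ps-status map linked further up.

# A sketch, not a tested patch: launch direct-scheduler jobs in the stopped
# state and record their PIDs, so an external watcher can resume them one at
# a time. The class name is arbitrary; /tmp/aiida_procs matches the snippet
# above.
from aiida.schedulers.plugins.direct import DirectScheduler


class StoppedDirectScheduler(DirectScheduler):
    """Direct scheduler that starts jobs stopped, to be resumed externally."""

    def _get_submit_command(self, submit_script):
        # Stop the subshell before it execs the submit script (SIGSTOP) and
        # append its PID to the file polled by the external scheduler.
        submit_command = (
            '( kill -SIGSTOP $BASHPID; exec bash -e {} ) > /dev/null 2>&1 & '
            'echo $! | tee -a /tmp/aiida_procs'
        ).format(submit_script)
        return submit_command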

A very simple external scheduler could look like this:

#!/bin/bash
# Dummy external scheduler: resume at most one stopped AiiDA job per run.
# Expects the file with the collected PIDs (e.g. /tmp/aiida_procs) as $1.

# Truncate the log file
echo "" > /tmp/dummy_sched_log

# Used to store which process to start, if there is one
first_pid_queued=""

while IFS= read -r pid; do
    # Strip whitespace from the ps output; empty output means the PID is gone
    p_status=$(ps -p "$pid" -o stat= | tr -d ' ')
    if [ -z "$p_status" ]; then
        # Report the missing process so the user knows it is time to clean up
        echo "Process $pid not found" >> /tmp/dummy_sched_log
    elif [[ $p_status =~ ^[RSD] ]]; then
        # A running (or sleeping) process: check it is really an AiiDA job,
        # since PIDs get reused
        if ps -p "$pid" -o cmd= | grep -q aiida; then
            echo "Process $pid is running. Status is $p_status" >> /tmp/dummy_sched_log
            break
        fi
    elif [[ $p_status =~ ^T ]]; then
        # A stopped process: this is the next job to start
        echo "Process $pid will be started" >> /tmp/dummy_sched_log
        first_pid_queued=$pid
        break
    fi
done < "$1"

if [ -z "$first_pid_queued" ]; then
    echo "A job is running or nothing to do. Maybe next time..."
else
    kill -CONT "$first_pid_queued"
    echo "Started $first_pid_queued"
fi

Notice that the PID file (/tmp/aiida_procs above) will have to be cleaned up manually. Since PIDs are reused, the command associated with each process must be checked as well, hence the grep for aiida in the script.
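As a usage note (not part of the proposal itself): the watcher above could simply be run every minute from cron or a systemd timer, passing the PID file (/tmp/aiida_procs) as its only argument, so that at most one stopped job is resumed per cycle.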

ltalirz commented 4 years ago

The basic functionality of not submitting more than X jobs to computer Y is something that makes sense to include in AiiDA in general. I suggest that this metadata be stored at the computer level, and I would say in the configuration rather than in the setup, so that it can be modified after the computer is stored. It could be called max_jobs, with default value -1 (no limit).

@sphuber @giovannipizzi Does this sound reasonable?

P.S. This would be about implementing the basic functionality of limiting the number of jobs that are submitted to a machine, not necessarily with the view of moving towards a full-blown scheduler inside AiiDA itself as suggested in https://github.com/aiidateam/aiida-core/issues/2026 .
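To make the enforcement side a bit more concrete, here is a sketch of counting the active calcjobs on a given computer; the QueryBuilder part uses existing aiida-core API, while max_jobs itself is only the proposed configuration value and the check at the end is hypothetical.

# Count calcjobs on a computer that have not reached a terminal state yet.
# The query uses existing aiida-core API; `max_jobs` is the proposed,
# not-yet-existing configuration value.
from aiida import orm


def count_active_calcjobs(computer):
    builder = orm.QueryBuilder()
    builder.append(orm.Computer, filters={'uuid': computer.uuid}, tag='computer')
    builder.append(
        orm.CalcJobNode,
        with_computer='computer',
        filters={'attributes.process_state': {'in': ['created', 'waiting', 'running']}},
    )
    return builder.count()


# Hypothetical check before submitting another job to `computer`:
# if 0 <= max_jobs <= count_active_calcjobs(computer):
#     ...  # limit reached: hold off (see the discussion below)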

giovannipizzi commented 4 years ago
ltalirz commented 4 years ago

Regarding the last point by @giovannipizzi: is this something that RabbitMQ could potentially take care of, @sphuber?

sphuber commented 4 years ago

Most likely not. Definitely not in the current setup, because all processes are sent to the same queue, including work chains, and for this to work RabbitMQ would have to differentiate between them. We could decide to have one queue for calculation jobs and another for everything else. That being said, I am not even sure (in fact pretty sure you can't) that you can "limit" the number of messages being doled out by RabbitMQ. On a queue subscriber you can limit how many concurrent messages you consume, which is what we already do: we limit it to 100 per worker, which gives rise to the infamous gridlock. Again, even if all of this were possible, we would not be able to control it on a per-computer or even per-user basis. TL;DR: RabbitMQ is not the tool to solve this problem.

Our best shot is to do this within aiida-core, which is fine; we just cannot give hard guarantees. However, as long as the discrepancies are "reasonable", I don't think users will care if we make this clear. They can always set the limit a bit lower than the value they actually want, so there is some margin.

sphuber commented 4 years ago

Another comment with respect to potential problems. Leaving aside how strictly the limit can be respected, and the overhead added by daemon workers having to query for the currently active calcjobs of the given authinfo, there are more challenges we will face.

Where do we "stop" the submission of a CalcJob? A few options come to mind:

The first option would be fine-ish if the job is submitted from a top-level script: it would simply except and you could retry later. However, if this happens when a work chain tries to submit it, having the job except straight away is highly undesirable.

That brings us to the second option, but then it is not clear what to do when the daemon gets the task from RabbitMQ. It can reject the task and send it back, but RabbitMQ will resend it straight away, ending with the task ping-ponging until the daemon can actually continue it after a slot on the computer has freed up. There is no way to tell RabbitMQ to hold off on resending the task. The alternative would be to keep the task in the daemon worker and simply reschedule it within its internal event loop. The challenge there is choosing the sleep value: too short and we get internal ping-ponging, too long and we risk having slots available but not being used because the jobs are internally "sleeping". We would have to see whether it is possible to have them sleep and wake up when another calcjob on the computer finishes, but this is clearly non-trivial.
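Purely to illustrate the trade-off in that last alternative (this is not aiida-core code), an internal reschedule inside the worker's event loop could look roughly like the sketch below; has_free_slot is a placeholder for whatever check against the computer's limit would be used, and the backoff values are arbitrary.

# Conceptual sketch of "keep the task and reschedule it internally": poll for
# a free slot with an exponential backoff plus jitter, so that neither the
# too-short nor the too-long sleep value dominates. Not aiida-core code.
import asyncio
import random


async def wait_for_slot(has_free_slot, initial_delay=5.0, max_delay=300.0):
    delay = initial_delay
    while not has_free_slot():
        # Jitter avoids many sleeping jobs waking up at exactly the same time.
        await asyncio.sleep(delay * random.uniform(0.8, 1.2))
        delay = min(delay * 2, max_delay)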

The final option is to start running the process and already perform the upload task, but then pause the process before going on to the submit task. This has the advantage that we can pass a reason for the pause, which will be shown in verdi process list so the user has an explanation for why the job is not being submitted. We would need another internal system that periodically unpauses the processes that were paused for this reason, which is again not trivial.

dev-zero commented 3 years ago

@sphuber your last comment somehow reminded me of the problems we faced when trying to implement the pause for computers in #3648. Did you maybe already elaborate somewhere on why having separate queues for different things is not that easily possible? And is this also related to the submission rate-limiting discussed in #4634?

sphuber commented 3 years ago

The problem is not so much having different queues; you can create many queues in RabbitMQ just fine. The problem is that those queues are designed to be as dumb as possible. That is to say, as soon as a consumer subscribes, the queue will send it the tasks it has. Only the consumer can do "rate" limiting, by saying how many tasks it can deal with concurrently. So technically, yes, you could create a dedicated queue for each computer. You would then also need a dedicated worker that sets its maximum slots to the maximum number of concurrent jobs for that computer. Still, there are other complications with this model:

1. Not all processes are CalcJobs, so you would also need a queue (and workers) for all other processes, such as WorkChains.
2. When you start the daemon, you should launch one worker for each computer that currently has associated active calcjobs in the database. However, what do you do when you submit a new calcjob to a computer that does not have a worker yet? Now you have to add logic to submit that launches a daemon worker if need be. This seems like a bad intermingling of responsibilities to me.
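For illustration only, the "dedicated queue plus dedicated worker per computer" model sketched above boils down to something like the following; AiiDA talks to RabbitMQ through kiwipy rather than pika, and all names and values here are made up for the example.

# Conceptual sketch of one task queue and one worker per computer, where the
# worker's prefetch count plays the role of the computer's job limit.
# Written against plain pika purely for illustration; AiiDA itself uses kiwipy.
import pika


def run_worker_for_computer(computer_label, max_jobs):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # One durable task queue per computer (hypothetical naming scheme).
    queue_name = 'calcjobs.{}'.format(computer_label)
    channel.queue_declare(queue=queue_name, durable=True)

    # The only "rate limiting" RabbitMQ offers: the consumer caps how many
    # unacknowledged messages it holds at once, i.e. its number of slots.
    channel.basic_qos(prefetch_count=max_jobs)

    def on_task(ch, method, properties, body):
        # ... run the calcjob task through to completion here ...
        # Acknowledge only when done, so the slot stays occupied meanwhile.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue=queue_name, on_message_callback=on_task)
    channel.start_consuming()

This also makes the two complications concrete: every non-CalcJob process would still need its own queue and worker, and something would have to make sure such a worker exists for every computer that receives jobs.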

dev-zero commented 3 years ago

For 1), I don't think we'd need queues for each message type, only for things directly related to running something on a computer (well, transfer and submission: the first you want to serialize, the second you want to limit; all others can go directly, assuming a multiplexed transport). I definitely have to check the current design, but the messages also do not have to be tied 1:1 to processes; you should be able to simply use messages as a means for designing the system (the only important thing then is persistence, or recovery from queue loss).

For 2), this sounds like a nice distributed system. In an ideal world you would make the remote scheduler directly listen on your queues, but until that happens we need proxies which do that for us. That you need a supervisor launching workers as needed is also normal. The only thing the supervisor has to know is that there should be a worker running for a submission queue (unless disabled), but it doesn't have to worry about what that worker exactly does and how it does it (separation of concerns). From what I've gathered, the broker should provide basic facilities (qos, nack) which help implement rate or queue-length limitations, so it is not entirely on the worker to do it. But I'd have to gather more information about it.