aiidateam / aiida-core

The official repository for the AiiDA code
https://aiida-core.readthedocs.io

TimeoutError when submitting many processes #4841

Open ltalirz opened 3 years ago

ltalirz commented 3 years ago

Describe the bug

My plan is to submit ~12k processes, always 3 at a time, with a 5s delay in order not to stress the daemon.

However, after a while I'm running into the following timeout error:

  File "2a_run_zeopp_aiida.py", line 192, in <module>
    future = engine.submit(NetworkCalculation, **builder)
  File "/home/ubuntu/aiida-core/aiida/engine/launch.py", line 116, in submit
    runner.controller.continue_process(process_inited.pid, nowait=False, no_reply=True)
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/site-packages/plumpy/process_comms.py", line 412, in continue_process
    return self.task_send(message, no_reply=no_reply)
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/site-packages/plumpy/process_comms.py", line 485, in task_send
    return self._communicator.task_send(message, no_reply=no_reply)
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/site-packages/kiwipy/rmq/threadcomms.py", line 212, in task_send
    return self._loop_scheduler.await_(self._communicator.task_send(task, no_reply))
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/site-packages/pytray/aiothreads.py", line 155, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/concurrent/futures/_base.py", line 441, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError

I suppose this tells me that the worker is "too busy to accept my new calculation" (?)

This is a bit nasty since it will exit my submission loop. Is there a way to mitigate this?

@chrisjsewell

Expected behavior

AiiDA should never be too busy to accept new calculations.

Your environment

chrisjsewell commented 3 years ago

After approximately how many processes have already been submitted?

I suppose this tells me that the worker is "too busy to accept my new calculation" (?)

This is not a timeout on the worker; it's a timeout on the communication with RabbitMQ (either it or your main Python instance is too busy)

You can increase the timeout: https://github.com/aiidateam/aiida-core/blob/d35a9d7905d8d94c35b6e6eebaae2e5440fec975/aiida/manage/configuration/schema/config-v5.schema.json#L179

but you might also want to check whether there is anything that can be improved on the RabbitMQ end and what the limitations are there. This post says "one queue can handle up to about 50 thousand messages": https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html#number-of-queues, but I don't know off-hand how well it handles receiving that order of magnitude of messages in a short space of time.
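
A sketch of how that timeout could be raised, assuming the rmq.task_timeout option from the schema linked above and aiida-core's Python configuration API (get_config / set_option); the same can be done via verdi config:

from aiida.manage.configuration import get_config

config = get_config()
config.set_option("rmq.task_timeout", 60)  # seconds; option name taken from the config schema linked above
config.store()  # persist the change to the AiiDA configuration file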

ltalirz commented 3 years ago

Thanks for the quick reply!

After approximately how many processes have already been submitted?

A couple of hundred (less than a thousand). I'm using 2 daemon workers with 1000 slots each.

This is what I see when I open the RabbitMQ management interface (I'm a bit confused by the "zero messages" - perhaps I'm not looking at the right panel?):

[screenshot of the RabbitMQ management overview]

sphuber commented 3 years ago

My plan is to submit ~12k processes, always 3 at a time, with a 5s delay in order not to stress the daemon.

I am not sure this approach really de-stresses the daemon. In any case, once they have been submitted, the daemon will accept the tasks (up to its maximum) and keep them in memory while iterating over them in the event loop. If your processes are relatively long-lived, which is probably the case for workflows that include jobs on remote clusters, at some point your daemon will be running 1000 processes regardless.

I have never seen this problem from any of my submission scripts, but then again I have never run on 1.6 and I am pretty sure that we looked at this particular exception after the migration to asyncio. However, this exception showed up in the daemon workers and not a submission script. Could you maybe share the submission script? I doubt there would be anything obviously weird in there, but just for the full picture.

Regarding the RabbitMQ admin panel. In principle, if you have active processes, you should see them in this overview panel. It will give a summary of all profiles, if you run multiple profiles for the same RabbitMQ instance. If the daemons are stopped, you should see messages for "Ready", and if the daemons are running and have accepted tasks, they should show as "Unacked". The fact that there are 0 messages seems to suggest you have no active processes at all. If you just submitted a bunch of stuff and that is actually running, this would indeed be very weird. Are you connecting to the correct RabbitMQ instance? Do you maybe have multiple running, or exposed a remote one on the local standard port and are looking at that one?

ltalirz commented 3 years ago

I am not sure this approach really de-stresses the daemon. In any case, once they have been submitted, the daemon will accept the tasks (up to its maximum) and keep them in memory while iterating over them in the event loop. If your processes are relatively long-lived, which is probably the case for workflows that include jobs on remote clusters, at some point your daemon will be running 1000 processes regardless.

Right... we've had a report of someone encountering problems when submitting a large number of processes in one go, so I was going to spread it out a bit. But of course even better if that is not necessary.

Could you maybe share the submission script? I doubt there would be anything obviously weird in there, but just for the full picture.

The whole submission script is a bit long, but the essence is

builder = NetworkCalculation.get_builder()
builder.metadata.options = {
    "max_wallclock_seconds": 10 * 60,  # no longer than 10 minutes
}
...
for structure in structures:
    builder.structure = structure

    for probe in probes:  # 3 probes
        builder.parameters = DataFactory("zeopp.parameters")(
            dict=zeopp_parameters[probe]
        )
        # run the cheap calculation locally; the more expensive ones on fidis
        if probe == "geometric":
            builder.code = zeopp_code_local
        else:
            builder.code = zeopp_code_fidis

        future = engine.submit(NetworkCalculation, **builder)  # this is the line raising the timeout error
        print(f"Submitted {probe} calc for {identifier}: {future}")

    time.sleep(5)
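
To keep the submission loop alive across such a transient timeout, the submit call could be wrapped in a retry - a sketch only, not part of the original script (submit_with_retry and the back-off values are made up; engine and NetworkCalculation are the objects used above):

import time
from concurrent.futures import TimeoutError as FuturesTimeoutError

def submit_with_retry(builder, max_retries=3, wait=30):
    """Retry the submission if the communication with RabbitMQ times out (sketch)."""
    for attempt in range(max_retries):
        try:
            return engine.submit(NetworkCalculation, **builder)
        except FuturesTimeoutError:
            if attempt == max_retries - 1:
                raise
            time.sleep(wait)  # give RabbitMQ / the machine time to catch up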

The fact that there are 0 messages seems to suggest you have no active processes at all. If you just submitted a bunch of stuff and that is actually running, this would indeed be very weird. Are you connecting to the correct RabbitMQ instance?

I think I am. Perhaps after a while, the system gets "stuck" somehow?

Anyhow, I'm not entirely sure whether there might have been an issue with the collected statistics. I now stopped the submission for a while and then restarted it again (around the leftmost point of the time series). The messages now show up, but one can see that there seem to be longer and longer periods of no messages. I'll report how it goes.

[screenshot: RabbitMQ message statistics]

ltalirz commented 3 years ago

An intermediate update:

[screenshot]

Slot limits are not reached (the calculations are short-running - max. 10 minutes), but the CPU load is high:

$ verdi daemon status
Profile: csd_import
Daemon is running as PID 23850 since 2021-03-24 14:33:16
Active workers [2]:
  PID    MEM %    CPU %  started
-----  -------  -------  -------------------
28605    0.476    120.1  2021-03-30 16:31:41
28606    0.478    115.3  2021-03-30 16:31:41
Use verdi daemon [incr | decr] [num] to increase / decrease the amount of workers

P.S. The processes are all CalcJobs, i.e. the calculations aren't running inside the worker processes, and this should be mostly communication/provenance overhead (+ a little bit of output parsing of small text files).

ltalirz commented 3 years ago

I have now encountered the issue again:

Traceback (most recent call last):
  File "2a_run_zeopp_aiida.py", line 192, in <module>
    future = engine.submit(NetworkCalculation, **builder)
  File "/home/ubuntu/aiida-core/aiida/engine/launch.py", line 116, in submit
    runner.controller.continue_process(process_inited.pid, nowait=False, no_reply=True)
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/site-packages/plumpy/process_comms.py",
line 412, in continue_process
    return self.task_send(message, no_reply=no_reply)
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/site-packages/plumpy/process_comms.py",
line 485, in task_send
    return self._communicator.task_send(message, no_reply=no_reply)
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/site-packages/kiwipy/rmq/threadcomms.py", line 212, in task_send
    return self._loop_scheduler.await_(self._communicator.task_send(task, no_reply))
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/site-packages/pytray/aiothreads.py", line 155, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/ubuntu/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64/envs/aiida-zeopp/lib/python3.8/concurrent/futures/_base.py", line 441,
in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError

As one can see, there seems to have been some oscillation in the number of messages before this issue (note the different time scale).

[screenshot: RabbitMQ message statistics]

After submission stopped, naturally, messages went down.

P.S. If you know how I can plot the data with more resolution for a longer time window, let me know.

ltalirz commented 3 years ago

Perhaps 1000 slots is indeed already too much for a regular daemon worker?

There is another possibility, however: since I'm running some of the calculations on the same machine where AiiDA is running (through slurm, with max. concurrent jobs = 8 = number of cores), it is possible that, occasionally, the machine gets slow to respond when all cores are fully used [1]. I've now reduced the number of CPUs available to slurm to 4, which should always leave enough spare capacity for AiiDA + RabbitMQ + PostgreSQL. Let's see whether the problem recurs.

[1] Most of the locally run jobs are very short, but occasionally it's possible that some longer-running outliers accumulate.

ltalirz commented 3 years ago

I could not reproduce the issue with the remaining jobs - let's assume it was due to the machine being clogged up by slurm + AiiDA. I'll need to submit substantially more jobs soon, so I'll reopen this in case I encounter the issue again.

Following @sphuber 's comment, I will also remove the artificial "sleep" - let's see what happens :-)

sphuber commented 3 years ago

Right... we've had a report of someone encountering problems when submitting a large number of processes in one go, so I was going to spread it out a bit. But of course even better if that is not necessary

I think the problem there was that the daemon workers could not cope. I wouldn't expect issues with the submission script, but a less powerful machine that is fully occupied with running daemon workers, actual calcjobs and other heavy code might become unresponsive and so fail.

Perhaps 1000 slots is indeed already too much for a regular daemon worker?

Could be, but this also depends a lot on the type of machine that is running the daemon and the type of processes that are being run.

Slot limits are not reached (the calculations are short-running - max. 10 minutes), but the CPU load is high.

P.S. The processes are all CalcJobs, i.e. the calculations aren't running inside the worker processes, and this should be mostly communication/provenance overhead (+ a little bit of output parsing of small text files).

If you are running a lot of processes, even if the majority of the actual computational load is off-loaded since it is a CalcJob being run on a remote machine, there is still the processing power required for retrieving all the files and for all the provenance/parsing, as you say. I wouldn't necessarily be surprised if this requires a lot of CPU while that is running. I take it the load dies down when all calcjobs are done or are waiting with the scheduler.

ltalirz commented 3 years ago

Just wanted to say that I encountered the TimeoutError again - now with 4 cores dedicated to 3 daemon workers (+postgres & rabbitmq).

I'll now reduce the number of slots per worker from 1000 to 500 and increase the rmq.task_timeout to 30s - unfortunately, this means I'm running quite a bit more slowly than I could (e.g. on Fidis one can submit ~10k jobs simultaneously).

Is there some logging that I should enable on the rabbitmq / AiiDA end to understand what's going on here? Looking at the rabbitmq dashboard in the comment above, it seems like rabbitmq is doing fine (?). Is it possible that the timeout is erroneously attributed to rabbitmq and actually a sign of congestion in the worker itself?

Also, do you have any idea what's behind this pattern of ever increasing "bunching" of communications evident from https://github.com/aiidateam/aiida-core/issues/4841#issuecomment-810923586 and https://github.com/aiidateam/aiida-core/issues/4841#issuecomment-810960033 ? It looks to me like either the worker is getting overloaded - or it is, for some reason, waiting longer and longer to communicate with rabbitmq.

My guess is that this pattern is probably quite straightforward to reproduce if you just start submitting lots of calcfunctions/calcjobs and as shown in https://github.com/aiidateam/aiida-core/issues/4841#issuecomment-810923586 it's already evident in the first 10 minutes.

chrisjsewell commented 3 years ago

a sign of congestion in the worker itself?

When you say worker here - the submissions are all being processed by the "master" python process that submits processes to RabbitMQ, and so should in principle be independent of anything happening in the daemon worker python processes (except that busy worker processes may slow down your computer as a whole, or, if they are flooding RabbitMQ with messages, this may slow its responses to the master process).

In fact, do you still see this timeout if you submit without the daemon running?

ltalirz commented 3 years ago

When you say worker here, the submissions are all being processed on the "master" python process submitting processes to rabbitmq

Ah sorry - you're right. Forgot that this is being raised during calculation submission.

In fact, do you still see this timeout if you submit without the daemon running?

Will try - let's see what happens.

ltalirz commented 3 years ago

When shutting down the daemon completely, I wasn't able to reproduce the issue so far.

I then started a single daemon worker (500 slots, rmq.task_timeout 60s, ssh computer cooldown time 30s).

Here are the statistics of a snapshot from the output of verdi process list -S waiting: 500 processes in state Waiting, of which

The PK of the last waiting process only changes quite infrequently (I guess this coincides with the "surges" in rabbitmq communication) - e.g. the PK remained unchanged for ~6 minutes, and when I checked again after another 6 minutes, it had picked up ~105 new calculations.

If this pattern deteriorates over time, I could imagine that this could lead to congestion at the rabbitmq side when, all of a sudden, it gets a burst of requests.
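
For reference, a hedged QueryBuilder sketch of how such a snapshot could be taken programmatically (this is not the command used above; it assumes a profile that can be loaded):

from aiida import load_profile
from aiida.orm import QueryBuilder, ProcessNode

load_profile()  # load the default AiiDA profile

# count the waiting processes and report the highest PK among them
qb = QueryBuilder().append(
    ProcessNode,
    filters={"attributes.process_state": "waiting"},
    project=["id"],
)
pks = [pk for (pk,) in qb.all()]
print(f"{len(pks)} waiting processes, last PK: {max(pks) if pks else None}")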

ltalirz commented 3 years ago

P.S. Out of curiosity, I recorded the time spent on calculation submission in the future = engine.submit(NetworkCalculation, **builder) line.

It's centered around 0.3 s, with 99.96% of the times within the window of 0.2-1.3 s - plus there are 0.04% of outliers where this line takes 10-32 s.
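
For reference, a minimal sketch of how such timings could be collected (hypothetical instrumentation, not the actual script; engine, NetworkCalculation and builder are the objects from the submission loop above):

import statistics
import time

submit_times = []

# inside the submission loop
start = time.perf_counter()
future = engine.submit(NetworkCalculation, **builder)
submit_times.append(time.perf_counter() - start)

# after the loop
print(f"median: {statistics.median(submit_times):.2f} s, "
      f"max: {max(submit_times):.2f} s over {len(submit_times)} submissions")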

ltalirz commented 3 years ago

With the 60 s timeout and one daemon worker, the TimeoutError did not reoccur. I'll now increase the number of daemon workers to two.

Anyhow, it seems to me that the underlying problem that we should tackle here is this "bunching" of actions by the daemon workers. Is it possible that this has to do with the reuse of the SSH connection? What happens if 500 processes submit requests to do stuff over SSH (upload/retrieve/scheduler check) - does this have the effect of delaying any communication with rabbitmq until all tasks are processed and the connection is closed?

sphuber commented 3 years ago

When you say worker here - the submissions are all being processed by the "master" python process that submits processes to RabbitMQ, and so should in principle be independent of anything happening in the daemon worker python processes (except that busy worker processes may slow down your computer as a whole, or, if they are flooding RabbitMQ with messages, this may slow its responses to the master process).

I actually just remembered that when writing the AiiDA 1.0 paper I did some benchmarking, which included a test where I submitted a bunch of workchains for different numbers of workers. I observed that the submission rate could become slower when I changed the number of workers. This surprised me, because I think they should be completely independent, assuming that we are not even close to saturating the RabbitMQ instance. But now I am starting to think that "somehow" they are not so independent, and that if the daemon workers become more active, i.e. are communicating more with RabbitMQ, this may affect the communication effectiveness of an isolated Python interpreter that is submitting processes and sending simple tasks to the queue. So at this point I would definitely not rule this out and would actually look in this direction. I don't have any concrete ideas on what to test or look at for the moment, though.

sphuber commented 3 years ago

What happens if 500 processes submit requests to do stuff over SSH (upload/retrieve/scheduler check) - does this have the effect of delaying any communication with rabbitmq until all tasks are processed and the connection is closed?

In principle, this shouldn't have an impact, since the daemon worker does all the SSH work on a different thread from the one used for communication. However, that is the theory; maybe under heavy load the separate thread still cannot get to work in time.

ltalirz commented 3 years ago

Below are some test results and a screenshots.zip from @giovannipizzi:

I started submitting 400 WorkChains (submitting sub-workchains and CalcJobs, for a total of ~1200 processes). There is no additional sleep in the submission code.

I took some screenshots: the first ones are all close to each other, a few seconds after each other, showing the submission part. I also took one now, where only 60 processes are left.

To me, it all seems OK (this is Ubuntu, py 3.7.9, 8 workers); I don't think I see bursts. In your case indeed maybe the computer was overloaded?

ltalirz commented 3 years ago

Thanks for checking!

I've looked at the screenshots and indeed there seem to be significant differences compared to my case -  in particular the high number of queued messages (perhaps that comes partly from using work chains rather than calcjobs).

Concerning the "burstiness", however, it's a bit more difficult to draw conclusions since your screenshots show very short time windows (you can change the time window in the dashboard).

For example, at the intermediate stage of https://github.com/aiidateam/aiida-core/issues/4841#issuecomment-810960033 I saw several minutes of inactivity, which then increased further to tens of minutes as I kept submitting.  

In your case indeed maybe the computer was overloaded?

I was running on an openstack VM with 8 CPUs, so I think that is very unlikely (and I never saw it in "top").

However, I did notice that once you have some 10k calculations in the database, certain queries become slower and slower. E.g. now, with 300k calculations in the database, a verdi process list (that returns no results because no processes are running) takes > 1 minute:

(aiida-zeopp) ubuntu@qmobile:~$ time verdi process list
PK    Created    Process label    Process State    Process status
----  ---------  ---------------  ---------------  ----------------

Total results: 0

Info: last time an entry changed state: 4D ago (at 19:40:36 on 2021-04-26)

real    1m13.928s
user    0m4.572s
sys 0m1.255s
In [3]: QueryBuilder().append(Node).count()
Out[3]: 1841583
sphuber commented 3 years ago

Just a note on the queries: verdi process list needs to filter on the process_state, which is an attribute and therefore lives in the attributes JSONB column. Since there is no index on this, the query gets significantly slower once the database grows. We have looked into whether GIN indexes on the JSONB could improve things, but it is not clear. We should consider whether at some point we need to promote this attribute to a separate column to improve query speed, since it is queried so often.
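
For context, this is roughly the kind of QueryBuilder query behind verdi process list - a sketch assuming a loadable profile; the exact filters used by the command may differ:

from aiida import load_profile
from aiida.orm import QueryBuilder, ProcessNode

load_profile()

# filter on the process_state attribute stored in the JSONB `attributes` column
qb = QueryBuilder().append(
    ProcessNode,
    filters={"attributes.process_state": {"in": ["created", "waiting", "running"]}},
    project=["id", "attributes.process_label", "attributes.process_state"],
)
rows = qb.all()  # each row: [pk, process label, process state]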

giovannipizzi commented 3 years ago

There's quite some discussion and debugging on GIN indexes on #4664 done by @CasperWA and me. The main lessons we learnt are:

From #4664, the syntax to create a GIN index only on a specific field is this:

CREATE INDEX idx_gin_optimade_elements ON db_dbnode USING GIN ((extras -> 'optimade' -> 'elements') jsonb_path_ops);

and, what's even more interesting, you can create a partial index with a WHERE clause (e.g. to only index process nodes, which reduces the size even further - note, however, that the index will only be used if the corresponding queries contain the very same filter, so for testing it may be better to start with an index without a WHERE clause).

The actual command to run should be:

CREATE INDEX idx_process_state ON db_dbnode USING GIN ((attributes -> 'process_state') jsonb_ops);

(one should check which operators are actually used, and whether we can replace jsonb_ops with the faster but more limited jsonb_path_ops; see the bottom of this docs page.)

(Note: when timing, be aware that there are a few final operations that are also slow: checking whether too many slots are used and issuing a warning, and checking when the status last changed - for timing, you should exclude that part; I don't remember whether the --raw option does that, or whether you need to change the source code for the timing.)

However, we'll also need to revise the query generated by the QueryBuilder. In a simplified form, a query could look like this:

select count(id) FROM db_dbnode where 
    CASE WHEN (jsonb_typeof(attributes #> '{process_state}') = 'string') THEN (attributes #>> '{process_state}') = 'finished' ELSE false END;

but (at least on my mid-size DB, with 41k calculations) this does not use the new index (checked by putting EXPLAIN ANALYZE in front of the query; it does a Seq Scan). I think it's because of the CASE.

@ltalirz you could try to:

  1. Run the command to create the index (it should be fast, a matter of a few seconds).
  2. Check whether verdi process list becomes faster.
  3. If not, check whether you can write an SQL query that is fast, and report it here.

If we don't manage, the other option is what @sphuber mentioned (moving that specific field to a DB column with an index - that will become very fast). It will require a bit of code refactoring, though - but it might be worth it anyway, if it turns out that using the GIN index requires refactoring some other part of the code (the QueryBuilder) anyway.
giovannipizzi commented 3 years ago

As a further comment, this query:

explain analyze select count(id) FROM db_dbnode where  attributes -> 'process_state' @> '"finished"';

uses the new index, while even this one:

explain analyze select count(id) FROM db_dbnode where  attributes @> '{"process_state": "finished"}';

does not - PostgreSQL seems quite picky: it uses the index only if you explicitly write the query on attributes -> 'process_state', and it does not use it for queries on attributes directly, even if in practice the query is the same. @ltalirz you could at least check whether the "manual" query (this is a simplified one, not really the one issued by verdi process list) becomes acceptably fast after adding the GIN index.

Also, this would use the index as well:

explain analyze select count(id) FROM db_dbnode where 
    jsonb_typeof(attributes #> '{process_state}') = 'string' AND attributes -> 'process_state' @> '"finished"';
sphuber commented 3 years ago

@giovannipizzi if that's OK with you, I would move your last two comments to the dedicated issue on process_state querying efficiency, #2770, since I think this issue is about something else. It's a very useful summary you gave and it would be a shame if it got lost in this thread.

giovannipizzi commented 3 years ago

Sure, please go ahead (or tell me if I should do it)