chrisjsewell / aiida-process-coordinator

A prototype for a non-RabbitMQ `Process` execution coordinator.
MIT License


The solution builds on aiida-core's existing database and daemon management infrastructure, with no additional dependencies:


What are the requirements for the process coordinator?

These are the major requirements for the coordinator:

Why replace RabbitMQ?

In short:

Since rabbitmq-server#2990, RabbitMQ >= 3.7 will time out a task if it is not finished within 30 minutes, which is much shorter than most use cases in AiiDA. As discussed in rabbitmq-server#3345, this timeout cannot be controlled on the client side (i.e. by AiiDA), so all users will have to manually add configuration on the server side to turn it off. This will make AiiDA even less user-friendly to get started with and, even if documented, will likely result in a lot of confusion and raised issues.

In detail:

Below is the current schematic of submitting a Process in AiiDA:

  1. The user starts the AiiDA daemon, which spawns one or more workers that connect as clients (a.k.a. consumers) to the RabbitMQ server
  2. The user submits a Process
  3. AiiDA stores the initial Process in the database
  4. AiiDA sends a task to the RabbitMQ server task queue
  5. The RabbitMQ server sends out these task messages to each worker in turn, for one to consume, waiting for an ack(nowledgment)/nack/reject response (see also Consumer Acknowledgements)
  6. One worker will execute the task and then (ack)nowledge message receipt to the RabbitMQ server (and the task is removed from the queue)
  7. If the executing worker loses connection with the RabbitMQ server, the task will be requeued and sent to another worker

This current design has a number of shortcomings:

Proposed solution

As shown below, the proposed solution essentially replaces the RabbitMQ server with a (singleton) Process execution coordinator, spawned by the AiiDA daemon. This coordinator acts on persisted data in the database to coordinate the execution of Processes on workers.

The coordinator and workers form a standard server/client private network, with the coordinator acting as the server and the workers acting as the clients. The built-in Python asyncio.streams primitives are used to implement the connections, and messaging follows a simple JSON protocol.
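
To make the server/client arrangement concrete, here is a minimal sketch of a handshake over asyncio streams. The message fields (`type`, `pid`), the one-JSON-object-per-line framing, and the function names are assumptions made for illustration; they are not taken from this package's actual protocol.

```python
import asyncio
import json
import os


async def handle_worker(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Coordinator side: read one handshake message and acknowledge it."""
    line = await reader.readline()  # assumed framing: one JSON object per line
    message = json.loads(line)
    reply = {"type": "handshake_ack", "pid": message["pid"]}
    writer.write(json.dumps(reply).encode() + b"\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def worker_handshake(host: str, port: int) -> dict:
    """Worker side: connect, announce our PID, and return the coordinator's reply."""
    reader, writer = await asyncio.open_connection(host, port)
    hello = {"type": "handshake", "pid": os.getpid()}
    writer.write(json.dumps(hello).encode() + b"\n")
    await writer.drain()
    reply = json.loads(await reader.readline())
    writer.close()
    await writer.wait_closed()
    return reply


async def demo() -> dict:
    # Port 0 lets the OS pick a free port, as in the log output of the prototype.
    server = await asyncio.start_server(handle_worker, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await worker_handshake("127.0.0.1", port)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Newline-delimited JSON keeps the framing trivial (`readline()` on both ends), which is one plausible reading of "a simple JSON protocol"; a length-prefixed framing would work equally well.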

The proposal by example

This package implements a mock AiiDA database with sqlite (created on demand), mock processes which wait for 60 seconds, and exposes a CLI mimicking aspects of verdi.

NOTE: sqlite does not handle read/write concurrency as well as PostgreSQL, so it is possible to encounter some database locking failures in this prototype (most are accounted for).
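
One common way to account for such locking failures is to retry the write after a short pause. The helper below is a sketch of that general technique using only the standard library; the name `execute_with_retry` and its parameters are hypothetical, and this is not necessarily how the prototype handles it.

```python
import sqlite3
import time


def execute_with_retry(conn, sql, params=(), retries=5, delay=0.1):
    """Run a statement, retrying while SQLite reports that the database is locked."""
    for attempt in range(retries):
        try:
            with conn:  # commits on success, rolls back on error
                return conn.execute(sql, params)
        except sqlite3.OperationalError as exc:
            # Re-raise anything that is not a lock error, or the final failed attempt.
            if "locked" not in str(exc) or attempt == retries - 1:
                raise
            time.sleep(delay * (attempt + 1))  # simple linear back-off


# Usage on an in-memory database (the table name is illustrative only).
conn = sqlite3.connect(":memory:")
execute_with_retry(conn, "CREATE TABLE demo (pk INTEGER PRIMARY KEY, status TEXT)")
execute_with_retry(conn, "INSERT INTO demo (status) VALUES (?)", ("created",))
```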

It is recommended to run the CLI via tox, to auto-create an isolated Python environment (note you must add a -- before CLI arguments).

To see all the commands:

$ tox -- --help
Usage: aiida-task [OPTIONS] COMMAND [ARGS]...

  --version            Show the version and exit.
  -d, --database TEXT  Path to database (or set env AIIDADB)  [default:

  --help               Show this message and exit.

  daemon   Manage the daemon
  process  Manage processes

First (before starting the daemon) we submit a number of processes to the database:

$ tox process submit 5
Node pk 1 submitted
Node pk 2 submitted
Node pk 3 submitted
Node pk 4 submitted
Node pk 5 submitted

This will generate an aiida.sqlite3 file in the current directory.

We can list the process nodes (from db_dbnode) in the standard manner:

$ tox process list
  PK  Modified                    Status
----  --------------------------  --------
   5  2021-09-06 12:39:34.407908  created
   4  2021-09-06 12:39:34.402800  created
   3  2021-09-06 12:39:34.397937  created
   2  2021-09-06 12:39:34.393634  created
   1  2021-09-06 12:39:34.386709  created

But now there is also a db_process table, holding the processes scheduled for execution, which we can inspect:

$ tox process scheduled
  PK  Modified                    Action    Worker PID
----  --------------------------  --------  ------------
   5  2021-09-06 12:39:34.408866
   4  2021-09-06 12:39:34.403676
   3  2021-09-06 12:39:34.398722
   2  2021-09-06 12:39:34.394237
   1  2021-09-06 12:39:34.387872

We can also add "actions" to this, like kill:

$ tox process kill 3
Scheduled node pk 3 to be killed
$ tox process scheduled
  PK  Modified                    Action    Worker PID
----  --------------------------  --------  ------------
   5  2021-09-06 12:39:34.408866
   4  2021-09-06 12:39:34.403676
   3  2021-09-06 12:39:34.398722  kill
   2  2021-09-06 12:39:34.394237
   1  2021-09-06 12:39:34.387872
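
The following sketch shows one way the two tables could relate: a node row per process, plus a scheduling row that carries an optional action and the assigned worker. The column names and the helpers `submit` and `schedule_kill` are assumptions for illustration, not the prototype's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE db_dbnode (
        pk INTEGER PRIMARY KEY,
        status TEXT NOT NULL DEFAULT 'created'
    );
    CREATE TABLE db_process (
        pk INTEGER PRIMARY KEY,
        dbnode_pk INTEGER NOT NULL UNIQUE REFERENCES db_dbnode (pk),
        action TEXT,          -- e.g. 'kill'; NULL means no pending action
        worker_pid INTEGER    -- NULL until a worker has been assigned
    );
    """
)


def submit(conn):
    """Store a new node and schedule it for execution in one transaction."""
    with conn:
        node_pk = conn.execute("INSERT INTO db_dbnode DEFAULT VALUES").lastrowid
        conn.execute("INSERT INTO db_process (dbnode_pk) VALUES (?)", (node_pk,))
    return node_pk


def schedule_kill(conn, node_pk):
    """Record a kill request; a coordinator would act on it later."""
    with conn:
        conn.execute(
            "UPDATE db_process SET action = 'kill' WHERE dbnode_pk = ?", (node_pk,)
        )


for _ in range(5):
    submit(conn)
schedule_kill(conn, 3)
rows = conn.execute(
    "SELECT dbnode_pk, action, worker_pid FROM db_process ORDER BY dbnode_pk DESC"
).fetchall()
```

Because the request is only written to the database, it can be issued while no daemon is running, which matches the order of the commands in this walkthrough.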

If we start the daemon, with two workers, the coordinator will distribute the processes evenly to the workers:

$ tox daemon start 2
$ tox daemon status -q
- '12022'
- '12023'
- '12024'
- '12025'
$ tox process scheduled
  PK  Modified                    Action      Worker PID
----  --------------------------  --------  ------------
   5  2021-09-06 12:46:52.976776                   12024
   4  2021-09-06 12:46:52.971709                   12023
   2  2021-09-06 12:46:52.966902                   12024
   1  2021-09-06 12:46:52.961325                   12023
$ tox process list
  PK  Modified                    Status
----  --------------------------  --------
   3  2021-09-06 12:46:52.993646  killed
   5  2021-09-06 12:46:52.990090  running
   4  2021-09-06 12:46:52.986570  running
   2  2021-09-06 12:46:52.985670  running
   1  2021-09-06 12:46:52.982983  running

If we then decrease the number of workers, the coordinator will re-distribute the processes to remaining workers:

$ tox daemon decr
$ tox process scheduled
  PK  Modified                    Action      Worker PID
----  --------------------------  --------  ------------
   4  2021-09-06 12:47:36.919519                   12024
   1  2021-09-06 12:47:36.913779                   12024
   5  2021-09-06 12:46:52.976776                   12024
   2  2021-09-06 12:46:52.966902                   12024
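
The distribution and re-distribution behaviour can be sketched with a least-loaded assignment rule. This is an assumed strategy that happens to reproduce the tables above; the function name `assign` is hypothetical and the prototype's real logic may differ.

```python
def assign(processes, workers, assignments=None):
    """Give every process without a live worker to the least-loaded live worker.

    `assignments` maps process pk -> worker PID from a previous round;
    entries pointing at workers that are no longer in `workers` are re-assigned.
    """
    assignments = dict(assignments or {})
    loads = {pid: 0 for pid in workers}
    for worker in assignments.values():
        if worker in loads:
            loads[worker] += 1
    for proc in processes:
        if assignments.get(proc) in loads:
            continue  # already running on a live worker, leave it there
        target = min(workers, key=lambda pid: loads[pid])  # ties go to the first worker
        assignments[proc] = target
        loads[target] += 1
    return assignments


# Two workers: processes alternate between them.
first = assign([1, 2, 4, 5], [12023, 12024])
# Worker 12023 is removed: only its processes move.
second = assign([1, 2, 4, 5], [12024], first)
```

Here `first` places processes 1 and 4 on worker 12023 and processes 2 and 5 on worker 12024, and `second` moves only processes 1 and 4, so work already running on the surviving worker is not disturbed.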

If we stop the daemon part way through, and re-start with a single worker, the coordinator will re-assign the processes to that worker:

Once completed, the db_dbprocess table will be empty, and the db_dbnode table will contain all terminated processes:

$ tox process scheduled
PK    Modified    Action    Worker PID
----  ----------  --------  ------------
$ tox process list
  PK  Modified                    Status
----  --------------------------  --------
   4  2021-09-06 12:48:36.917000  finished
   1  2021-09-06 12:48:36.912941  finished
   5  2021-09-06 12:47:53.002793  finished
   2  2021-09-06 12:47:52.995835  finished
   3  2021-09-06 12:46:52.993646  killed

If we look in the daemon log files, we can see the progression of the coordinator and workers:


INFO:server-501-12022:[STARTING] coordinator is starting...
INFO:server-501-12022:[SERVING] on ('', 59080)
INFO:server-501-12022:[NEW CONNECTION] ('', 59083)
INFO:server-501-12022:[NEW CONNECTION] ('', 59084)
INFO:server-501-12022:[HANDSHAKE COMPLETE] ('', 59083) PID 12023
INFO:server-501-12022:[STORED CONNECTIONS] 1
INFO:server-501-12022:[HANDSHAKE COMPLETE] ('', 59084) PID 12024
INFO:server-501-12022:[STORED CONNECTIONS] 2
INFO:server-501-12022:[PROCESS] Continuing process 1 (node 1) on worker 12023
INFO:server-501-12022:[PROCESS] Continuing process 2 (node 2) on worker 12024
INFO:server-501-12022:[PROCESS] Continuing process 4 (node 4) on worker 12023
INFO:server-501-12022:[PROCESS] Continuing process 5 (node 5) on worker 12024
INFO:server-501-12022:[PROCESS] Continuing process 3 (node 3) on worker 12023
INFO:server-501-12022:[PROCESS] Killing process 3 (node 3) on worker 12023
INFO:server-501-12022:[REMOVE DEAD WORKER] 12023
INFO:server-501-12022:[PROCESS] Continuing process 1 (node 1) on worker 12024
INFO:server-501-12022:[PROCESS] Continuing process 4 (node 4) on worker 12024


INFO:worker-501-12023:[STARTING] worker is starting...
INFO:worker-501-12023:[CONNECTING] to server: ('', 59080)
INFO:worker-501-12023:[HANDSHAKE COMPLETE]
INFO:worker-501-12024:[STARTING] worker is starting...
INFO:worker-501-12024:[CONNECTING] to server: ('', 59080)
INFO:worker-501-12024:[HANDSHAKE COMPLETE]
INFO:worker-501-12023:[CONTINUE] process node 1
INFO:worker-501-12024:[CONTINUE] process node 2
INFO:worker-501-12023:[CONTINUE] process node 4
INFO:worker-501-12024:[CONTINUE] process node 5
INFO:worker-501-12023:[CONTINUE] process node 3
INFO:worker-501-12023:[KILLED] process 3
INFO:worker-501-12024:[CONTINUE] process node 1
INFO:worker-501-12024:[CONTINUE] process node 4
INFO:worker-501-12024:[FINISH] process 2
INFO:worker-501-12024:[FINISH] process 5
INFO:worker-501-12024:[FINISH] process 1
INFO:worker-501-12024:[FINISH] process 4

If you are running a large number of processes, you can use process status to summarise the load on the workers:

$ tox daemon incr 2
$ tox process submit 1000
$ tox process status
Process nodes: 1005
Non-terminated nodes: 600
Active processes: 600
Worker loads (PID -> count/max):
  32887: 200 / 200
  32888: 200 / 200
  32962: 200 / 200
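
A summary like the one above can be derived with a single grouped query over the scheduling table. The sketch below assumes a table called `scheduled` with a `worker_pid` column and a per-worker cap of 200 (read off the output above); all names and the sample PIDs are illustrative.

```python
import sqlite3

MAX_LOAD = 200  # assumed per-worker cap, read off the "200 / 200" lines above


def worker_loads(conn):
    """Return {worker_pid: number of assigned processes}."""
    rows = conn.execute(
        "SELECT worker_pid, COUNT(*) FROM scheduled "
        "WHERE worker_pid IS NOT NULL GROUP BY worker_pid ORDER BY worker_pid"
    )
    return dict(rows.fetchall())


def free_slots(loads, workers, max_load=MAX_LOAD):
    """Return how many more processes each worker could accept under the cap."""
    return {pid: max_load - loads.get(pid, 0) for pid in workers}


# Small demo: 3 processes on worker 101, 2 on worker 102, 4 still unassigned.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scheduled (pk INTEGER PRIMARY KEY, worker_pid INTEGER)")
with conn:
    conn.executemany(
        "INSERT INTO scheduled (worker_pid) VALUES (?)",
        [(101,)] * 3 + [(102,)] * 2 + [(None,)] * 4,
    )
loads = worker_loads(conn)
for pid, count in loads.items():
    print(f"  {pid}: {count} / {MAX_LOAD}")
```

With a cap in place, a coordinator would only hand out as many new processes as `free_slots` allows and leave the rest unassigned until a slot opens up.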

To stop the daemon and remove the work directory:

$ tox daemon stop -- --clear

To remove the database:

$ tox process remove-db


See also TODO in the code-base.