radiocosmology / alpenhorn

Alpenhorn is a service for managing an archive of scientific data.
MIT License

Rewrite 3/10: Tasks and Worker Pool #146

Closed ketiltrout closed 1 year ago

ketiltrout commented 1 year ago

This PR adds the asynchronous Task (a callable object containing the code of an I/O job), the Worker (a thread object that executes Tasks), and the Pool (a container which manages the Workers). This PR does not integrate these new objects with the rest of alpenhorn (for that, see #148), but does make use of the queue introduced in #145.

Tasks

The Task class is a callable container. An I/O task is created by instantiating this class with a function (containing the I/O code) along with any arguments to be passed to that function. The first positional argument to a task function is always the Task object itself.
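
For illustration (this function and its arguments are made up; only the task-object-first convention comes from this PR), a task function might look like:

```python
import shutil

# Hypothetical task function: the Task object is always the first
# positional argument, followed by the caller-supplied arguments.
def copy_file(task, src, dest):
    shutil.copy(src, dest)
```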

Tasks are immediately and automatically placed onto the queue when they are instantiated. This might seem a little unintuitive, and originally the queueing had to be done explicitly, but because the queue is already one of the parameters needed to instantiate a Task anyway, I found the task-creation code always ended up looking like this:

```
task = Task(queue=queue, key=key, <other-args>...)
queue.put(task, key)
```

which is a bit redundant, so I streamlined it: the queue.put() now happens automatically when the Task is created.
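
So task creation is now just the single call:

```
task = Task(queue=queue, key=key, <other-args>...)
```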

The Task instance itself is callable, and when called it will, in turn, execute the provided I/O function. (For lack of a better term, I call these inner functions containing the guts of the I/O task "async"s.) These asyncs can be either regular functions or generator functions (i.e. functions containing a yield statement). Yielding asyncs are used in places where the task needs to wait for something external to happen (e.g. nearline recall). When this happens, the async yields a number indicating how long to wait before resuming the task.

The Task takes care of these generator functions and will requeue them if they yield (which is why the queue and FIFO key need to be given to a new Task when it is created).
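
A minimal sketch of a yielding async (the helper functions here are placeholders; only the yield-a-delay behaviour is from this PR):

```python
# Hypothetical yielding async: wait for an external recall to finish
# before pulling the file.
def recall_and_pull(task, file):
    request_recall(file)             # start the external recall (placeholder)
    while not file_is_online(file):  # not ready yet? (placeholder check)
        yield 600                    # requeue; resume after this delay
    pull_file(file)                  # do the actual I/O (placeholder)
```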

The async functions can also register clean-up functions (via the passed-in Task object) which will run after the async returns. These are used sparingly within alpenhorn, mostly for bookkeeping purposes.
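
A sketch of clean-up registration (the registration method name is my assumption; the mechanism is as described above):

```python
# Hypothetical async registering a clean-up function via the passed-in
# Task object; the clean-up runs after the async returns.
def pull_and_record(task, file):
    task.on_cleanup(lambda: record_transfer(file))  # method name assumed
    pull_file(file)                                 # placeholder I/O
```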

Workers

A Worker is a Thread subclass which contains the logic for pulling Tasks from the queue and executing them. On start-up, each Worker calls db.connect() to create a database connection and then enters a loop where it waits (via queue.get()) for I/O Tasks to be enqueued. When it gets a Task, it executes it and then calls queue.task_done() on completion. Each Worker has a threading.Event which can be used to stop it in between task executions.
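
Schematically, the Worker loop looks something like this (a simplified sketch; the get()/task_done() signatures are my assumptions about the #145 queue, and the db module path is assumed):

```python
import threading

from alpenhorn import db  # module path assumed


class Worker(threading.Thread):
    """Simplified sketch of the Worker loop (error handling omitted)."""

    def __init__(self, queue, stop_event, **kwargs):
        super().__init__(**kwargs)
        self._queue = queue
        self._stop = stop_event  # threading.Event: set to stop the worker

    def run(self):
        db.connect()  # one database connection per worker thread
        while not self._stop.is_set():
            item = self._queue.get(timeout=1)  # wait for an enqueued Task
            if item is None:
                continue  # nothing available; re-check the stop flag
            task, key = item
            task()  # execute the Task
            self._queue.task_done(key)  # report completion to the queue
```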

Workers will try to exit cleanly when a task they're executing raises peewee.OperationalError. If necessary, a worker will requeue the task it was working on, so it will be executed again, before exiting. (The pool will notice these clean exits and restart the worker; see below.)

If a task raises some other Exception, the worker exits uncleanly. In this case it will trigger the global_abort Event before it exits. Triggering the global_abort causes alpenhorn as a whole to attempt to exit cleanly: all workers monitor the global abort and, once they notice it has been set, will exit cleanly as soon as they finish their current task.
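
The per-task error handling is roughly this (a sketch; global_abort and the queue refer to the objects described in this PR):

```python
import peewee

# Sketch of the error handling inside the worker loop.
def run_task(self, task, key):
    try:
        task()
    except peewee.OperationalError:
        # Database trouble: requeue the task so it gets retried, then
        # report a clean exit so the pool can restart this worker.
        self._queue.put(task, key)
        return False  # worker loop should stop cleanly
    except Exception:
        # Anything else: set the global abort, asking all of alpenhorn
        # to shut down cleanly, then exit uncleanly.
        global_abort.set()
        raise
    return True  # task finished normally; keep going
```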

The Pool

The workers are managed by the pool, which is responsible for starting and stopping them. Calling the pool.check method (which alpenhorn will do once per update loop) will restart any cleanly-exited workers (assuming a global abort isn't in progress).
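
A sketch of what pool.check does (attribute and helper names are my guesses):

```python
# Sketch of Pool.check as described above.
def check(self):
    """Restart any cleanly-exited workers, unless a global abort is underway."""
    if global_abort.is_set():
        return
    for i, worker in enumerate(self._workers):
        if not worker.is_alive():
            # A Thread can't be restarted once it has exited, so
            # replace the dead worker with a fresh one and start it.
            self._workers[i] = self._new_worker()  # hypothetical factory
            self._workers[i].start()
```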

Because I don't yet know how many workers are reasonable, the number of workers can be increased or decreased on the fly by sending SIGUSR1 or SIGUSR2. Alpenhorn is designed (or will be, in a future PR in this series) to keep working even with zero workers. In that case, tasks will be executed synchronously during the main update loop.
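
The signal wiring is essentially the following (which signal does what is my reading, and the pool method names are assumed):

```python
import signal

# SIGUSR1 adds a worker; SIGUSR2 removes one (assumed mapping).
signal.signal(signal.SIGUSR1, lambda signum, frame: pool.add_worker())
signal.signal(signal.SIGUSR2, lambda signum, frame: pool.del_worker())
```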

If the database extension in use is not threadsafe (the default fallback db in db.py is not, although the CHIMEDB one we will be using is), then a separate worker pool called EmptyPool is (will be) used by alpenhorn instead. EmptyPool behaves just like the default worker Pool, except that it never has any workers in it, and attempts to add workers to it are ignored. As a result, when using EmptyPool, all I/O happens synchronously.
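
Schematically (whether EmptyPool actually subclasses Pool is my assumption):

```python
class EmptyPool(Pool):
    """Sketch: a Pool whose worker count is pinned at zero."""

    def add_worker(self):
        pass  # requests for more workers are ignored
```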