ketiltrout closed this 1 year ago.
I think I've addressed all the comments.
I've also added a second commit that isn't directly related to these changes, but it's convenient to include here. It adds chimedb to the test requirements (and also works around a concurrency test that fails because of something in the SQLite3 connector that I don't understand).
This PR introduces a "Fair Multi-FIFO Queue" (FMFQ), which alpenhorn will use as its task scheduler. (Alpenhorn doesn't use it yet: the code introduced here isn't called from anywhere yet.)
Overview
The FMFQ is a multi-producer, multi-consumer queue designed to be similar to the standard `queue.Queue`, though it doesn't derive from that code, nor is it a drop-in replacement for it. Every item in the queue is assigned to one of many FIFOs, and the FIFO for a given item is specified by a key. The queue is "fair" in the sense that it tries to keep the same number of items in progress from each FIFO at any given time. (That is, the same number in progress, not the same number queued.)
The FIFOs are implemented as `deque`s, though I don't know if that really saves us a whole lot of time. The queue is unbounded.
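One way to read the fairness rule: when a consumer asks for an item, serve the non-empty FIFO that currently has the fewest items in progress. The sketch below assumes exactly that selection rule, with ties broken by key name; `choose_fifo` is a hypothetical helper, and the FMFQ in this PR may order or tie-break FIFOs differently.

```python
from __future__ import annotations

from collections import deque


def choose_fifo(fifos: dict[str, deque], in_progress: dict[str, int]) -> str | None:
    """Return the key of the non-empty FIFO with the fewest in-progress items.

    Assumed fairness rule for illustration only: ties go to the smallest key.
    Returns None if every FIFO is empty.
    """
    candidates = [key for key, fifo in fifos.items() if fifo]
    if not candidates:
        return None
    # Fewest in-progress items first, then key name so the choice is deterministic.
    return min(candidates, key=lambda k: (in_progress.get(k, 0), k))


fifos = {
    "nearline": deque(["n1", "n2", "n3", "n4"]),
    "node_a": deque(["a1"]),
}
in_progress = {"nearline": 2, "node_a": 0}

print(choose_fifo(fifos, in_progress))  # node_a
```

Note that the queue lengths play no part in the choice: "nearline" has four items waiting, but because two of its items are already in progress, the single item from "node_a" is served first.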
Queue basics
Like the standard `queue.Queue`, there are three primary functions used to interact with the queue:
- `put(item, key, wait=0)` adds `item` to the queue, putting it at the end of the FIFO named `key`. (For `wait`, see "Deferred puts" below.)
- `get(timeout=None)` retrieves the next item from the queue, where "next" adheres to the fairness criterion described above. If there is currently nothing in the queue, this function waits either forever or until `timeout` expires. When it returns an item (i.e. doesn't time out), it also provides the `key` indicating which FIFO the item came from. An item returned by `get` is marked as in progress.
- `task_done(key)` tells the queue that an item obtained from the FIFO named `key` is no longer in progress.
A typical consumer therefore loops: it calls `get`, processes the returned item, and then calls `task_done` with the returned key. The queue doesn't actually care which items are in progress; it only keeps track of the number of in-progress items per FIFO.
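A minimal sketch of that consumer pattern follows. Because the FMFQ class itself isn't reproduced here, the sketch defines a hypothetical toy stand-in, `MiniFairQueue`, with the same `put`/`get`/`task_done` shape. It assumes `get` serves the non-empty FIFO with the fewest in-progress items and raises `queue.Empty` on timeout (mirroring `queue.Queue.get`); the PR's `get` may signal a timeout differently. `consumer` and the task names are also illustrative.

```python
import queue
import threading
import time
from collections import defaultdict, deque


class MiniFairQueue:
    """Toy stand-in for the FMFQ put/get/task_done API; not the PR's code."""

    def __init__(self):
        self._cond = threading.Condition()
        self._fifos = defaultdict(deque)  # key -> queued items
        self._in_progress = defaultdict(int)  # key -> count of items handed out

    def put(self, item, key):
        with self._cond:
            self._fifos[key].append(item)
            self._cond.notify()  # wake one waiting consumer

    def get(self, timeout=None):
        """Return (item, key); raise queue.Empty if `timeout` expires first."""
        with self._cond:
            deadline = None if timeout is None else time.monotonic() + timeout
            while True:
                keys = [k for k, fifo in self._fifos.items() if fifo]
                if keys:
                    # Assumed fairness rule: fewest in-progress items wins.
                    key = min(keys, key=lambda k: (self._in_progress[k], k))
                    self._in_progress[key] += 1
                    return self._fifos[key].popleft(), key
                if deadline is None:
                    self._cond.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        raise queue.Empty
                    self._cond.wait(remaining)

    def task_done(self, key):
        with self._cond:
            self._in_progress[key] -= 1


def consumer(fmfq, results):
    """Typical consumer loop: get an item, process it, then call task_done."""
    while True:
        try:
            item, key = fmfq.get(timeout=0.5)
        except queue.Empty:
            return  # nothing arrived before the timeout; stop this worker
        try:
            results.append((key, item.upper()))  # stand-in for real work
        finally:
            fmfq.task_done(key)  # always release the in-progress slot


fmfq = MiniFairQueue()
for i in range(3):
    fmfq.put(f"task{i}", key="nearline")
fmfq.put("task_x", key="node_a")

results = []
workers = [threading.Thread(target=consumer, args=(fmfq, results)) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(sorted(results))
```

The `try`/`finally` around the work is the important part of the pattern: since the queue only counts in-progress items per FIFO, a consumer that forgets `task_done` after an exception would permanently skew that FIFO's count.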
Other methods
There is a `join` method used to shut down the queue. It waits until everything put into the queue has been finished, and is used at the end of alpenhorn when it attempts to shut down cleanly.
There are also a number of queue-size methods, which alpenhorn will use to determine whether a node is "idle".
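To make the bookkeeping concrete, here is a sketch of counters that `join` and the size queries could rely on. The class `SizeTracker`, its `on_*` hooks, `fifo_size`, `inprogress_size` and `is_idle` are hypothetical names, and treating a node as idle when nothing is queued or in progress for it is an assumption; alpenhorn's actual idleness test may differ. Here `join` blocks on a `threading.Condition` until both totals reach zero, returning `False` if an optional timeout expires first.

```python
import threading
from collections import defaultdict


class SizeTracker:
    """Hypothetical per-FIFO counters backing join() and size queries."""

    def __init__(self):
        self._cond = threading.Condition()
        self._queued = defaultdict(int)  # key -> items waiting in the FIFO
        self._in_progress = defaultdict(int)  # key -> items handed to consumers

    def on_put(self, key):
        with self._cond:
            self._queued[key] += 1

    def on_get(self, key):
        with self._cond:
            self._queued[key] -= 1
            self._in_progress[key] += 1

    def on_task_done(self, key):
        with self._cond:
            self._in_progress[key] -= 1
            self._cond.notify_all()  # wake any thread blocked in join()

    def fifo_size(self, key):
        with self._cond:
            return self._queued[key]

    def inprogress_size(self, key):
        with self._cond:
            return self._in_progress[key]

    def is_idle(self, key):
        """Assumed definition: nothing queued and nothing in progress for `key`."""
        with self._cond:
            return self._queued[key] == 0 and self._in_progress[key] == 0

    def join(self, timeout=None):
        """Block until nothing is queued or in progress; return False on timeout."""
        with self._cond:
            return self._cond.wait_for(
                lambda: sum(self._queued.values()) + sum(self._in_progress.values()) == 0,
                timeout,
            )


tracker = SizeTracker()
tracker.on_put("nearline")
tracker.on_get("nearline")
print(tracker.is_idle("nearline"))  # False: one item is in progress
threading.Timer(0.1, tracker.on_task_done, args=("nearline",)).start()
print(tracker.join(timeout=2))  # True once the simulated worker calls task_done
print(tracker.is_idle("nearline"))  # True
```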
How the queue is used in alpenhorn
In alpenhorn, queued items are I/O tasks (which will be introduced in a subsequent PR), but that doesn't matter to the queue itself, which accepts any object. The FIFO `key`s are node names, so there is a separate FIFO per node. The fairness of the queue prevents one node (e.g. nearline) from monopolising the worker threads: if only nearline tasks are present in the queue, they'll be provided to consumers, but if tasks from other nodes are also present, nearline tasks won't overwhelm the system and prevent these other tasks from being handled in a timely manner.
While the queue code doesn't contain anything alpenhorn-specific, a few design decisions were made based on how alpenhorn uses the queue. Specifically, we assume the number of distinct FIFOs is small and that, even if a FIFO empties out, it will be re-used with the same key. The implementation also assumes the number of items put into the queue is large compared to the number of FIFOs. (See the overly verbose comments in the code for further details.)
Deferred puts
If `put()` is called with a positive value for `wait`, the put is accepted by the FMFQ but the item isn't added to the queue. Instead, it goes into a `heapq` of "deferred puts", where it waits for `wait` seconds to elapse before being added to the queue.
Alpenhorn uses deferred puts to implement tasks that need to wait for something to happen (e.g. nearline recalls). Because these tasks wait in the deferred-put heap, they aren't available for workers to take, and their nodes appear idle until the deferrals expire.
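A deferred-put heap can be sketched with the standard `heapq` module, keyed on the time each item becomes due. `DeferredPuts`, `defer`, `pop_expired` and `time_to_next` are hypothetical names, and the injectable `clock` exists only so the example is deterministic. The sequence counter is a common `heapq` idiom: it breaks ties between equal due times so the queued items themselves are never compared.

```python
import heapq
import itertools
import time


class DeferredPuts:
    """Hypothetical heap of deferred puts, ordered by when each becomes due."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._heap = []  # entries: (due_time, seq, item, key)
        self._seq = itertools.count()  # tie-breaker so items are never compared

    def defer(self, item, key, wait):
        due = self._clock() + wait
        heapq.heappush(self._heap, (due, next(self._seq), item, key))

    def pop_expired(self):
        """Remove and return (item, key) for every deferral whose wait has elapsed."""
        now = self._clock()
        expired = []
        while self._heap and self._heap[0][0] <= now:
            _, _, item, key = heapq.heappop(self._heap)
            expired.append((item, key))
        return expired

    def time_to_next(self):
        """Seconds until the earliest deferral expires, or None if there are none."""
        if not self._heap:
            return None
        return max(0.0, self._heap[0][0] - self._clock())


now = [100.0]  # fake clock value, adjustable for the example
deferred = DeferredPuts(clock=lambda: now[0])
deferred.defer("recall_b", key="nearline", wait=30)
deferred.defer("recall_a", key="nearline", wait=10)

print(deferred.pop_expired())  # []
now[0] = 115.0
print(deferred.pop_expired())  # [('recall_a', 'nearline')]
print(deferred.time_to_next())  # 15.0
```

A consumer blocked waiting for work could cap its condition-variable wait at `time_to_next()` so that it wakes when the earliest deferral expires; whether the FMFQ does exactly this is an assumption.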
Deferred puts are resolved during a `get` call, which checks for expired deferrals. As a result, a deferred put often never actually shows up in the queue, since it's immediately returned by the `get` that resolved it.
Other changes
I've added a workaround for the DB concurrency test, though I don't understand why it fails when using the SQLite3 connector. I've also added the CHIMEDB core package to the test requirements so we can test it with this code.