I'm definitely +1 on this. However, when it comes to the implementation, there are some sneaky details. I was actually trying to implement such a thing with a "soft delete" approach, similar to what they do in `asyncio.Semaphore`, but I struggled to get it right.
Sure, we can use linked lists, but this won't actually give us any performance boost, since `deque` is written in C and will outperform any smart data structure written in Python, unless we have an asymptotically large number of waiters in a queue, which seems highly unlikely.

So I would try to stick with the "soft delete" approach for now.
The operations we care about for a linked list -- `popleft`, `append`, and `remove` -- are all pretty simple (a few opcodes) even in pure Python, so I think it's worth checking to see just how much slower it is. And yeah, the idea wouldn't be to go faster on average, it'd be to avoid worst-case behavior. (Though -- O(n) can add up pretty fast in some cases. It's not too hard to imagine a workload where a few hundred tasks block waiting for the same event, and then all get canceled, and then it's `O(N**2)` to take them all out of the deque one at a time.)
"Soft delete" as you call it seems like it'll be much trickier to get right... what if a waits blocks on a mutex, then the wait is canceled, then it waits on the mutex again? How do you know that you're supposed to skip over the first copy of the task inside the queue, but not the second? How would you avoid memory management problems -- do we need a WeakDeque, and does that have to be implemented in C?
Thinking out loud, I guess the linked-list operations would be something like:
```python
# assuming a queue is a circular double-linked intrusive list
# queue.next is head and queue.previous is tail
# empty queue is when queue.next == queue.previous == queue

class QueueEmpty(Exception):
    pass

def cancel_wait(task):
    # unlink the task from wherever it sits in the list: O(1)
    task.previous.next = task.next
    task.next.previous = task.previous
    task.next = task.previous = None

def popleft(queue):
    task = queue.next
    if task is queue:
        raise QueueEmpty
    cancel_wait(task)
    return task

def append(queue, task):
    # insert at the tail: O(1)
    task.next = queue
    task.previous = queue.previous
    queue.previous.next = task
    queue.previous = task
```
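For concreteness, here's a minimal sketch of how those operations wire together -- the `WaitQueue` sentinel and bare `Task` below are stand-ins for illustration, not curio's actual classes:

```python
class WaitQueue:
    def __init__(self):
        self.next = self.previous = self   # empty circular list

class Task:
    next = previous = None                 # intrusive link fields

q = WaitQueue()
t1, t2 = Task(), Task()
append(q, t1)
append(q, t2)
cancel_wait(t1)            # O(1) removal, no scanning
assert popleft(q) is t2
```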
Well, I think you've just reinvented something similar to the FreeBSD turnstiles. Which is actually a pretty neat approach to task synchronization. I guess we can give it a try.
I don't want curio building linked lists for this. Part of the reason why is that I thought it would be interesting to allow for different queuing implementations to be used for sleeping tasks. For example, if you wanted to pursue priority queuing, then these sleeping task queues should probably respect task priorities. Trying to layer that onto some baked-in linked list structure is going to be hell.
I think a soft delete approach could work--and I have some ideas about that that aren't completely insane (I'll write them up later when I get a chance). Soft delete is also easier to make work with alternate queuing structures.
If I may put on the "practicality beats purity" hat for a moment though, I'm just wondering how common it would be for lots of tasks to be waiting on the same synchronization primitive? The maximum queue size would be the total number of tasks. What is the likelihood that they're all blocked on the same mutex? And even then, what is the likelihood that they're going to be cancelled? I just don't know. I suppose I could envision a lot of tasks sitting around waiting on a queue maybe. The corner case of cancelling everything at once and getting a possible `O(N**2)` situation is interesting though. Maybe.
I want to just note for the record that when we say "priorities" here we're talking specifically about "strict" priorities, like Linux's real-time priorities, not the priorities that you get with WFQ or unix "nice"-style priorities (see here for more discussion of the distinction). WFQ prioritizes tasks on the run queue, but uses FIFO (or whatever) for wait queues.
As a general principle, I really don't think we should be making decisions about how the code should look now based on similarity to how it might look if we later add some other feature. If we decide to add support for priorities later, then implementing that feature will necessarily involve surgery to curio's guts anyway; we can always change the waitqueue implementation as part of that. And at that point we'll have a much better idea of what kind of waitqueue actually makes sense for the specific priority design we end up with. And if we don't decide to support strict priorities, then we've really wasted our time worrying about it now. (This general design principle is what agile folks call "YAGNI".) So my preference would be to implement the best FIFO waitqueue for now and let the future take care of itself.
That said, it is an interesting intellectual question how to best implement a wait queue that supports multiple priorities :-). So here's a review of some options. (Spoiler: I think maybe I like the last one best?)
One option would be a heapq + a counter that increments for each task queued, and is used to guarantee FIFO ordering within a priority level:
```python
import heapq
import itertools

class HeapqBasedPriorityWaitingQueue:
    def __init__(self):
        self._tasks = []
        self._counter = itertools.count()

    def append(self, task):
        # the counter breaks priority ties in FIFO order
        heapq.heappush(self._tasks, (task.priority, next(self._counter), task))

    # etc.
```
An interesting feature of this data structure is that it potentially supports mutating a task's priority without it losing its place in line; whether that matters depends on whether that's a feature our hypothetical priority API wants to support, and also I guess on the exact details of what the API looks like. (E.g. you might want to disallow arbitrary priority mutations but still support priority inheritance; priority inheritance is a little simpler, though, because priority changes are monotonic, which puts a bound on how many times a task's priority can change while it's in a wait queue.)
But this doesn't support fast deletes, so we'd need "soft delete" support on top of it. How doable is this? Here's one strategy. Instead of enqueuing the actual task objects, we enqueue `(priority, counter, taskid, cookie)` tuples. The point of enqueueing `taskid`s instead of `Task`s is that this avoids having a defunct wait queue entry pin the `Task` object in memory. (NB that this also requires that `taskid`s be globally unique over the lifetime of the program. Which ours currently are, so that's fine. But e.g. if we started using `id(taskobject)` for `taskid` then this system would be broken.) The job of the `cookie` is to let us identify when an entry has been soft-deleted -- it's some arbitrary value that's stored inside the `Task` object itself, and which changes to a new globally unique (over the lifetime of this task) value every time the `Task` enters a wait queue. So something like:
```python
# continuing HeapqBasedPriorityWaitingQueue:
def append(self, task):
    task.cookie += 1
    heapq.heappush(self._tasks,
                   (task.priority, next(self._counter), task.taskid, task.cookie))

def popleft(self):
    while True:
        (_, _, taskid, cookie) = heapq.heappop(self._tasks)
        # kernel_task_table maps live task ids to Task objects
        if taskid not in kernel_task_table:
            continue
        task = kernel_task_table[taskid]
        if task.cookie != cookie:
            # stale entry: the task has re-entered a wait queue (or left
            # this one) since this tuple was pushed, so skip it
            continue
        return task

def cancel_wait(task):
    task.cookie += 1
```
However, this still has (at least one) edge case where it will blow up in a nasty way. Consider an HTTP/2 server with the following design:

- lots of connections, with new ones arriving and old ones finishing all the time
- a `curio.Event` that is used to signal that the server is entering "graceful shutdown" mode (i.e. it should stop accepting new requests, but should finish up the existing ones); every connection handler waits on this `Event`, and cancels its wait when its connection finishes

With our heapq design so far, this combination of factors will lead to a situation where this `Event`'s waitqueue grows monotonically, forever, until the server is shut down or we run out of memory. (Rough estimate of the leak rate: it looks like each entry in the queue consumes about `1.__sizeof__() * 4 + (1, 2, 3, 4).__sizeof__() + 8 = 176` bytes of memory, so a server handling ~10 connections/second will leak ~150 MB/day.)
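(Checking the arithmetic on that: 176 bytes/entry × 10 entries/second × 86,400 seconds/day ≈ 152 MB/day.)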
I can't see any way to solve this except to keep statistics on how many "soft deletes" are pending inside the waitqueue, and trigger a "hard delete" cycle if the count grows too large. (The idea would be that this is still an improvement over the naive "hard delete" approach, because we can batch up a set of hard-deletes together to amortize their cost, so it's not amortized O(n) per delete.) This is unfortunate, because the nicest thing about this design so far was the delete implementation being so simple; I guess this means that each `Task` actually needs to keep a reference to the queue it's waiting on, and increment a counter, compare its value to the overall size of the waitqueue, etc. Something like:
```python
def cancel_wait(task):
    task.cookie += 1
    task.waitq._soft_deletes_pending += 1
    if (task.waitq._soft_deletes_pending > 100
            and 2 * task.waitq._soft_deletes_pending > len(task.waitq._tasks)):
        task.waitq._process_hard_deletes()
    task.waitq = None
```
(plus `popleft` needs to decrement `_soft_deletes_pending` appropriately, etc.)
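One way `_process_hard_deletes` could work -- a sketch, reusing the `kernel_task_table` and cookie convention from above:

```python
def _process_hard_deletes(self):
    # rebuild the heap, keeping only entries whose task still exists
    # and whose cookie is still current
    live = []
    for (priority, ticket, taskid, cookie) in self._tasks:
        task = kernel_task_table.get(taskid)
        if task is not None and task.cookie == cookie:
            live.append((priority, ticket, taskid, cookie))
    heapq.heapify(live)   # O(n), amortized over the >= n/2 pending deletes
    self._tasks = live
    self._soft_deletes_pending = 0
```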
Overall it seems workable, but it's rather complex -- enough that I'd be nervous we've missed some more options -- and it may or may not be the fastest option.
The Linux kernel uses an interesting data structure for this -- it's basically a doubly-linked list, but with a second skiplist-like (or "skiplist-lite") index on top: http://lxr.free-electrons.com/source/include/linux/plist.h#L19
The idea is that at each moment the underlying list is a simple linear queue of tasks in the order that they should run, so high-priority tasks are in front of low-priority tasks. But you also have a second index list with one entry per priority, and each entry points to the first task with that priority in the underlying list. So when you want to insert a new task, you do a linear scan through the index list, and this lets you find the other tasks at the same priority so you can insert the new task immediately after them. This is O(k) where k is the number of distinct priorities used by tasks waiting on this queue, but that's not too bad, because on average k is small because most programs don't use many different priority levels, and in the worst case it's bounded to k <= 100 because the Linux RT scheduler is hard-coded to only support integer priorities between 1 and 99. If one wanted to support unbounded priorities, then the natural extension would be to use a real multi-layered skiplist, which would make inserts and deletes O(log k). This data structure is somehow the most "natural" one for this task, I think -- the operations that it makes easy/fast/natural are exactly the ones that we care about, without need for trickiness like soft delete. Of course, "naturalness" is not the only criterion :-).
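Here's a rough Python sketch of that idea -- my own illustration, not the kernel's code. It uses a `deque` per priority bucket, so removing an arbitrary task is still O(bucket size); the kernel gets O(1) there by using intrusive linked lists at both levels:

```python
from collections import deque

class PlistWaitQueue:
    def __init__(self):
        # index of (priority, bucket) pairs, kept sorted by priority;
        # one entry per distinct priority currently in the queue
        self._buckets = []

    def append(self, task):
        # O(k) scan of the per-priority index, like Linux's plist
        for i, (prio, bucket) in enumerate(self._buckets):
            if prio == task.priority:
                bucket.append(task)   # FIFO within a priority level
                return
            if prio > task.priority:
                self._buckets.insert(i, (task.priority, deque([task])))
                return
        self._buckets.append((task.priority, deque([task])))

    def popleft(self):
        # lowest numeric priority runs first, matching the heapq convention
        prio, bucket = self._buckets[0]
        task = bucket.popleft()
        if not bucket:
            del self._buckets[0]
        return task
```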
A third approach would be to use a classic sorted container, the kind that supports O(log n) insertion, O(log n) deletion (given the key of the object to delete), and sorted iteration. sortedcontainers has a reputation as a wicked-fast pure-Python library for this kind of container structure. So something like:
```python
import itertools
from sortedcontainers import SortedList

class SortedListWaitQueue:
    def __init__(self):
        self._tasks = SortedList(key=lambda task: (task.priority, task.wait_ticket))
        self._counter = itertools.count()

    def append(self, task):
        task.waitqueue = self
        task.wait_ticket = next(self._counter)
        self._tasks.add(task)

    def popleft(self):
        return self._tasks.pop(0)

def cancel_wait(task):
    task.waitqueue._tasks.remove(task)
    task.waitqueue = task.wait_ticket = None

def reprioritize_task(task, new_priority):
    # remove first, then mutate the sort key, then re-add
    task.waitqueue._tasks.remove(task)
    task.priority = new_priority
    task.waitqueue._tasks.add(task)
```
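Hypothetical usage, with a stand-in `Task` class (curio's real tasks would come from the kernel):

```python
class Task:
    def __init__(self, priority):
        self.priority = priority

q = SortedListWaitQueue()
low, high = Task(priority=2), Task(priority=1)
q.append(low)
q.append(high)
assert q.popleft() is high              # lowest (priority, ticket) pops first
reprioritize_task(low, new_priority=0)  # O(log n) remove + re-add
assert q.popleft() is low
```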
Theoretically this is a bit wince-inducing; the data structure really isn't the one that's best fitted to the task, and O(log n) is worse than O(1). But the implementation is ridiculously simple and flexible, it avoids all the edge cases and worst cases, and I bet it's pretty fast in practice. (Sorta like `SortedList` itself, actually, which if you look at its internals is a beautiful example of how theoretical algorithm analysis doesn't always apply directly to the real world.)
On priorities.... yes, I'm thinking about something like real-time priorities, not the 'nice' command.
Just an aside--the reason curio has that `kqueue` object is that I long thought I'd probably need to replace it with something better than a `deque`. The various synchronization objects use a `kqueue` in that they hand it to the kernel, but they don't directly manipulate it. So, in theory, it was meant to be something that could be replaced by something else.
That `Event` scenario is pretty interesting. I almost wonder if the kernel should support an event as a different kind of thing and dispense with queues altogether for that. It wouldn't be hard to build something like that using a dictionary. Cancellation involves dict deletion (fast). Setting the event dumps everything in the dictionary onto the ready queue.
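A minimal sketch of that dictionary-based event -- the `reschedule` callback and `taskid` attribute here are placeholders for whatever the kernel actually provides:

```python
class DictEvent:
    def __init__(self):
        self._set = False
        self._waiting = {}                     # taskid -> task

    def wait(self, task):
        # called by the kernel when a task blocks on a not-yet-set event
        self._waiting[task.taskid] = task      # O(1) insert

    def cancel_wait(self, task):
        del self._waiting[task.taskid]         # O(1) delete, no queue scan

    def set(self, reschedule):
        # dump every waiter onto the ready queue via the kernel's
        # reschedule callback, then forget them all at once
        self._set = True
        for task in self._waiting.values():
            reschedule(task)
        self._waiting.clear()
```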
By the way... these kinds of tricky issues are one of the reasons why I really like this project. ;-)
I'm a bit worried that we might overload the task object with a lot of additional attributes to support different features, like `task.priority`, `task.waitqueue`, `task.wait_ticket`, etc.

This makes the `Task` implementation depend on the queue implementation. Ideally, tasks should be as loosely coupled with queues as possible. Maybe the queue itself should maintain this information in an internal dictionary or something.
I agree. I don't want to build data structures (or queues) into the `Task` object. I think these things should be separate. I can't think of any reason why this wouldn't be possible. We just need to be a bit clever perhaps.
I am committing a change that reworks some kernel internal synchronization to allow for more flexible handling of all of this. It passes all tests, but may need more work. Mainly thinking about how to generalize the functionality of "sleeping for some reason" to something that's not necessarily a queue.
Just a few notes on changes. I think the basic problem of "kernel level" synchronization involves just three core operations: 1) putting a task to sleep, 2) waking one or more tasks, and 3) cancelling a task. I've introduced a low-level API (`KernelSyncBase`) that pretty much does just this. This is not meant to be something that would be directly used by end-users. Instead, it's used to implement the various high-level synchronization primitives. It allows for a fair bit of flexibility in that it doesn't dictate anything at all about how tasks get managed internally. So, you can do soft-delete. You can also make non-queue objects for implementing events. I could use it to later explore priorities.
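To make the shape of that concrete, an interface along these lines might look like the sketch below -- the method names are illustrative guesses based on the description above, not curio's actual `KernelSyncBase` signatures:

```python
from abc import ABC, abstractmethod

class KernelSyncBase(ABC):
    # Illustrative only: the three core operations described above.

    @abstractmethod
    def add(self, task):
        """Put a task to sleep on this synchronization object."""

    @abstractmethod
    def wake(self, ntasks=1):
        """Wake up to ntasks sleeping tasks, returning them for rescheduling."""

    @abstractmethod
    def cancel(self, task):
        """Remove a task whose wait was cancelled (or timed out)."""
```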
There are undoubtedly ways that this could be improved, but I'd like to strive for simplicity. This new approach is pretty simple. It didn't introduce any new traps. Mainly a refinement/generalization of some things that the kernel was doing already.
I'm closing this for the time being. I put some improved primitives into the kernel to address some of the main issues raised here. I'm probably going to tweak it a bit, but if there are continued concerns or improvements, raise a new issue.
Right now, objects like mutexes hold sleeping tasks in a `deque`. This is nice because it supports O(1) FIFO operations. But removing an arbitrary task (e.g. due to cancellation or timeout) is O(n). Ideally our sleeping task queue should be O(1) for all of these operations.

I think probably the answer is to use an intrusive double-linked list. I.e., put `next` and `previous` pointers directly into the task object (a task can't sleep on multiple queues at once, so this is fine).