Aaronontheweb closed this issue 10 years ago.
We should be able to use non-thread-safe structures here, IMO. If we chain the normal mailbox semantics with whatever structure we use for priority sorting, then it should be safe.
message ->
mailbox.Post ->
mailbox.Run ->
loop:
    take all messages off the normal queue ->
    // insert the new messages into the priority queue
    insert them all into the sorted structure ->
    // process messages off the priority queue
    pull the top message off the sorted structure ->
    process the message.
Easier to implement, with very little overhead.
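The flow above can be sketched in Java (the thread reasons in terms of java.util.PriorityQueue, so Java keeps things close to that). This is a minimal sketch that assumes the dispatcher never runs the same mailbox on two threads at once; TwoStagePriorityMailbox, post and run are hypothetical names, not Akka.NET's API. Only the ConcurrentLinkedQueue is touched by senders, so the PriorityQueue itself can stay non-thread-safe.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical two-stage priority mailbox (illustrative names, not Akka.NET's API).
class TwoStagePriorityMailbox<T> {
    // Stage 1: thread-safe FIFO that any sender can post to.
    private final Queue<T> incoming = new ConcurrentLinkedQueue<>();
    // Stage 2: plain java.util.PriorityQueue (not thread-safe), touched only inside run().
    private final PriorityQueue<T> sorted;

    TwoStagePriorityMailbox(Comparator<? super T> priority) {
        this.sorted = new PriorityQueue<>(priority);
    }

    // Safe to call from any thread.
    void post(T message) {
        incoming.add(message);
    }

    // Assumption: the dispatcher never runs the same mailbox on two threads at once.
    void run(Consumer<? super T> actor) {
        while (true) {
            // Move every message currently in the concurrent queue into the sorted structure.
            T next;
            while ((next = incoming.poll()) != null) {
                sorted.add(next);
            }
            // Process the top message, then loop so newly posted messages are considered too.
            T top = sorted.poll();
            if (top == null) {
                return; // nothing left; a real mailbox would reschedule itself on the next post()
            }
            actor.accept(top);
        }
    }
}
```

Because the loop re-drains the concurrent queue before every dequeue, a high-priority message posted while another message is being processed still jumps ahead of lower-priority messages already waiting in the sorted structure.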
Here is a priority queue implemented using a backing flat list. It inserts using binary search, so it might be somewhat slower than other implementations. On the other hand, it's very cheap memory-wise.
http://visualstudiomagazine.com/articles/2012/11/01/priority-queues-with-c.aspx
As usual, it's speed vs. memory; we have to pick the best fit.
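To make the trade-off concrete, here is a rough Java sketch of a flat-list priority queue with binary-search insertion. It is not the code from the linked article; SortedListPriorityQueue and its methods are hypothetical. The list is kept in reverse priority order so that dequeue removes from the end, and the search picks the slot that keeps equal-priority items in FIFO order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical flat-list priority queue; smaller per `cmp` means higher priority.
class SortedListPriorityQueue<T> {
    // Kept in reverse priority order, so the next item to dequeue sits at the end of the list.
    private final List<T> items = new ArrayList<>();
    private final Comparator<? super T> cmp;

    SortedListPriorityQueue(Comparator<? super T> cmp) {
        this.cmp = cmp;
    }

    void enqueue(T item) {
        // Binary search for the first slot whose element does not sort after `item`.
        // Inserting there places `item` ahead of equal-priority items in the list,
        // i.e. it is dequeued after them, so ties stay FIFO.
        int lo = 0, hi = items.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (cmp.compare(items.get(mid), item) > 0) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        // The search is O(log n), but ArrayList.add(index, e) still shifts the tail: O(n) moves.
        items.add(lo, item);
    }

    // Removes and returns the highest-priority item, or null when empty.
    T dequeue() {
        // Removing the last element needs no shifting.
        return items.isEmpty() ? null : items.remove(items.size() - 1);
    }

    int size() {
        return items.size();
    }
}
```

The memory cost is just the backing array; the speed cost is the element shifting on insert, which a binary heap avoids by doing both insert and removal in O(log n).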
@Aaronontheweb, is the heap implementation you have based on http://www.codeproject.com/Articles/126751/Priority-queue-in-C-with-the-help-of-heap-data-str ?
@rogeralsing that implementation is astonishingly similar to ours, even though I've never seen that article before :)
But yes, that's close to what ours looks like: we require that the TPriority generic type implement the IComparable interface, but I think we also have an overload where you can pass in a priority-calculator Func instead.
Our implementation is thread-safe also, just using simple locks.
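For comparison, a lock-based binary heap along the lines described might look like the following Java sketch. It is hypothetical, not the implementation mentioned above. Java cannot overload on a generic constraint, so the IComparable-style variant and the priority-calculator variant are two static factories (natural and withPriority), and synchronized stands in for the simple locks.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical lock-based binary min-heap (smallest element = highest priority).
class LockedHeap<T> {
    private final List<T> heap = new ArrayList<>();
    private final Comparator<? super T> cmp;

    private LockedHeap(Comparator<? super T> cmp) {
        this.cmp = cmp;
    }

    // Variant 1: the element type supplies its own ordering (like an IComparable constraint).
    static <C extends Comparable<? super C>> LockedHeap<C> natural() {
        return new LockedHeap<C>(Comparator.naturalOrder());
    }

    // Variant 2: the caller passes a priority function; a lower value means a higher priority.
    static <U> LockedHeap<U> withPriority(ToIntFunction<? super U> priorityOf) {
        return new LockedHeap<U>(Comparator.comparingInt(priorityOf));
    }

    synchronized void add(T item) {
        heap.add(item);
        int i = heap.size() - 1;
        // Sift up: swap with the parent while the new item sorts before it. O(log n).
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (cmp.compare(heap.get(i), heap.get(parent)) >= 0) break;
            swap(i, parent);
            i = parent;
        }
    }

    synchronized T poll() {
        if (heap.isEmpty()) return null;
        T top = heap.get(0);
        T last = heap.remove(heap.size() - 1);
        if (!heap.isEmpty()) {
            heap.set(0, last);
            int i = 0;
            // Sift down: swap with the smaller child until heap order holds. O(log n).
            while (true) {
                int l = 2 * i + 1, r = l + 1, smallest = i;
                if (l < heap.size() && cmp.compare(heap.get(l), heap.get(smallest)) < 0) smallest = l;
                if (r < heap.size() && cmp.compare(heap.get(r), heap.get(smallest)) < 0) smallest = r;
                if (smallest == i) break;
                swap(i, smallest);
                i = smallest;
            }
        }
        return top;
    }

    synchronized int size() {
        return heap.size();
    }

    private void swap(int a, int b) {
        T tmp = heap.get(a);
        heap.set(a, heap.get(b));
        heap.set(b, tmp);
    }
}
```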
Is anyone working on this, or can I take it?
Go for it
I sort of have this working. However, the backing priority queue needs to preserve the order of messages with the same priority.
I've created a MessageQueue abstraction that can be consumed by a mailbox.
This is in line with #301 also.
So if anyone is sitting on a performant priority queue that preserves insertion order, that would be great. It does not have to be thread-safe; that is taken care of.
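One common way to get an order-preserving priority queue out of an ordinary heap is to tag each message with an increasing sequence number and use it as a tie-breaker. The Java sketch below assumes that approach; MessageQueue, enqueue, tryDequeue and count are hypothetical names standing in for the abstraction mentioned above, not its actual signature. As in the requirement above, it is not thread-safe.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical message-queue abstraction a mailbox could consume.
interface MessageQueue<T> {
    void enqueue(T message);
    T tryDequeue(); // returns null when empty
    int count();
}

// Single-consumer priority queue that keeps FIFO order among equal priorities.
// Not thread-safe: the mailbox is assumed to call it from one thread at a time.
class StablePriorityMessageQueue<T> implements MessageQueue<T> {
    private static final class Entry<M> {
        final M message;
        final long seq; // insertion order, used only to break ties
        Entry(M message, long seq) {
            this.message = message;
            this.seq = seq;
        }
    }

    private final PriorityQueue<Entry<T>> queue;
    private long nextSeq = 0;

    StablePriorityMessageQueue(Comparator<? super T> priority) {
        // Compare by priority first, then by sequence number, so equal priorities come out FIFO.
        Comparator<Entry<T>> byPriority = (a, b) -> priority.compare(a.message, b.message);
        this.queue = new PriorityQueue<>(byPriority.thenComparingLong(e -> e.seq));
    }

    @Override
    public void enqueue(T message) {
        queue.add(new Entry<>(message, nextSeq++));
    }

    @Override
    public T tryDequeue() {
        Entry<T> e = queue.poll();
        return e == null ? null : e.message;
    }

    @Override
    public int count() {
        return queue.size();
    }
}
```

The cost is one extra long per message plus a wrapper object; heap operations stay O(log n).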
@rogeralsing
"However, the backing priorityqueue needs to preserve order of messages with the same priority."
Akka uses the built-in java.util.PriorityQueue for this, which makes no guarantees about preserving the order of messages with the same priority upon insertion: http://docs.oracle.com/javase/7/docs/api/java/util/PriorityQueue.html
It uses a heap implementation similar to what we're proposing, probably a binary heap for O(log(N)) insert times.
I wouldn't worry about this: by definition, this mailbox allows messages to be processed in a different order from the one in which they were received, especially if someone uses it as an aging priority mailbox (that depends on how they write their priority function).
As long as system messages still get their own queue and are processed ahead of user-level messages (in keeping with the ConcurrentQueueMailbox design), then everything should be OK.
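A Java sketch of that arrangement, assuming system messages are kept in their own FIFO queue and fully drained before each user message is taken off the priority queue. PriorityMailboxWithSystemQueue and its methods are hypothetical names, and the single-consumer assumption is the same as for the user-message stage.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical mailbox: system messages (S) stay FIFO and always run before user messages (U).
class PriorityMailboxWithSystemQueue<S, U> {
    private final Queue<S> system = new ConcurrentLinkedQueue<>();   // never reordered
    private final Queue<U> incoming = new ConcurrentLinkedQueue<>(); // user messages from any thread
    private final PriorityQueue<U> sorted;                           // single consumer only

    PriorityMailboxWithSystemQueue(Comparator<? super U> priority) {
        this.sorted = new PriorityQueue<>(priority);
    }

    void postSystem(S message) { system.add(message); }

    void post(U message) { incoming.add(message); }

    // Assumption: never invoked concurrently for the same mailbox.
    void run(Consumer<? super S> onSystem, Consumer<? super U> onUser) {
        while (true) {
            // 1. All pending system messages first, in arrival order.
            S sys;
            while ((sys = system.poll()) != null) onSystem.accept(sys);
            // 2. Move newly posted user messages into the priority queue.
            U u;
            while ((u = incoming.poll()) != null) sorted.add(u);
            // 3. Process one user message, then re-check the system queue.
            U top = sorted.poll();
            if (top == null) return; // a real mailbox would reschedule on the next post
            onUser.accept(top);
        }
    }
}
```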
@Aaronontheweb yes, system messages still use the original queue implementation. It would be odd if they did not preserve order, because ordering is one of the guarantees that Akka makes: http://doc.akka.io/docs/akka/snapshot/general/message-delivery-reliability.html#The_General_Rules
I'll clean things up and open a PR, and we can evaluate whether the current approach is OK.
@rogeralsing relevant note from that page:
It is important to note that Akka’s guarantee applies to the order in which messages are enqueued into the recipient’s mailbox. If the mailbox implementation does not respect FIFO order (e.g. a PriorityMailbox), then the order of processing by the actor can deviate from the enqueueing order.
cc @akkadotnet/developers
This should be safe to do, right?
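I'm implementing this as a two-stage rocket:
- all messages are posted to a ConcurrentQueue, just like the normal mailbox
- when the mailbox runs, it pulls the messages from the concurrent queue and places them into the priority queue
- the top message in the priority queue is returned
Are there any edge cases that this won't deal with?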
The reason for this design is that the priority queue does not have to be thread-safe, so we are still non-blocking.
(Will provide different impls of prio queues once this is done)
What about this?
An Akka Mailbox holds the messages that are destined for an Actor. Normally each Actor has its own mailbox, but with for example a BalancingPool all routees will share a single mailbox instance. http://doc.akka.io/docs/akka/snapshot/scala/mailboxes.html
Isn't the balancing mailbox obsolete? [edit] BalancingPool != balancing mailbox, I guess.
Doesn't look like it. Are you thinking about BalancingDispatcher?
@deprecated("Use BalancingPool instead of BalancingDispatcher", "2.3") class BalancingDispatcher(
Ah, yes. So, what should we do here? None of the other router concepts in Akka require a mailbox to be consumed by multiple parties. Our current mailbox is optimized for the single-consumer scenario, and right now we have a throughput that none of the other .NET actor frameworks can compete with.
So, if BalancingPool requires a multi-consumer mailbox, shouldn't we provide just that for it, and let the other cases use the most optimized approach by default?
From the doc:
Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably best described as "work donating" because the actor of which work is being stolen takes the initiative.
It could be worth checking what BalancingPool actually does. I've seen similar things implemented with the routee sending NeedMoreWork to the router, which then sends it a new Work message. In that case the inbox is only shared logically: BalancingPool has its own mailbox, and the routees are normal actors with their own mailboxes. If that is the case for this implementation too, then your work is safe.
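The NeedMoreWork/Work pull pattern can be illustrated with a small single-threaded Java simulation. It is a sketch, not how BalancingPool is implemented: PullRouter, Routee and pullAndProcess are hypothetical, and a direct method call stands in for the NeedMoreWork -> Work message exchange. The point is that the shared work list lives in the router, while each routee's own mailbox only ever has one consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical router that owns the only shared work queue.
class PullRouter {
    final Deque<String> pendingWork = new ArrayDeque<>();

    // Handles a NeedMoreWork request: returns the next work item, or null if none is left.
    String onNeedMoreWork() {
        return pendingWork.pollFirst();
    }
}

// Hypothetical routee with a private, single-consumer mailbox.
class Routee {
    final Deque<String> mailbox = new ArrayDeque<>();
    final List<String> done = new ArrayList<>();

    // Ask the router for work; if some arrives, enqueue it into our own mailbox and process it.
    boolean pullAndProcess(PullRouter router) {
        String work = router.onNeedMoreWork(); // stands in for NeedMoreWork -> Work
        if (work == null) return false;
        mailbox.addLast(work);
        done.add(mailbox.pollFirst());
        return true;
    }
}
```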
BalancingPool uses the same configurator as BalancingDispatcher, BalancingDispatcherConfigurator:
if (!classOf[MultipleConsumerSemantics].isAssignableFrom(requirement))
  throw new IllegalArgumentException(
    "BalancingDispatcher must have 'mailbox-requirement' which implements akka.dispatch.MultipleConsumerSemantics; " +
So we can safely use the implementation I have, as long as we mark it as single-consumer to prevent it from being used by BalancingPool.
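The Scala check quoted above relies on a marker trait. A rough Java analogue, assuming a marker interface with the same name (everything here is hypothetical; it is not Akka's or Akka.NET's code), shows how a single-consumer priority queue would be rejected simply by not implementing the marker.

```java
// Hypothetical marker: a queue type declares that several consumers may share it.
interface MultipleConsumerSemantics {}

// Stands in for a concurrent FIFO queue that is safe to share between routees.
class ConcurrentFifoQueue implements MultipleConsumerSemantics {}

// Stands in for the priority queue: deliberately NOT marked, so it stays single-consumer.
class SingleConsumerPriorityQueue {}

final class BalancingConfigCheck {
    private BalancingConfigCheck() {}

    // Mirrors the quoted isAssignableFrom check: reject queue types that lack the marker.
    static void requireMultipleConsumers(Class<?> queueType) {
        if (!MultipleConsumerSemantics.class.isAssignableFrom(queueType)) {
            throw new IllegalArgumentException(
                "BalancingPool requires a queue implementing MultipleConsumerSemantics, got "
                    + queueType.getSimpleName());
        }
    }
}
```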
Ahh. Cool.
"I'm implementing this as a two stage rocket: all messages are posted to a concurrentqueue, just like the normal mailbox; when the mailbox runs, this queue pulls messages from the concurrent queue and place them into the priority queue; the top message in the priority queue is returned. Are there any edge case that this won't deal with?"
Is this a "dequeue all" operation or a "dequeue(N)" operation? If it's the first option, then everything should work.
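The difference matters because of what is still sitting in the concurrent queue at dequeue time. In this hypothetical Java demo (DrainDemo and processAll are illustrative names), three messages are already in the mailbox when processing starts. Draining everything lets the priority queue order them; moving only one message per dequeue degenerates into plain FIFO.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical comparison of "dequeue all" vs "dequeue(N)" when feeding the priority queue.
final class DrainDemo {
    private DrainDemo() {}

    // batch == Integer.MAX_VALUE means "dequeue all"; otherwise "dequeue(N)". batch must be >= 1.
    static List<Integer> processAll(List<Integer> posted, int batch) {
        Queue<Integer> incoming = new ArrayDeque<>(posted);    // stands in for the concurrent queue
        PriorityQueue<Integer> sorted = new PriorityQueue<>(); // smaller number = higher priority
        List<Integer> order = new ArrayList<>();
        while (!incoming.isEmpty() || !sorted.isEmpty()) {
            // Move up to `batch` messages into the priority queue before each dequeue.
            for (int i = 0; i < batch && !incoming.isEmpty(); i++) {
                sorted.add(incoming.poll());
            }
            order.add(sorted.poll());
        }
        return order;
    }
}
```

With messages 3, 2, 1 posted in that order, "dequeue all" processes 1, 2, 3, while moving one message at a time processes 3, 2, 1, so the highest-priority message runs last.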
We've been using this in one of our projects since yesterday; it cut our CPU usage by about 90%.
Implemented; closing.
http://doc.akka.io/docs/akka/2.2.0/scala/mailboxes.html
I ran into some use cases recently where having this would drastically cut down on CPU usage, by ~90% in some cases (compared to having to re-Tell lower-priority messages to Self over and over again).
I haven't looked at the source to see how Akka implemented the PriorityQueue itself, but we've had success using a binary heap as the data structure for our SDK's exponential back-off priority queue for retrying failed API requests (it's an aging priority queue). I'd be happy to share notes on that.
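As a rough illustration of that kind of queue, here is a Java sketch of a retry queue on top of java.util.PriorityQueue (a binary heap), ordered by each request's next retry time, with a delay that doubles per attempt. RetryQueue, scheduleRetry, pollDue and the doubling formula are assumptions for illustration, not the SDK's actual code.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical exponential back-off retry queue backed by a binary heap.
final class RetryQueue {
    static final class Retry {
        final String requestId;
        final int attempt;      // 0 for the first retry
        final long dueAtMillis; // when this retry becomes eligible
        Retry(String requestId, int attempt, long dueAtMillis) {
            this.requestId = requestId;
            this.attempt = attempt;
            this.dueAtMillis = dueAtMillis;
        }
    }

    // Earliest due time at the head of the heap.
    private final PriorityQueue<Retry> heap =
        new PriorityQueue<>(Comparator.comparingLong((Retry r) -> r.dueAtMillis));
    private final long baseDelayMillis;

    RetryQueue(long baseDelayMillis) {
        this.baseDelayMillis = baseDelayMillis;
    }

    // Delay doubles with every attempt: base, 2*base, 4*base, ...
    // The shift is capped (an assumption) so the delay cannot grow without bound.
    long delayFor(int attempt) {
        return baseDelayMillis << Math.min(attempt, 30);
    }

    void scheduleRetry(String requestId, int attempt, long nowMillis) {
        heap.add(new Retry(requestId, attempt, nowMillis + delayFor(attempt)));
    }

    // Returns the earliest retry that is due at nowMillis, or null if none is due yet.
    Retry pollDue(long nowMillis) {
        Retry head = heap.peek();
        if (head == null || head.dueAtMillis > nowMillis) return null;
        return heap.poll();
    }
}
```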