/// Process any pending messages in the internal queue, calling wrapped actor's `handle()`.
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> {
// Handle all messages that should have been handled by now.
let now = Instant::now();
while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
let item = self.queue.pop().expect("heap is non-empty, we have just peeked");
let message = match item.payload {
Payload::Delayed { message } => message,
Payload::Recurring { mut factory, interval } => {
let message = factory();
self.queue.push(QueueItem {
fire_at: item.fire_at + interval,
payload: Payload::Recurring { factory, interval },
});
message
},
};
// Let inner actor do its job.
//
// Alternatively, we could send an `Instant` message to ourselves.
// - The advantage would be that it would go into the queue with proper priority. But it
// is unclear what should be handled first: normal-priority message that should have
// been processed a while ago, or a high-priority message that was delivered now.
// - Disadvantage is we could easily overflow the queue if many messages fire at once.
self.inner.handle(&mut TimedContext::from_context(context), message)?;
}
Ok(())
}
Combined with the fact that we call this from handle().
It was introduced in https://github.com/tonarino/actor/pull/73 which fixed a converse issue: we need to make sure we don't reintroduce it. It has an unit test fortunately.
I think that we should:
send message to ourselves instead of calling inner.handle(), so that it goes through the actor loop and gets proper priority
only process one message at a time: change while let to if let
this would resolve the concern in the TODO in the code that we
Caused by this code
Combined with the fact that we call this from
handle()
.I think that we should:
inner.handle()
, so that it goes through the actor loop and gets proper prioritywhile let
toif let
CC @goodhoko.