tokio-rs / tokio-core

I/O primitives and event loop for async I/O in Rust
Apache License 2.0

Waiting short periods of time with tokio_core::reactor::Timeout #298

Closed realcr closed 6 years ago

realcr commented 6 years ago

This issue is based on this post on the Rust programming language forum.

I want to create a task that is polled at short, regular intervals. I try to do this using a Timeout. After creating a new Timeout, one has to poll() it and get an Ok(Async::NotReady) result to make sure the task will be polled again. However, when I create a Timeout with a very short duration and call poll() on it, I get an Ok(Async::Ready(_)) result, so the task is never polled again.

I was wondering what would be a good approach to having a Task that is polled very often. In case it is of interest, I'm trying to do this because I want to create a Task that controls the rate of outgoing UDP packets.

A self contained code example:

extern crate futures;
extern crate tokio_core;

use std::time::Duration;
use tokio_core::reactor::{Timeout, Handle, Core};
use futures::{Async, Future, Poll};

struct TimeoutFuture {
    timeout_opt: Option<Timeout>,
    handle: Handle,
}

impl Future for TimeoutFuture {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.timeout_opt = match self.timeout_opt.take() {
            None => { 
                println!("First poll() call");
                let mut timeout = Timeout::new(Duration::new(0,1000), &self.handle).unwrap();
                timeout.poll();
                Some(timeout)
            },
            Some(timeout) => {
                println!("poll() was called again!");
                Some(timeout)
            },
        };
        Ok(Async::NotReady)
    }
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let timeout_future = TimeoutFuture {
        timeout_opt: None,
        handle: handle.clone(),
    };
    core.run(timeout_future).unwrap();
}

When I run this code, only "First poll() call" is printed, and then the program hangs. What I want is for both messages to be printed to the console.

The two important lines are:

let mut timeout = Timeout::new(Duration::new(0,1000), &self.handle).unwrap();
timeout.poll();

For short durations (like the 1000 nanoseconds above), the poll() call will return Async::Ready, so the Task will not be polled again. I could make this work by picking a longer duration, like 1000000 nanoseconds (1 ms), but sending UDP packets at this rate is too slow for me.

Of course, if I don't have the timeout.poll() line, the output will be the same: only the first message, "First poll() call", will be printed to the console. I understand that if I don't poll() the Timeout and get an Ok(Async::NotReady) result, there is no reason for my Task to be polled again.

I want to have a Task that is polled very often, in very short periods of time. At the same time, I also want to let other tasks work.

@vitalyd from the Rust programming language forum had the interesting idea of using: futures::task::current().notify(). I combined it with the code above as follows:

let mut timeout = Timeout::new(Duration::new(0,1000), &self.handle).unwrap();
match timeout.poll() {
    Ok(Async::Ready(())) => futures::task::current().notify(),
    _ => {},
};

This worked, but this feels to me like a hacky solution.

I was wondering if you have any ideas for approaching this problem. I am willing to go low level if I know that eventually I can have a nice interface with the rest of the Futures in my code.

p.s. Thanks for your work on Tokio! Using Tokio is one of those things that make me happy to wake up in the morning.

carllerche commented 6 years ago

Timeout granularity is 1ms. Anything less is "instant". This is due to operating system timeout granularity.
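With a 1 ms floor on timer granularity, one possible way to reach a higher packet rate is to send a small batch on every tick instead of one packet per timer firing. The following is only a sketch of that arithmetic, using the standard library alone. The helper names packets_due and batch_for_tick are hypothetical and are not part of tokio-core or futures.

```rust
use std::time::Duration;

/// Hypothetical helper: total number of packets that should have been sent
/// after `elapsed` time at a target rate of `rate_pps` packets per second.
fn packets_due(elapsed: Duration, rate_pps: u64) -> u64 {
    // Work in u128 nanoseconds so the multiplication cannot overflow
    // for realistic durations; the division rounds down.
    (elapsed.as_nanos() * rate_pps as u128 / 1_000_000_000) as u64
}

/// Hypothetical helper: how many packets to send on the current tick,
/// given the time since the sender started and the count already sent.
fn batch_for_tick(elapsed: Duration, rate_pps: u64, already_sent: u64) -> u64 {
    // saturating_sub keeps the result at 0 if we are ahead of schedule.
    packets_due(elapsed, rate_pps).saturating_sub(already_sent)
}
```

For example, at 250000 packets per second a tick that fires 1 ms after start would send 250 packets. Computing the batch from total elapsed time, rather than from a fixed per-tick count, keeps the average rate on target even when a tick fires late.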

The yield approach is correct: task::current().notify(). Ideally, there would be some shim over that, but there isn't one right now.
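As an illustration of what such a shim could look like, here is a minimal sketch of a "yield once" future. It is written against std::future and std::task rather than the futures 0.1 API used in this thread, so that it compiles without external crates; cx.waker().wake_by_ref() plays the role that task::current().notify() plays in futures 0.1. The names YieldNow, yield_now, CountingWaker and drive are hypothetical, and drive is a toy polling loop for demonstration, not a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shim: a future that yields to the executor exactly once.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // Ask to be polled again right away, then give up the current turn
        // so that other tasks get a chance to run.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical constructor for the shim.
fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Waker that only counts how many times it was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy driver: polls the future in a loop until it completes and
/// returns (number of polls, number of wake-ups requested).
fn drive<F: Future<Output = ()>>(fut: F) -> (usize, usize) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if fut.as_mut().poll(&mut cx).is_ready() {
            return (polls, counter.wakes.load(Ordering::SeqCst));
        }
    }
}
```

Calling drive(yield_now()) returns (2, 1): the first poll requests one wake-up and returns Pending, and the second poll completes. A real executor would run other ready tasks between those two polls, which is the point of yielding.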

Hope this helps. I'm going to close the issue, but if you feel that it isn't addressed, please comment and I can reopen it.