tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License
27.17k stars 2.49k forks source link

Polling Timers Takes a Very Long Time #5987

Open eric-stokes-architect opened 1 year ago

eric-stokes-architect commented 1 year ago

Version cargo tree | grep tokio └── tokio v1.32.0 (/home/eric/proj/tokio/tokio) └── tokio-macros v2.1.0 (proc-macro) (/home/eric/proj/tokio/tokio-macros)

Platform Linux kagura.ryu-oh.org 6.4.6-76060406-generic #202307241739~1692717645~22.04~5597803 SMP PREEMPT_DYNAMIC Tue A x86_64 x86_64 x86_64 GNU/Linux

Description

Just calling poll in a timer (interval, sleep, etc) takes between 4 and 40 microseconds.

Minimal example:

test.rs

use futures::{channel::mpsc, prelude::*};
use hdrhistogram::Histogram;
use std::{
    future,
    time::{Duration, Instant},
};
use tokio::{runtime, task, time};

async fn send(tx: mpsc::UnboundedSender<Instant>) {
    let mut ticker = time::interval(Duration::from_millis(100));
    loop {
        let _ = ticker.tick().await;
        let _ = tx.unbounded_send(Instant::now());
    }
}

async fn recv(mut rx: mpsc::UnboundedReceiver<Instant>) {
    let mut data = Histogram::<u64>::new_with_bounds(1, 10_000_000, 5).unwrap();
    let mut last = Instant::now();
    loop {
        match rx.next().await {
            None => eprintln!("recv error"),
            Some(ts) => {
                let now = Instant::now();
                let _ = data.record((now - ts).as_nanos() as u64);
                if now - last > Duration::from_secs(1) {
                    last = now;
                    println!(
                        "10th: {}, 25th: {}, 50th: {}, 90th: {}, 99th: {}",
                        data.value_at_quantile(0.1),
                        data.value_at_quantile(0.25),
                        data.value_at_quantile(0.5),
                        data.value_at_quantile(0.9),
                        data.value_at_quantile(0.99)
                    )
                }
            }
        }
    }
}

fn main() {
    time();
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let (tx, rx) = mpsc::unbounded();
        task::spawn(send(tx));
        task::spawn(recv(rx));
        future::pending::<()>().await;
    });
}

Cargo.toml

[package]
name = "test-clock"
version = "0.1.0"
edition = "2021"

[profile.release]
codegen-units = 1
opt-level = 3
lto = true
debug = false

[dependencies]
futures = "0.3.28"
hdrhistogram = "7.5.2"
tokio = { version = "1", features = ["full"] }
#tokio = { version = "1", path = "../tokio/tokio", features = ["full"] }
eric@kagura:~/proj/test-clock$ taskset -c 0 target/release/test-clock
10th: 2964, 25th: 3627, 50th: 4593, 90th: 6651, 99th: 30338
10th: 3627, 25th: 4081, 50th: 4624, 90th: 6651, 99th: 30338
10th: 3627, 25th: 4081, 50th: 4805, 90th: 7493, 99th: 30338
10th: 3258, 25th: 4268, 50th: 5128, 90th: 7493, 99th: 30338
10th: 3258, 25th: 4220, 50th: 5241, 90th: 7493, 99th: 30338
10th: 3460, 25th: 4127, 50th: 5023, 90th: 7463, 99th: 30338
10th: 3258, 25th: 4031, 50th: 4962, 90th: 7109, 99th: 30338
10th: 3323, 25th: 4031, 50th: 5023, 90th: 7030, 99th: 30338
10th: 3323, 25th: 4031, 50th: 5005, 90th: 7030, 99th: 30338

establishing a baseline of task to task communication via a channel

use futures::{channel::mpsc, prelude::*};
use hdrhistogram::Histogram;
use std::{
    future,
    time::{Duration, Instant},
};
use tokio::{runtime, task, time};

async fn send(tx: mpsc::UnboundedSender<Instant>) {
    //let mut ticker = time::interval(Duration::from_millis(100));
    loop {
        //let _ = ticker.tick().await;
        let _ = tx.unbounded_send(Instant::now());
    task::yield_now().await;
    }
}

async fn recv(mut rx: mpsc::UnboundedReceiver<Instant>) {
    let mut data = Histogram::<u64>::new_with_bounds(1, 10_000_000, 5).unwrap();
    let mut last = Instant::now();
    loop {
        match rx.next().await {
            None => eprintln!("recv error"),
            Some(ts) => {
                let now = Instant::now();
                let _ = data.record((now - ts).as_nanos() as u64);
                if now - last > Duration::from_secs(1) {
                    last = now;
                    println!(
                        "10th: {}, 25th: {}, 50th: {}, 90th: {}, 99th: {}",
                        data.value_at_quantile(0.1),
                        data.value_at_quantile(0.25),
                        data.value_at_quantile(0.5),
                        data.value_at_quantile(0.9),
                        data.value_at_quantile(0.99)
                    )
                }
            }
        }
    }
}

fn main() {
    time();
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let (tx, rx) = mpsc::unbounded();
        task::spawn(send(tx));
        task::spawn(recv(rx));
        future::pending::<()>().await;
    });
}
eric@kagura:~/proj/test-clock$ taskset -c 0 target/release/test-clock
10th: 87, 25th: 88, 50th: 89, 90th: 92, 99th: 97
10th: 87, 25th: 88, 50th: 90, 90th: 92, 99th: 97
10th: 88, 25th: 89, 50th: 90, 90th: 92, 99th: 97
10th: 88, 25th: 89, 50th: 90, 90th: 92, 99th: 97
10th: 88, 25th: 89, 50th: 90, 90th: 92, 99th: 97
10th: 88, 25th: 89, 50th: 90, 90th: 92, 99th: 97

Clock events, tasks and channels are all critical components of modular async applications. Tasks and channels are fast enough to write low latency applications, but at the moment clock events are not. This is very unfortunate, as clock events are very hard to avoid, even if purging them from one's own code were possible, numerous widely used libraries depend on them. Moreover it is very common to find clock events in a select loop where various other IO operations are happening often, and this adds a massive tax to all of those operations, since very time an event in the select is ready this will cause the clock event will be polled again.

carllerche commented 1 year ago

Tokio's timer APIs are intended to be coase-grained. They are very efficient to great / cancel, but are not precise. If you need a high resolution timer, use an OS-specific API and integrate it with TOkio via AsyncFD. There probably are already crates out there.

eric-stokes-architect commented 1 year ago

Sorry, I'm not saying that the timer wakes up at the wrong time, or should be more precise, I'm well aware we're working with course grained timers here. I'm saying that poll ON the timer takes 40 microseconds. Just calling poll, not calling it until it returns Ready, just calling it once, can take from 4 - 40 microseconds.

I know this because I put timing statements in the interval timer's poll function and then ran the above program using my local copy of tokio, I'm sorry I didn't mention that in the bug report, I thought I would let the example program stand for itself.

carllerche commented 1 year ago

Can you include a snippet that times includes the poll time by instrumenting the poll start / stop? You can either implement a future manually or use github.com/tokio-rs/tokio-metrics/

eric-stokes-architect commented 1 year ago

Yes, of course. Earlier during a long conversation on discord with Alice, we instrumented various time functions in tokio, here is output I just generated by recompiling against the instrumented tokio.

start Interval::tick
start Interval::poll_tick
start Interval::poll_tick::poll_delay
start Sleep::poll
start TimerEntry::poll_elapsed
check shutdown 133ns
check registered 134ns
poll inner 332ns
TimerEntry::poll_elapsed 9.481µs
finish Sleep::poll 15.024µs
finish Interval::poll_tick::poll_delay 18.625µs
interval reregister 294ns
finish Interval::poll_tick: 24.853µs
finish Interval::tick: 28.9µs

Here is the instrumented Interval::tick method, as an example of how the instrumentation was done.

    pub async fn tick(&mut self) -> Instant {
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = self.resource_span.clone();
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let instant = trace::async_op(
            || poll_fn(|cx| self.poll_tick(cx)),
            resource_span,
            "Interval::tick",
            "poll_tick",
            false,
        );
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let instant = poll_fn(|cx| {
            println!("start Interval::tick");
            let st = Instant::now();
            let r = self.poll_tick(cx);
            println!("finish Interval::tick: {:?}", st.elapsed());
            r
        });

        instant.await
    }
eric-stokes-architect commented 1 year ago

And here is an example where we took the early return from poll_tick because the timer isn't ready

start Interval::tick
start Interval::poll_tick
start Interval::poll_tick::poll_delay
start Sleep::poll
start TimerEntry::poll_elapsed
check shutdown 42ns
check registered 2.291µs
poll inner 251ns
TimerEntry::poll_elapsed 7.225µs
finish Sleep::poll 10.075µs
finish Interval::poll_tick::poll_delay 12.922µs
finish Interval::tick: 17.07µs