tikv / yatp

Yet another thread pool in rust for both callbacks or futures.
Apache License 2.0

Support dynamically scaling workers of the pool (#60) #61

ethercflow closed 2 years ago

ethercflow commented 2 years ago

The feature request and the technical proposal are described in #60.

Tests that have been run, the test env is

Testcases:

#!/bin/bash -e

while true
do
        cargo bench --all -- --test
        cargo bench --all --all-features -- --test
        cargo test --all -- --nocapture
        cargo test --all --all-features -- --nocapture
done

test queue::multilevel::tests::test_get_elapsed_deadlock ... ignored
test task::future::tests::test_no_preemptive_task ... ignored
test task::future::tests::test_repoll_limit ... ignored
test task::future::tests::test_reschedule ... ignored

The test logic is simple:

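// Imports assume the public module layout of the `yatp` crate and the `rand` crate.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use rand::Rng;
use yatp::task::callback::Handle;
use yatp::Builder;

fn scale_workers() {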
    let pool = Builder::new("SP")
        .max_thread_count(16)
        .core_thread_count(4)
        .build_callback_pool();
    let handler = pool.remote().clone();
    let builder = thread::Builder::new().name("wl".to_string());
    builder
        .spawn(move || {
            loop {
                let (tx, rx) = mpsc::channel();
                // A bunch of tasks should be executed correctly.
                let cases: Vec<_> = (10..100000000).collect();
                for id in &cases {
                    let t = tx.clone();
                    let id = *id;
                    handler.spawn(move |_: &mut Handle<'_>| t.send(id).unwrap());
                }
                let mut ans = vec![];
                for _ in 10..100000000 {
                    let r = rx.recv_timeout(Duration::from_secs(1)).unwrap();
                    ans.push(r);
                }
                ans.sort();
                assert_eq!(cases, ans);
                println!("finish one loop");
            }
        })
        .unwrap();
    loop {
        let mut rng = rand::thread_rng();
        let new_thread_count = rng.gen_range(1..16);
        println!("scale workers to {}", new_thread_count);
        pool.remote().scale_workers(new_thread_count);
        thread::sleep(Duration::from_secs(10));
    }
}
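
For readers who do not have the patch at hand, the behaviour this test exercises can be imitated with the standard library alone. The following is a minimal sketch, not yatp's implementation: `ScalablePool`, `State`, `worker`, and `running_workers` are hypothetical names, and only the method name `scale_workers` is borrowed from this PR. It assumes one simple design in which all `max` threads are spawned up front and any thread beyond the current target parks on a condition variable.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct State {
    queue: VecDeque<Job>,
    target: usize,  // how many workers should be taking jobs
    running: usize, // how many workers currently hold a "running" slot
    shutdown: bool,
}

type Shared = Arc<(Mutex<State>, Condvar)>;

/// Hypothetical std-only pool; the names are illustrative, not yatp's API.
pub struct ScalablePool {
    shared: Shared,
    max: usize,
    handles: Vec<thread::JoinHandle<()>>,
}

impl ScalablePool {
    /// Spawns `max` threads up front; only `core` of them take jobs initially.
    pub fn new(core: usize, max: usize) -> Self {
        let max = max.max(1);
        let state = State {
            queue: VecDeque::new(),
            target: core.clamp(1, max),
            running: 0,
            shutdown: false,
        };
        let shared: Shared = Arc::new((Mutex::new(state), Condvar::new()));
        let handles = (0..max)
            .map(|_| {
                let shared = Arc::clone(&shared);
                thread::spawn(move || worker(shared))
            })
            .collect();
        ScalablePool { shared, max, handles }
    }

    pub fn spawn(&self, job: impl FnOnce() + Send + 'static) {
        let (lock, cv) = &*self.shared;
        lock.lock().unwrap().queue.push_back(Box::new(job));
        cv.notify_all();
    }

    /// Changes the number of workers allowed to take jobs, clamped to 1..=max.
    pub fn scale_workers(&self, n: usize) {
        let (lock, cv) = &*self.shared;
        lock.lock().unwrap().target = n.clamp(1, self.max);
        cv.notify_all();
    }

    pub fn running_workers(&self) -> usize {
        self.shared.0.lock().unwrap().running
    }

    /// Lets running workers drain the queue, then joins every thread.
    pub fn shutdown(self) {
        let (lock, cv) = &*self.shared;
        lock.lock().unwrap().shutdown = true;
        cv.notify_all();
        for handle in self.handles {
            handle.join().unwrap();
        }
    }
}

fn worker(shared: Shared) {
    let (lock, cv) = &*shared;
    let mut st = lock.lock().unwrap();
    let mut counted = false; // does this thread hold a "running" slot?
    loop {
        if counted && st.running > st.target {
            // Scaled down: give the slot back and stop taking jobs.
            st.running -= 1;
            counted = false;
        } else if !counted && st.running < st.target {
            // Scaled up (or just started): claim a free slot.
            st.running += 1;
            counted = true;
        }
        if counted {
            if let Some(job) = st.queue.pop_front() {
                drop(st); // never run a job while holding the lock
                job();
                st = lock.lock().unwrap();
                continue;
            }
        }
        if st.shutdown {
            if counted {
                st.running -= 1;
            }
            return;
        }
        // Parked workers sleep here until a job arrives or the target changes.
        st = cv.wait(st).unwrap();
    }
}
```

In this sketch a scale-down takes effect only when a worker next reaches the top of its loop, so a job that is already running is never interrupted; `running_workers` therefore converges to the new target shortly after `scale_workers` returns rather than immediately.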

While the test runs, pidstat -G yatp -t 1 shows that the number of running threads matches the most recent scale request:

➜  test-yatp git:(main) ./target/release/test-yatp
scale workers to 8
scale workers to 14
scale workers to 9

00:57:19      UID      TGID       TID    %usr %system  %guest   %wait    %CPU   CPU  Command
00:57:20        0   3950591         -  785.00   77.00    0.00    0.00  862.00    17  test-yatp
00:57:20        0         -   3950592   89.00    6.00    0.00    0.00   95.00     6  |__SP-0
00:57:20        0         -   3950593   87.00    8.00    0.00    0.00   95.00     2  |__SP-1
00:57:20        0         -   3950594   90.00    6.00    0.00    0.00   96.00     7  |__SP-2
00:57:20        0         -   3950595   87.00    9.00    0.00    0.00   96.00    10  |__SP-3
00:57:20        0         -   3950604   88.00    8.00    0.00    0.00   96.00    39  |__SP-12
00:57:20        0         -   3950605   88.00    8.00    0.00    0.00   96.00    31  |__SP-13
00:57:20        0         -   3950606   87.00    8.00    0.00    0.00   95.00    33  |__SP-14
00:57:20        0         -   3950607   87.00    8.00    0.00    0.00   95.00    21  |__SP-15
00:57:20        0         -   3950608   83.00   16.00    0.00    0.00   99.00    25  |__wl

00:57:20      UID      TGID       TID    %usr %system  %guest   %wait    %CPU   CPU  Command
00:57:21        0   3950591         -  797.00  149.00    0.00    0.00  946.00    17  test-yatp
00:57:21        0         -   3950592   75.00   13.00    0.00    0.00   88.00    21  |__SP-0
00:57:21        0         -   3950593   76.00   10.00    0.00    0.00   86.00     7  |__SP-1
00:57:21        0         -   3950594   75.00   11.00    0.00    1.00   86.00    31  |__SP-2
00:57:21        0         -   3950595   75.00   11.00    0.00    0.00   86.00    10  |__SP-3
00:57:21        0         -   3950596   21.00    4.00    0.00    0.00   25.00    29  |__SP-4
00:57:21        0         -   3950597   19.00    5.00    0.00    0.00   24.00    20  |__SP-5
00:57:21        0         -   3950598   20.00    5.00    0.00    0.00   25.00    33  |__SP-6
00:57:21        0         -   3950599   19.00    5.00    0.00    0.00   24.00    13  |__SP-7
00:57:21        0         -   3950600   20.00    4.00    0.00    0.00   24.00    35  |__SP-8
00:57:21        0         -   3950602   20.00    4.00    0.00    0.00   24.00     9  |__SP-10
00:57:21        0         -   3950604   77.00   10.00    0.00    0.00   87.00    39  |__SP-12
00:57:21        0         -   3950605   75.00   11.00    0.00    0.00   86.00     3  |__SP-13
00:57:21        0         -   3950606   75.00   12.00    0.00    0.00   87.00     1  |__SP-14
00:57:21        0         -   3950607   75.00   11.00    0.00    0.00   86.00    23  |__SP-15
00:57:21        0         -   3950608   69.00   32.00    0.00    0.00  101.00    25  |__wl

00:57:34      UID      TGID       TID    %usr %system  %guest   %wait    %CPU   CPU  Command
00:57:35        0   3950591         -  829.00  100.00    0.00    0.00  929.00    21  test-yatp
00:57:35        0         -   3950592   84.00    8.00    0.00    0.00   92.00    30  |__SP-0
00:57:35        0         -   3950594   83.00   10.00    0.00    0.00   93.00    15  |__SP-2
00:57:35        0         -   3950595   85.00    8.00    0.00    0.00   93.00    17  |__SP-3
00:57:35        0         -   3950597   83.00    9.00    0.00    1.00   92.00     0  |__SP-5
00:57:35        0         -   3950599   85.00    7.00    0.00    0.00   92.00    29  |__SP-7
00:57:35        0         -   3950600   82.00   10.00    0.00    0.00   92.00    34  |__SP-8
00:57:35        0         -   3950602   83.00    9.00    0.00    0.00   92.00    25  |__SP-10
00:57:35        0         -   3950606   85.00    8.00    0.00    0.00   93.00    23  |__SP-14
00:57:35        0         -   3950607   82.00   10.00    0.00    1.00   92.00     1  |__SP-15
00:57:35        0         -   3950608   78.00   19.00    0.00    0.00   97.00    31  |__wl
^C

Does this make sense?

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

ethercflow commented 2 years ago

@BusyJay @sticnarf PTAL, thanks!

ethercflow commented 2 years ago

I found that the CI failure was caused by thread 'pool::tests::test_basic' panicked at 'called `Result::unwrap()` on an `Err` value: Timeout', src/pool/tests.rs:38:67, but I cannot reproduce it locally. Is this just because CI is too slow?

sticnarf commented 2 years ago

> I found that the CI failure was caused by thread 'pool::tests::test_basic' panicked at 'called `Result::unwrap()` on an `Err` value: Timeout', src/pool/tests.rs:38:67, but I cannot reproduce it locally. Is this just because CI is too slow?

I don't know. I think the 1-second timeout is large enough. I cannot reproduce the failure locally, either.
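
For reference, the panic text is what `unwrap()` prints when `std::sync::mpsc::Receiver::recv_timeout` returns `RecvTimeoutError::Timeout`. A minimal sketch of telling the two error cases apart instead of unwrapping (the helper name `wait_for_result` and its messages are hypothetical and not part of the yatp tests):

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Waits for one result and reports why it failed instead of panicking.
/// Hypothetical helper for illustration; the yatp test calls `unwrap()` directly.
pub fn wait_for_result(rx: &mpsc::Receiver<u64>, timeout: Duration) -> Result<u64, String> {
    match rx.recv_timeout(timeout) {
        Ok(value) => Ok(value),
        // No task sent a value in time, e.g. because no worker ran it.
        Err(RecvTimeoutError::Timeout) => Err(format!("no result within {:?}", timeout)),
        // Every sender was dropped, so no value can ever arrive.
        Err(RecvTimeoutError::Disconnected) => Err("all senders dropped".to_string()),
    }
}
```

A `Timeout` here means only that nothing arrived before the deadline; it does not by itself say whether the machine was slow or the task was never scheduled.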

ethercflow commented 2 years ago

> The idea generally looks good. Is there any benchmark result for this change? It looks like it adds a few checks to paths that are frequently called.

Not yet, will follow up.

ethercflow commented 2 years ago

> > The idea generally looks good. Is there any benchmark result for this change? It looks like it adds a few checks to paths that are frequently called.
>
> Not yet, will follow up.

The benchmark comparison is as follows, where base is the current master branch and scale is my modification:

[image: benchmark comparison, base vs. scale]

ethercflow commented 2 years ago

> > I found that the CI failure was caused by thread 'pool::tests::test_basic' panicked at 'called `Result::unwrap()` on an `Err` value: Timeout', src/pool/tests.rs:38:67, but I cannot reproduce it locally. Is this just because CI is too slow?
>
> I don't know. I think the 1-second timeout is large enough. I cannot reproduce the failure locally, either.

I found the reason. The default value of core_thread_count is wrong: when it is not set explicitly, it should equal max_thread_count, not cpu_nr.

Running taskset -a -c 1 cargo bench --all -- --test reproduces the failure.
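
The intended defaulting rule can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: `PoolConfig` and `resolve` are hypothetical names, and `cpu_nr` is passed in as a parameter so the example stays deterministic.

```rust
/// Hypothetical, simplified stand-in for a pool builder; not yatp's `Builder`.
#[derive(Debug, Default)]
pub struct PoolConfig {
    max_thread_count: Option<usize>,
    core_thread_count: Option<usize>,
}

impl PoolConfig {
    pub fn max_thread_count(mut self, n: usize) -> Self {
        self.max_thread_count = Some(n);
        self
    }

    pub fn core_thread_count(mut self, n: usize) -> Self {
        self.core_thread_count = Some(n);
        self
    }

    /// Returns `(core, max)` after applying defaults.
    pub fn resolve(&self, cpu_nr: usize) -> (usize, usize) {
        // An unset maximum falls back to the CPU count (assumed here), at least 1.
        let max = self.max_thread_count.unwrap_or(cpu_nr).max(1);
        // An unset core count falls back to `max`, not to `cpu_nr`;
        // an explicit value is clamped into 1..=max.
        let core = self.core_thread_count.unwrap_or(max).clamp(1, max);
        (core, max)
    }
}
```

With `max_thread_count(16)` and no explicit core count, `resolve(1)` yields `(16, 16)`, whereas defaulting the core count to `cpu_nr` would yield a single core thread. That difference only shows up when the visible CPU count is smaller than the configured maximum, which is presumably why pinning the run to one CPU exposes it.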