moka-rs / moka

A high performance concurrent caching library for Rust
Apache License 2.0
1.63k stars 73 forks source link

Pending `run_pending_tasks` may cause busy loop in `schedule_write_op` #412

Closed sticnarf closed 7 months ago

sticnarf commented 7 months ago

It's not so easy to create a minimal reproducible example. I will try to describe my code and my findings.

As a workaround for #408, I run run_pending_tasks manually when the cache size exceeds the capacity by a wide margin. However, occasionally I find my tokio worker threads are stuck in the busy loop of schedule_write_op.

I examine what happens with gdb:

I further add some logs to moka and find that inside my manual call to run_pending_tasks, it tries to acquire the current_task mutex and does not actually get it acquired. It probably shows that the starved mutex acquirer is the manual run_pending_tasks task.

There are various reasons why the run_pending_tasks task is not waked up to acquire the lock:

sticnarf commented 7 months ago

Because run_pending_tasks is paused before acquiring the current_task task, it doesn't enter do_run_pending_tasks and maintenance_task_lock is not acquired.

So, this line doesn't yield to break the busy loop.

https://github.com/moka-rs/moka/blob/ae33f8db63e3cbb697a307d0068299e7616c7d72/src/future/base_cache.rs#L681

It seems to me that current_task can play a similar role to the maintenance_task_lock. I'm thinking whether it's a good idea to remove maintenance_task_lock and change current_task to a RwLock. Then, we call let _ = current_task.read().await after spinning to yield.

tatsuya6502 commented 7 months ago

Hi. Thank you for reporting the issue. So, if I understand correctly, (1) unlocking maintenance_task_lock before (2) unlocking current_task causes all concurrent insert calls to go into busy-loop immediately after (1), and it will prevent manual run_pending_tasks to be scheduled by Tokio?

I'm thinking whether it's a good idea to remove maintenance_task_lock and change current_task to a RwLock. Then, we call let _ = current_task.read().await after spinning to yield.

Can you please try it by modifying moka's code locally?

When I implemented the loop in schedule_write_op, what I really wanted was tokio::task::yield_now. However, moka does not depend on any async runtime (Tokio, async_std, etc.), so I had to do some trick with async_lock::RwLock (maintenance_task_lock) instead.

tatsuya6502 commented 7 months ago

Oh, I realized that you added yield_now on your fork. https://github.com/sticnarf/moka/commit/8352623aed55ca29118beb46a6fc0c37188849df Yeah, that is the one I wanted (but is impossible).

sticnarf commented 7 months ago

Yes, I have tried inserting a yield_now before spinning. After that, I don't see the busy loop to occur again.

tatsuya6502 commented 7 months ago

That's great!

I still do not want to add async runtime dependency to moka. So, I think I will make the busy loop in schedule_write_op to sometimes call try_run_pending_task.

sticnarf commented 7 months ago

I still do not want to add async runtime dependency to moka. So, I think I will make the busy loop in schedule_write_op to sometimes call try_run_pending_task.

Do you mean sometimes call run_pending_tasks instead of the try_ version?

schedule_write_op already calls try_run_pending_task through apply_reads_writes_if_needed. But it never succeeds because try_lock fails due to there is a starved op waiting.

tatsuya6502 commented 7 months ago

schedule_write_op already calls try_run_pending_task through apply_reads_writes_if_needed. But it never succeeds because try_lock fails due to there is a starved op waiting.

I see. So, without calling yield_now in the busy loop, I think we should never call lock on the current_task. If we only call try_lock, it will be okay, because try_lock will never create a starved acquirer.

I will try to reproduce the problem and try some ideas to fix. I run mokabench before every release. Although the write op channel often gets full during a mokabench run, I have never had the busy loop problem. I think this is because mokabench never calls run_pending_tasks using lock, but it only lets insert method to call try_pending_tasks using try_lock.

I will modify mokabench to do something like the followings to see it will reproduce the problem:

I also combine the run_pending_tasks task together with other insert tasks in a join_all future.

tatsuya6502 commented 7 months ago

@sticnarf

I created a potential fix for the busy loop issue. Can you please try it if you have time?

[dependencies]
moka = { git = "https://github.com/moka-rs/moka", branch = "avoid-async-scheduler-busy-loop", features = ["future"] }

If you do not have time, that is okay. I have not been able to reproduce the issue using mokabench, but I will continue trying.

tatsuya6502 commented 7 months ago

I have not been able to reproduce the issue using mokabench, but I will continue trying.

I found the following program can reproduce the problem. I ran it on a MacBook Air with Apple M2 chip.

Cargo.toml

[dependencies]
moka = { version = "0.12.5", features = ["future"] }
# moka = { git = "https://github.com/moka-rs/moka", branch = "avoid-async-scheduler-busy-loop", features = ["future"] }

futures-util = "0.3.30"
rand = "0.8.5"
rand_pcg = "0.3.1"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "macros"] }

src/main.rs

use moka::future::Cache;
use rand::{distributions::Uniform, prelude::*};
use rand_pcg::Pcg64;

const NUM_TASKS: usize = 48;
const NUM_KEYS: usize = 500_000;
const NUM_OPERATIONS_PER_TASK: usize = 1_000_000;
const NUM_INSERTS_PER_STEP: usize = 16;
const MAX_CAPACITY: u64 = 100_000;

// Use a small number of the worker threads to make it easy to reproduce the problem.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    let cache = Cache::builder().max_capacity(MAX_CAPACITY).build();

    // Spawn `NUM_TASKS` tasks, each performing `NUM_OPERATIONS_PER_TASK` operations.
    let tasks = (0..NUM_TASKS).map(|task_id| {
        let cache = cache.clone();

        tokio::spawn(async move {
            let mut rng = Pcg64::seed_from_u64(task_id as u64);
            let distr = Uniform::new_inclusive(0, NUM_KEYS - 1);

            // Each task will perform `NUM_OPERATIONS_PER_TASK` operations.
            for op_number in 0..NUM_OPERATIONS_PER_TASK {
                // Each task will insert `NUM_INSERTS_PER_STEP` entries per operation in
                // parallel, and call `run_pending_tasks()` if the random number `n` is 0.
                let ns = (&mut rng)
                    .sample_iter(distr)
                    .take(NUM_INSERTS_PER_STEP)
                    .collect::<Vec<_>>();

                let ops = ns
                    .into_iter()
                    .map(|n| {
                        let key = format!("key-{}", n);
                        let value = format!("value-{}", n);
                        let cache = cache.clone();

                        async move {
                            cache.insert(key, value).await;
                            if n == 0 {
                                cache.run_pending_tasks().await;
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                futures_util::future::join_all(ops).await;

                if (op_number + 1) % 1000 == 0 {
                    println!("Task {:2} - {:7}", task_id, op_number + 1);
                    // Give other tasks some chances to run.
                    tokio::task::yield_now().await;
                }
            }

            println!("Task {:2} - {:7}", task_id, NUM_OPERATIONS_PER_TASK);
        })
    });

    // Run all tasks and wait for them to complete.
    futures_util::future::join_all(tasks).await;

    cache.run_pending_tasks().await;
    println!("Done! Cache size: {}", cache.entry_count());
}
tatsuya6502 commented 7 months ago

Just for sure, I ran the above program on a Linux x86_64 PC with Intel Core i7-12700F. I got the same result:

tatsuya6502 commented 7 months ago

I merged #415 into the main branch and am going to delete the topic branch. Please use the main branch from now.

[dependencies]
moka = { git = "https://github.com/moka-rs/moka", branch = "main", features = ["future"] }

I will try to release v0.12.6 in coming weekend.

tatsuya6502 commented 7 months ago

I published moka@v0.12.6 to crates.io.

Thanks again for reporting the bug. Your analysis really helped to reproduce and fix it.

sticnarf commented 7 months ago

Sorry, I didn't have time to try the new fix until today. I can confirm the problem disappears in the test of my application with moka 0.12.6.

Thank you for the quick fix!

tatsuya6502 commented 7 months ago

Thank you for trying out moka 0.12.6 while you are busy!