tikv / yatp

Yet another thread pool in Rust for both callbacks and futures.
Apache License 2.0

resource leak in nested task #82

Closed tabokie closed 1 year ago

tabokie commented 1 year ago

See the reproducible case below (written in the context of TiKV's future_pool.rs). An entire ThreadPool is leaked, and I presume it will keep running in detached mode.

    #[test]
    fn test_shutdown_yatp_pool() {
        let outer = Builder::new(DefaultTicker {})
            .thread_count(1, 1, 1)
            .build_future_pool();
        let inner = Builder::new(DefaultTicker {})
            .thread_count(1, 1, 1)
            .build_future_pool();

        let inner_clone = inner.clone();
        outer.spawn(async move {
            let (tx, rx) = oneshot::channel();
            inner_clone.spawn(async move {
                println!("before bed");
                std::thread::sleep(std::time::Duration::from_secs(100));
                println!("slept over");
                tx.send(()).unwrap();
            }).unwrap();
            rx.await.unwrap();
        }).unwrap();

        std::thread::sleep(std::time::Duration::from_secs(1));

        drop(outer);

        // This assertion fails: the strong count is 2.
        assert_eq!(Arc::strong_count(&inner.inner), 1);
    }

tabokie commented 1 year ago

cc @BusyJay @sticnarf for help. I'm not familiar enough with async Rust to understand this.

sticnarf commented 1 year ago

I would say this is expected.

Let's call this part fut:

        async move {
            let (tx, rx) = oneshot::channel();
            inner_clone.spawn(async move {
                println!("before bed");
                std::thread::sleep(std::time::Duration::from_secs(100));
                println!("slept over");
                tx.send(()).unwrap();
            }).unwrap();
            rx.await.unwrap();
        }

fut captures inner_clone by move, so inner_clone won't be dropped until fut is dropped.

When rx is polled inside fut, the Waker carries what is effectively an Arc of fut (not literally, but equivalent). The Waker is stored in the channel's internal state, so the future can be woken up when tx sends something.

So it's the oneshot channel that holds a reference to fut, and fut holds inner_clone. That is why the strong count of the inner pool is still 2 after outer is dropped.
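
The ownership chain can be reproduced without yatp. Below is a minimal std-only sketch, not yatp's actual implementation: `InnerPool`, `Slot`, `Recv`, `Task`, and `leak_demo` are all hypothetical stand-ins for the inner pool handle, the channel's shared state, `rx`, the spawned task, and the test, respectively. It assumes the waker owns the task through an `Arc`, which is the "equivalent" model described above.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the inner pool handle (not a yatp type).
struct InnerPool;

/// Hypothetical stand-in for the oneshot channel's shared state,
/// which stores the receiver's waker.
type Slot = Arc<Mutex<Option<Waker>>>;

/// Behaves like `rx` while `tx` has not sent yet: it parks the waker
/// in the shared slot and stays pending.
struct Recv(Slot);

impl Future for Recv {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        *self.0.lock().unwrap() = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// A spawned task. Every waker made from it shares ownership of the future.
struct Task {
    fut: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

impl Wake for Task {
    // A real pool would reschedule the task here; this sketch does nothing.
    fn wake(self: Arc<Self>) {}
}

/// Returns the strong count of the inner handle (while the waker is still
/// stored in the slot, after the stored waker has been dropped).
fn leak_demo() -> (usize, usize) {
    let inner = Arc::new(InnerPool);
    let inner_clone = inner.clone();
    let slot: Slot = Arc::new(Mutex::new(None));
    let rx = Recv(slot.clone());

    // `fut` captures `inner_clone` by move and then waits on `rx`.
    let fut = async move {
        let _pool = inner_clone;
        rx.await;
    };
    let task = Arc::new(Task { fut: Mutex::new(Box::pin(fut)) });

    // Poll once, as a worker thread would. `Recv` clones the waker,
    // and with it an `Arc<Task>`, into the slot.
    {
        let waker = Waker::from(task.clone());
        let mut cx = Context::from_waker(&waker);
        let _ = task.fut.lock().unwrap().as_mut().poll(&mut cx);
    }

    // Analogous to `drop(outer)`: the executor lets go of the task.
    drop(task);
    let while_waker_stored = Arc::strong_count(&inner);

    // Analogous to the channel being torn down: the stored waker goes away,
    // the task and its future are dropped, and `inner_clone` is released.
    let stored = slot.lock().unwrap().take();
    drop(stored);
    let after_waker_dropped = Arc::strong_count(&inner);

    (while_waker_stored, after_waker_dropped)
}
```

Calling `leak_demo()` yields `(2, 1)`: the count stays at 2 for as long as the slot keeps the waker, and falls back to 1 once the waker is released. In the original test the slot corresponds to the channel state kept alive by `tx`, which lives inside the task sleeping on the inner pool.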

tabokie commented 1 year ago

Interesting. Thanks for explaining. I'll just work around this by manually closing the inner pool before shutdown.