robinfriedli / rusty_pool

Self growing / shrinking ThreadPool implementation based on crossbeam's multi-producer multi-consumer channels that enables awaiting the result of a task and offers async support.
Apache License 2.0
39 stars 6 forks source link

Failing with "there is no reactor running" #10

Closed milesj closed 1 year ago

milesj commented 1 year ago

I'm trying to integrate this crate instead of my custom build worker pool, but I receive a panic of "there is no reactor running, must be called from the context of a Tokio 1.x runtime". I'm 100% using Tokio, since my entire app is built around it and async.

I'm not really sure what to change here. I've tried spawn, spawn_await, and complete, and they all panic.

Here's the commit where I switched to the pool:

https://github.com/moonrepo/moon/pull/518/commits/31d0a6583bd3e54863a58dfaa0789ef37dfc3297#diff-fa55688d36973c7de638f454b143b99e29a7748d86c753d291f8a2960f75e148R81

robinfriedli commented 1 year ago

This doesn't sound like a panic thrown by this crate directly, can you provide a full backtrace using RUST_BACKTRACE=1?

One thing that sounds like it might be relevant here is that this pool doesn't spawn any threads until you submit a task to it. Depending on where the panic comes from it may be useful to use start_core_threads in case something expects threads to be running. Of course functions like spawn_await spin up threads if necessary themselves.

Another possibility is that the pool gets dropped and its channel broken before the submitted work is finished. But briefly looking at your code it seems you join all work before dropping the pool so that's probably not it. In fact I don't even think that's possible because each AsyncTask keeps a clone of the pool, so if there are live AsyncTasks the pool's channel should not get closed.

These are just shots in the dark, can't say much more without knowing where the panic comes from. To me it sounds like the submitted tasks are expected to run within the tokio runtime, or a tokio version mismatch somewhere else (this crate doesn't depend on tokio, it only uses crossbeam-channel and the futures lib).

milesj commented 1 year ago

Ok, I created a new PR with only rust changes: https://github.com/moonrepo/moon/pull/527 I also tried start_core_threads, same issue.

Here's the full backtrace.

thread 'rusty_pool_1_thread_4' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/connect/dns.rs:121:24
stack backtrace:
   0: rust_begin_unwind
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/panicking.rs:65:14
   2: core::panicking::panic_display
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/panicking.rs:139:5
   3: tokio::runtime::context::current
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.1/src/runtime/context.rs:22:19
   4: tokio::runtime::blocking::pool::spawn_blocking
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.1/src/runtime/blocking/pool.rs:134:14
   5: tokio::task::blocking::spawn_blocking
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.1/src/task/blocking.rs:208:9
   6: <hyper::client::connect::dns::GaiResolver as tower_service::Service<hyper::client::connect::dns::Name>>::call
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/connect/dns.rs:121:24
   7: <S as hyper::client::connect::dns::sealed::Resolve>::resolve
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/connect/dns.rs:350:13
   8: hyper::client::connect::dns::resolve::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/connect/dns.rs:360:5
   9: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  10: hyper::client::connect::http::HttpConnector<R>::call_async::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/connect/http.rs:338:17
  11: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  12: <hyper::client::connect::http::HttpConnector<R> as tower_service::Service<http::uri::Uri>>::call::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/connect/http.rs:271:61
  13: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  14: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  15: <hyper::client::connect::http::HttpConnecting<R> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/connect/http.rs:406:9
  16: <futures_util::future::either::Either<A,B> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/either.rs:89:32
  17: <futures_util::future::either::Either<A,B> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/either.rs:89:32
  18: <hyper_tls::client::HttpsConnector<T> as tower_service::Service<http::uri::Uri>>::call::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-tls-0.5.0/src/client.rs:136:33
  19: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  20: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  21: <hyper_tls::client::HttpsConnecting<T> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-tls-0.5.0/src/client.rs:162:9
  22: reqwest::connect::Connector::connect_with_maybe_proxy::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/reqwest-0.11.12/src/connect.rs:378:40
  23: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  24: reqwest::connect::with_timeout::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/reqwest-0.11.12/src/connect.rs:546:10
  25: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  26: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  27: <hyper::service::oneshot::Oneshot<S,Req> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/service/oneshot.rs:60:28
  28: <F as futures_core::future::TryFuture>::try_poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  29: <futures_util::future::try_future::into_future::IntoFuture<Fut> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/into_future.rs:34:9
  30: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  31: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  32: <futures_util::future::try_future::MapErr<Fut,F> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  33: <F as futures_core::future::TryFuture>::try_poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  34: <futures_util::future::try_future::into_future::IntoFuture<Fut> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/into_future.rs:34:9
  35: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  36: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  37: <futures_util::future::try_future::MapOk<Fut,F> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  38: <F as futures_core::future::TryFuture>::try_poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  39: <futures_util::future::try_future::try_flatten::TryFlatten<Fut,<Fut as futures_core::future::TryFuture>::Ok> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/try_flatten.rs:49:61
  40: <futures_util::future::try_future::TryFlatten<Fut1,Fut2> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  41: <futures_util::future::try_future::AndThen<Fut1,Fut2,F> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  42: <futures_util::future::either::Either<A,B> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/either.rs:89:32
  43: <hyper::common::lazy::Lazy<F,R> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/common/lazy.rs:69:28
  44: futures_util::future::future::FutureExt::poll_unpin
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/mod.rs:562:9
  45: <futures_util::future::select::Select<A,B> as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/select.rs:108:35
  46: hyper::client::client::Client<C,B>::connection_for::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/client.rs:358:48
  47: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  48: hyper::client::client::Client<C,B>::send_request::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/client.rs:230:61
  49: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  50: hyper::client::client::Client<C,B>::retryably_send_request::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/client.rs:200:65
  51: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  52: <hyper::client::client::ResponseFuture as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.19/src/client/client.rs:611:9
  53: <reqwest::async_impl::client::PendingRequest as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/reqwest-0.11.12/src/async_impl/client.rs:1894:29
  54: <reqwest::async_impl::client::Pending as core::future::future::Future>::poll
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/reqwest-0.11.12/src/async_impl/client.rs:1873:51
  55: reqwest::get::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/reqwest-0.11.12/src/lib.rs:263:47
  56: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  57: proto_core::resolver::load_versions_manifest::{{closure}}
             at ./crates/proto/core/src/resolver.rs:125:37
  58: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  59: proto_node::depman::resolve::<impl proto_core::resolver::Resolvable for proto_node::depman::NodeDependencyManager>::load_manifest::{{closure}}
             at ./crates/proto/node/src/depman/resolve.rs:54:9
  60: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  61: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  62: proto_node::depman::resolve::<impl proto_core::resolver::Resolvable for proto_node::depman::NodeDependencyManager>::resolve_version::{{closure}}
             at ./crates/proto/node/src/depman/resolve.rs:98:44
  63: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  64: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  65: proto_core::Tool::is_setup::{{closure}}
             at ./crates/proto/core/src/lib.rs:88:46
  66: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  67: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  68: <moon_node_tool::yarn_tool::YarnTool as moon_tool::tool::Tool>::setup::{{closure}}
             at ./crates/node/tool/src/yarn_tool.rs:96:52
  69: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  70: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  71: <moon_node_tool::node_tool::NodeTool as moon_tool::tool::Tool>::setup::{{closure}}
             at ./crates/node/tool/src/node_tool.rs:186:51
  72: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  73: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  74: moon_tool::manager::ToolManager<T>::setup::{{closure}}
             at ./crates/core/tool/src/manager.rs:60:57
  75: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  76: <moon_node_platform::platform::NodePlatform as moon_platform::platform::Platform>::setup_tool::{{closure}}
             at ./crates/node/platform/src/platform.rs:339:57
  77: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  78: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/future.rs:124:9
  79: moon_action_pipeline::actions::setup_tool::setup_tool::{{closure}}
             at ./crates/core/action-pipeline/src/actions/setup_tool.rs:38:9
  80: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  81: moon_action_pipeline::processor::process_action::{{closure}}
             at ./crates/core/action-pipeline/src/processor.rs:59:84
  82: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  83: moon_action_pipeline::pipeline::Pipeline::run::{{closure}}::{{closure}}
             at ./crates/core/action-pipeline/src/pipeline.rs:142:25
  84: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  85: rusty_pool::ThreadPool::try_spawn_await::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/rusty_pool-0.7.0/src/lib.rs:668:32
  86: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/future/mod.rs:91:19
  87: <alloc::sync::Arc<rusty_pool::AsyncTask> as rusty_pool::Task<()>>::run
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/rusty_pool-0.7.0/src/lib.rs:120:16
  88: rusty_pool::ThreadPool::try_execute::{{closure}}
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/rusty_pool-0.7.0/src/lib.rs:488:17
  89: core::ops::function::FnOnce::call_once{{vtable.shim}}
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/ops/function.rs:251:5
  90: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
             at /rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/alloc/src/boxed.rs:1987:9
  91: rusty_pool::Worker::exec_task_and_notify
             at /Users/milesj/.cargo/registry/src/github.com-1ecc6299db9ec823/rusty_pool-0.7.0/src/lib.rs:1119:9
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

 ERROR  Unknown error!

It looks like rusty is running, and a lot of my code is running as well, and then it fails on hyper trying to do something (via the reqwest crate. I'm using reqwest with default features, so I'm really not sure what to dive into next.

milesj commented 1 year ago

I tried enabling the hyper runtime feature. No luck. https://docs.rs/hyper/0.14.23/hyper/#optional-features

robinfriedli commented 1 year ago

I don't think it's possible to use async reqwest without calling it from a thread belonging to a tokio runtime.

This produces the same error you're getting:

#[tokio::main]
async fn main() {
    let pool = rusty_pool::ThreadPool::default();
    let handle: rusty_pool::JoinHandle<Result<(), ()>> = pool.spawn_await(async {
        let resp = reqwest::get("https://httpbin.org/ip")
            .await
            .map_err(|_| ())?
            .json::<HashMap<String, String>>()
            .await
            .map_err(|_| ())?;
        println!("{:#?}", resp);
        Ok(())
    });

    let _ = handle.receiver.await;
}

But setting up a runtime for the function you call inside the pool works:

#[tokio::main]
async fn main() {
    let pool = rusty_pool::ThreadPool::default();
    let handle: rusty_pool::JoinHandle<Result<(), ()>> = pool.spawn_await(async { inner() });

    let _ = handle.receiver.await;
}

#[tokio::main]
async fn inner() -> Result<(), ()> {
    let resp = reqwest::get("https://httpbin.org/ip")
        .await
        .map_err(|_| ())?
        .json::<HashMap<String, String>>()
        .await
        .map_err(|_| ())?;
    println!("{:#?}", resp);
    Ok(())
}

I'm honestly a bit surprised at this myself, I'm fairly sure I've done the former in the past myself, not sure if that's a new requirement.

The second code snippet would set up a multithreaded tokio runtime for each thread in the pool on each call, so it's not ideal. Using #[tokio::main(flavor = "current_thread")] would improve that a bit, but you'd still have the overhead of setting up a runtime on each call. It's probably best to set up a tokio runtime for the current rusty_pool thread using runtime::Builder::new_current_thread(). Either that, or you could pass a handle to your main tokio runtime using Handle::current() and run the reqwest in there, that would mean that reqwest's tasks run within the tokio runtime instead of rusty_pool but since that's mostly IO waiting for requests to complete it shouldn't require a lot of threads anyway. The latter would be easiest, the former might not be as easy as all that and might require to change the pool to optionally wrap the whole worker loop in a block_on call. I've experimented a bit setting up thread local tokio::runtime::Builder::new_current_thread() runtimes for workers but that results in hanging futures not polled to completion without a root block_on call, which would conflict with blocking stuff and the current implementation using crossbeam. Since this is a "general purpose" threadpool, not an async runtime, I think I'm gonna leave that for now, any direct tokio integration would likely be quite messy.

Using a Handle to submit futures that require a tokio runtime and awaiting the result in rusty_pool should work well though:

#[tokio::main]
async fn main() {
    let pool = rusty_pool::ThreadPool::default();
    let tokio_handle = tokio::runtime::Handle::current();
    let handle: rusty_pool::JoinHandle<Result<(), reqwest::Error>> = pool.spawn_await(async move {
        let resp = tokio_handle
            .spawn(async {
                reqwest::get("https://httpbin.org/ip")
                    .await?
                    .json::<HashMap<String, String>>()
                    .await
            })
            .await
            .unwrap()?;
        println!("{:#?}", resp);
        /* other non-io stuff */
        Ok(())
    });

    handle.await_complete().unwrap();
}

That way the tokio stuff stays in tokio and you can use rusty_pool for everything else without having to block any threads or keep setting up new tokio runtimes.

milesj commented 1 year ago

Ok that makes a lot more sense, thanks for diving into this a bit. Glad I wasn't going crazy.

Also, pulling in the current tokio runtime is super clever and totally worked. I never would of thought of trying that. https://github.com/moonrepo/moon/pull/527

Thanks a bunch, will close this.

robinfriedli commented 1 year ago

Nice, glad it's working now.

I looked at your PR a bit and I have two notes: I was wondering why you switched from spawn_await to complete. spawn_await would be more elegant here because complete simply blocks a worker until the future is complete whereas spawn_await works more like a proper futures executor that polls futures step by step and awakens them by resubmitting them to the pool when progress can be made / when the future is awakened. Since you're submitting tasks in a loop that means you could end up spawning and blocking a worker for each iteration, where spawn_await could share the work more efficiently, only spawning new workers when workers are actually busy, not blocked while waiting.

Also, I noticed that you wrap the entire process_action call inside a tokio handler call. This works, but means that this pools is only really used to wait for tokio to do all the work. You'll get more out of this pool if you pass the Handle to the process_action function and use it there where necessary. Then you can use tokio to run reqwest's IO-bound task and use this pool for the cpu-bound or blocking work (if you have any) and get the best of both worlds.

I could open a PR if you like

milesj commented 1 year ago

@robinfriedli I switched to complete because visually (in the terminal) it acts more like how a user expects it to work. For example, if we pass --concurrency 1, we'd only use 1 core/thread, and the terminal will process a task 1 by 1. This is very apparent as we log a message when the task is starting and finishing.

For spawn_await, it would process every task in the batch and visually it looks like it's running X tasks in parallel as it logs a ton of starting messages. Although under the hood it's only using 1 thread, to the end user, it kind of looks like a bug and that it's not working as intended.

As for the process_action handle, the entirety of that function and all of its branching logic are async and require Tokio. I'm not sure how I could break it up, but I'll give it a shot.

robinfriedli commented 1 year ago

@milesj Ah I see. If it's just tokio stuff and you don't do any cpu intensive or blocking work in there it doesn't matter much, you'd just get some more effective use out of the threads you spawn. If it could be relevant for your users that tasks are executed sequentially then using complete does make sense.