pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Polars scan_parquet_files blocking tokio async main thread #13163

Open horizondeep opened 9 months ago

horizondeep commented 9 months ago

Checks

Reproducible example


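// Assumed imports for polars 0.35 with the "lazy", "parquet", "cloud" and "azure" features
use polars::prelude::*;
use cloud::AzureConfigKey as Key; // assumed re-export path for the Azure config keys
use std::path::PathBuf;

// Function for reading the partition files of one dataset from the cloud
fn fetch_data(store: Vec<&str>, mpath: &str, args: ScanArgsParquet) -> PolarsResult<LazyFrame> {
    let paths: Vec<PathBuf> = store
        .iter()
        .map(|s| PathBuf::from(format!("azure:/{}{}.parquet", mpath, s)))
        .collect();

    // Propagate scan errors with `?` instead of unwrapping; the original `match`
    // had an unreachable `_` arm, so it could never return NoData anyway.
    let lf = LazyFrame::scan_parquet_files(paths.into(), args)?.with_streaming(true);
    Ok(lf)
}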

#[tokio::main]
async fn main() -> PolarsResult<()> {
    // Provide Blob Storage account name, access key and container name
    let account_name = std::env::var("STORAGE_ACC_NAME").expect("missing STORAGE_ACC_NAME");
    let account_key = std::env::var("access_key").expect("missing access_key");
    let container_name = std::env::var("container_name").expect("missing container_name");

    const TEST_S2: &str = "https://cprxt.blob.core.windows.net/blob_1";

    // Set up cloud options; all values in the array must share one type, hence `to_string()`
    let cloud_options = cloud::CloudOptions::default().with_azure([
        (Key::AccountName, account_name),
        (Key::AccessKey, account_key),
        (Key::ContainerName, container_name),
        (Key::Endpoint, TEST_S2.to_string()),
    ]);
    let mut args = ScanArgsParquet::default();

    //Set Options for scan parquet 
    args.row_count=None;
    args.n_rows = None;
    args.low_memory=false; 
    args.use_statistics = false;
    args.cache= false;
    args.parallel = ParallelStrategy::RowGroups;
    args.cloud_options = Some(cloud_options);

    //Define Master Paths 
    let mpath_fp = "/read_parquet/fp_data/";
    let mpath_product = "/read_parquet/product_data/";   
    let mpath_sales = "/read_parquet/sales/";    
    let mpath_attribute_product = "/read_parquet/attribute_product/";
    let mpath_vars = "/read_parquet/vars/";

    //Parquet Partitions for all the files
    let input1 = vec!["1 ------- 33", "4 ------- 33", "6 ------- 33", "7 ------- 33", "10 ------- 33"];

    // Scan and collect inside spawn_blocking so that Polars does not block the async main thread
    let dfs = tokio::task::spawn_blocking(move || {
        let fp = fetch_data(input1.clone(), mpath_fp, args.clone()).unwrap();
        let df_product = fetch_data(input1.clone(), mpath_product, args.clone()).unwrap();
        let df_sales = fetch_data(input1.clone(), mpath_sales, args.clone()).unwrap();
        let df_vars = fetch_data(input1.clone(), mpath_vars, args.clone()).unwrap();

        // collect_all evaluates the LazyFrames and returns one DataFrame per input
        collect_all(vec![fp, df_product, df_sales, df_vars]).unwrap()
    })
    .await
    .expect("Task panicked");

    Ok(())
}
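In `fetch_data`, the `format!` call concatenates `azure:/`, the master path and the partition name. Because every master path above starts with `/`, the generated URLs begin with `azure://read_parquet/...`, and the partition names keep their spaces. The std-only sketch below reproduces just that string building so the URLs can be inspected in isolation; `partition_urls` is a hypothetical helper, not part of Polars or of the original reproducer.

```rust
/// Hypothetical helper that mirrors the string building in `fetch_data`:
/// "azure:/" + master path + partition name + ".parquet".
fn partition_urls(mpath: &str, partitions: &[&str]) -> Vec<String> {
    partitions
        .iter()
        .map(|p| format!("azure:/{}{}.parquet", mpath, p))
        .collect()
}
```

For example, `partition_urls("/read_parquet/fp_data/", &["1 ------- 33"])` returns `["azure://read_parquet/fp_data/1 ------- 33.parquet"]`, which is worth comparing with the container and blob layout configured in `cloud_options`.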

Log output

thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/xyx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/runtime/enter.rs:38:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Issue description

When scan_parquet_files is called inside an async tokio function, Polars blocks the async tokio main thread. The workaround is to put the scan_parquet_files code inside a tokio::task::spawn_blocking block. This works most of the time, but it sometimes throws the error "Cannot start a runtime from within a runtime". The issue is very unpredictable: reproducing it sometimes takes around 10 runs of the same code, while at other times the error comes up within 1 or 2 runs.

Expected behavior

Polars scan_parquet_files should not block the async tokio runtime thread.

Installed versions

polars = { version = "0.35.4", features = ["parquet", "azure", "serde", "json", "lazy", "pivot", "rank", "log", "lazy_regex", "horizontal_concat", "polars-lazy", "dtype-struct", "dtype-array", "concat_str", "is_in", "polars-io", "strings", "cum_agg", "cloud", "streaming", "dtype-full"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
polars-ops = "0.35.4"
postgres = { version = "0.19.5", features = ["with-chrono-0_4"] }
tokio-postgres = "0.7.8"
postgres_query = "0.3.3"
tokio = { version = "1.29.1", features = ["full"] }
horizondeep commented 9 months ago

Hi @ritchie46. Can you please have a look at it?

horizondeep commented 9 months ago

Any update on this, @ritchie46 / @stinodego?

orlp commented 9 months ago

@horizondeep Could you post an error with the full backtrace from RUST_BACKTRACE=1?

horizondeep commented 8 months ago

Hi @orlp. Apologies for the delayed response. Below is the backtrace:

[2024-01-24T18:50:32.512Z] thread '' panicked at /home/amitr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/scheduler/multi_thread/mod.rs:86:9:
[2024-01-24T18:50:32.512Z] Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
[2024-01-24T18:50:32.512Z] stack backtrace:
[2024-01-24T18:50:32.554Z] 0: rust_begin_unwind
[2024-01-24T18:50:32.554Z] at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:595:5
[2024-01-24T18:50:32.554Z] 1: core::panicking::panic_fmt
[2024-01-24T18:50:32.554Z] at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/panicking.rs:67:14
[2024-01-24T18:50:32.555Z] 2: tokio::runtime::context::runtime::enter_runtime
[2024-01-24T18:50:32.556Z] 3: tokio::runtime::runtime::Runtime::block_on
[2024-01-24T18:50:32.557Z] 4: ::get_batches
[2024-01-24T18:50:32.558Z] 5: <core::panic::unwind_safe::AssertUnwindSafe as core::ops::function::FnOnce<()>>::call_once
[2024-01-24T18:50:32.559Z] 6: <rayon_core::job::HeapJob as rayon_core::job::Job>::execute
[2024-01-24T18:50:32.560Z] 7: rayon_core::registry::WorkerThread::wait_until_cold
[2024-01-24T18:50:32.561Z] 8: rayon_core::join::join_context::{{closure}}
[2024-01-24T18:50:32.562Z] 9: rayon_core::registry::in_worker
[2024-01-24T18:50:32.564Z] 10: rayon::iter::plumbing::bridge_producer_consumer::helper
[2024-01-24T18:50:32.564Z] 11: rayon::iter::from_par_iter::collect_extended
[2024-01-24T18:50:32.564Z] 12: rayon::result::<impl rayon::iter::FromParallelIterator<core::result::Result<T,E>> for core::result::Result<C,E>>::from_par_iter
[2024-01-24T18:50:32.564Z] 13: rayon_core::registry::Registry::in_worker
[2024-01-24T18:50:32.565Z] 14: polars_io::parquet::read_impl::rg_to_dfs
[2024-01-24T18:50:32.565Z] 15: polars_io::parquet::read_impl::BatchedParquetReader::next_batches::{{closure}}
[2024-01-24T18:50:32.565Z] 16: tokio::runtime::park::CachedParkThread::block_on
[2024-01-24T18:50:32.565Z] 17: tokio::runtime::context::runtime::enter_runtime
[2024-01-24T18:50:32.565Z] 18: tokio::runtime::runtime::Runtime::block_on
[2024-01-24T18:50:32.566Z] 19: polars_io::pl_async::RuntimeManager::block_on_potential_spawn
[2024-01-24T18:50:32.566Z] 20: ::get_batches
[2024-01-24T18:50:32.566Z] 21: ::get_batches
[2024-01-24T18:50:32.566Z] 22: <core::panic::unwind_safe::AssertUnwindSafe as core::ops::function::FnOnce<()>>::call_once
[2024-01-24T18:50:32.566Z] 23: <rayon_core::job::HeapJob as rayon_core::job::Job>::execute
[2024-01-24T18:50:32.566Z] 24: rayon_core::registry::WorkerThread::wait_until_cold
[2024-01-24T18:50:32.567Z] 25: rayon_core::scope::ScopeBase::complete
[2024-01-24T18:50:32.567Z] 26: rayon_core::scope::scope::{{closure}}
[2024-01-24T18:50:32.567Z] 27: polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline_no_finalize
[2024-01-24T18:50:32.567Z] 28: polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline
[2024-01-24T18:50:32.567Z] 29: polars_pipe::pipeline::dispatcher::PipeLine::execute
[2024-01-24T18:50:32.569Z] 30: ::call_udf
[2024-01-24T18:50:32.570Z] 31: polars_plan::logical_plan::functions::FunctionNode::evaluate
[2024-01-24T18:50:32.571Z] 32: polars_lazy::physical_plan::state::ExecutionState::record
[2024-01-24T18:50:32.571Z] 33: ::execute
[2024-01-24T18:50:32.572Z] 34: polars_lazy::frame::LazyFrame::collect
[2024-01-24T18:50:32.573Z] 35: <alloc::vec::Vec<T,A> as alloc::vec::spec_extend::SpecExtend<T,I>>::spec_extend
[2024-01-24T18:50:32.574Z] 36: rayon::iter::plumbing::bridge_producer_consumer::helper
[2024-01-24T18:50:32.575Z] 37: rayon_core::join::join_context::{{closure}}
[2024-01-24T18:50:32.575Z] 38: rayon_core::registry::in_worker
[2024-01-24T18:50:32.575Z] 39: rayon::iter::plumbing::bridge_producer_consumer::helper
[2024-01-24T18:50:32.576Z] 40: <rayon_core::job::StackJob<L,F,R> as rayon_core::job::Job>::execute
[2024-01-24T18:50:32.576Z] 41: rayon_core::registry::WorkerThread::wait_until_cold
[2024-01-24T18:50:32.576Z] 42: rayon_core::registry::ThreadBuilder::run
[2024-01-24T18:50:32.587Z] note: Some details are omitted, run with RUST_BACKTRACE=full for a verbose backtrace.
[2024-01-24T18:50:36.363Z] thread 'tokio-runtime-worker' panicked at src/main.rs:236:15:
[2024-01-24T18:50:36.363Z] Task panicked: Any { .. }
[2024-01-24T18:50:36.364Z] stack backtrace:
[2024-01-24T18:50:36.364Z] 0: rust_begin_unwind
[2024-01-24T18:50:36.364Z] at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:595:5
[2024-01-24T18:50:36.364Z] 1: core::panicking::panic_fmt
[2024-01-24T18:50:36.364Z] at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/panicking.rs:67:14
[2024-01-24T18:50:36.364Z] 2: core::result::unwrap_failed
[2024-01-24T18:50:36.364Z] at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/result.rs:1652:5
[2024-01-24T18:50:36.364Z] 3: handler::get_data::{{closure}}
[2024-01-24T18:50:36.364Z] 4: <warp::filter::and_then::AndThenFuture<T,F> as core::future::future::Future>::poll
[2024-01-24T18:50:36.366Z] 5: hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T>::poll_loop
[2024-01-24T18:50:36.367Z] 6: hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T>::poll_catch
[2024-01-24T18:50:36.369Z] 7: <hyper::server::conn::upgrades::UpgradeableConnection<I,S,E> as core::future::future::Future>::poll
[2024-01-24T18:50:36.369Z] 8: <hyper::server::server::new_svc::NewSvcTask<I,N,S,E,W> as core::future::future::Future>::poll
[2024-01-24T18:50:36.369Z] 9: tokio::runtime::task::core::Core<T,S>::poll
[2024-01-24T18:50:36.369Z] 10: tokio::runtime::task::harness::Harness<T,S>::poll
[2024-01-24T18:50:36.369Z] 11: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
[2024-01-24T18:50:36.372Z] 12: tokio::runtime::scheduler::multi_thread::worker::run
[2024-01-24T18:50:36.372Z] 13: tokio::runtime::task::core::Core<T,S>::poll
[2024-01-24T18:50:36.372Z] 14: tokio::runtime::task::harness::Harness<T,S>::poll
[2024-01-24T18:50:36.375Z] note: Some details are omitted, run with RUST_BACKTRACE=full for a verbose backtrace.

horizondeep commented 8 months ago

Hi @orlp. This is with RUST_BACKTRACE=full.

Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
thread '' panicked at /home/amitr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/scheduler/multi_thread/mod.rs:86:9:
stack backtrace:
  0: 0x7f773f121fcc - std::backtrace_rs::backtrace::libunwind::trace::h370587616c149a45
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
  1: 0x7f773f121fcc - std::backtrace_rs::backtrace::trace_unsynchronized::h4cc55038e4a874cd
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
  2: 0x7f773f121fcc - std::sys_common::backtrace::_print_fmt::hc26cc3c2eddc21d7
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:67:5
  3: 0x7f773f121fcc - ::fmt::hb67f6e94152691e9
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:44:22
  4: 0x7f773f172fcc - core::fmt::rt::Argument::fmt::h1fd35fc4c88687b8
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/fmt/rt.rs:138:9
  5: 0x7f773f172fcc - core::fmt::write::h738793984be7f52c
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/fmt/mod.rs:1094:21
  6: 0x7f773f11d1ee - std::io::Write::write_fmt::h84056616c47209ca
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/io/mod.rs:1714:15
  7: 0x7f773f121db4 - std::sys_common::backtrace::_print::h20a11f09f8216cd4
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:47:5
  8: 0x7f773f121db4 - std::sys_common::backtrace::print::h7b70e260a2475fa3
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:34:9
  9: 0x7f773f1236a3 - std::panicking::default_hook::{{closure}}::h81c979645f368377
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:270:22
  10: 0x7f773f1233bc - std::panicking::default_hook::h157ab9bf73bc8932
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:290:9
  11: 0x7f773f123c99 - std::panicking::rust_panic_with_hook::hec9e488e5d9ea17f
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:707:13
  12: 0x7f773f123b51 - std::panicking::begin_panic_handler::{{closure}}::h83232311f2354a3d
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:597:13
  13: 0x7f773f1224f6 - std::sys_common::backtrace::__rust_end_short_backtrace::h8a82556d18c9cf5a
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:170:18
  14: 0x7f773f1238e2 - rust_begin_unwind
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:595:5
  15: 0x7f773f16f993 - core::panicking::panic_fmt::h9fb1cf80aef8bb8a
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/panicking.rs:67:14
  16: 0x7f773d7a6c56 - tokio::runtime::context::runtime::enter_runtime::h4281f5fbdc83538b
  17: 0x7f773d87ad92 - tokio::runtime::runtime::Runtime::block_on::ha627cbfbf6922f2f
  18: 0x7f773d830d8f - ::get_batches::ha80a4cd888c64b72
  19: 0x7f773d7fc185 - <core::panic::unwind_safe::AssertUnwindSafe as core::ops::function::FnOnce<()>>::call_once::ha2c475d1ecf1070f
  20: 0x7f773d80641e - <rayon_core::job::HeapJob as rayon_core::job::Job>::execute::hf125101cfa544925
  21: 0x7f773f0ed63f - rayon_core::registry::WorkerThread::wait_until_cold::hab7b263a470d21b8
  22: 0x7f773dad761c - rayon_core::join::join_context::{{closure}}::hcb1c338a91b903de
  23: 0x7f773dadb1c6 - rayon_core::registry::in_worker::habdde84105aa856f
  24: 0x7f773dc1d6c3 - rayon::iter::plumbing::bridge_producer_consumer::helper::hffa53f86b8040afb
  25: 0x7f773dad7549 - rayon_core::join::join_context::{{closure}}::hcb1c338a91b903de
  26: 0x7f773dadb1c6 - rayon_core::registry::in_worker::habdde84105aa856f
  27: 0x7f773dc1d6c3 - rayon::iter::plumbing::bridge_producer_consumer::helper::hffa53f86b8040afb
  28: 0x7f773db88a5c - rayon::iter::from_par_iter::collect_extended::hefdfe5fe90cf558f
  29: 0x7f773dc95991 - rayon::result::<impl rayon::iter::FromParallelIterator<core::result::Result<T,E>> for core::result::Result<C,E>>::from_par_iter::h9a187d36e7a60d8f
  30: 0x7f773dada501 - rayon_core::registry::Registry::in_worker::h9afa4338732f129e
  31: 0x7f773dcc3623 - polars_io::parquet::read_impl::rg_to_dfs::ha9837fbb9447fa9e
  32: 0x7f773d7b048f - polars_io::parquet::read_impl::BatchedParquetReader::next_batches::{{closure}}::haa202b40813714a1
  33: 0x7f773d7a66fd - tokio::runtime::park::CachedParkThread::block_on::hb1f9ec5f4e2e0b73
  34: 0x7f773d7a71df - tokio::runtime::context::runtime::enter_runtime::hbdb8863b8bccfc3d
  35: 0x7f773d87ac62 - tokio::runtime::runtime::Runtime::block_on::h3d4d21db2a3ea425
  36: 0x7f773d78fdd8 - polars_io::pl_async::RuntimeManager::block_on_potential_spawn::hf6018e806fb687ae
  37: 0x7f773d830f04 - ::get_batches::ha80a4cd888c64b72
  38: 0x7f773d831164 - ::get_batches::ha80a4cd888c64b72
  39: 0x7f773d7fc185 - <core::panic::unwind_safe::AssertUnwindSafe as core::ops::function::FnOnce<()>>::call_once::ha2c475d1ecf1070f
  40: 0x7f773d80641e - <rayon_core::job::HeapJob as rayon_core::job::Job>::execute::hf125101cfa544925
  41: 0x7f773f0ed5fb - rayon_core::registry::WorkerThread::wait_until_cold::hab7b263a470d21b8
  42: 0x7f773d7fb546 - rayon_core::scope::ScopeBase::complete::hef5a4745608aac57
  43: 0x7f773d838e13 - rayon_core::scope::scope::{{closure}}::h695eabf9932ba2ea
  44: 0x7f773d854501 - polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline_no_finalize::h9e6ee68e3991f0af
  45: 0x7f773d85566b - polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline::hb3beb6df1b53eb45
  46: 0x7f773d8558dd - polars_pipe::pipeline::dispatcher::PipeLine::execute::hc27ddb2ed98c5905
  47: 0x7f773d4a09ab - ::call_udf::h58fdce7fadd41f63
  48: 0x7f773d966b0b - polars_plan::logical_plan::functions::FunctionNode::evaluate::hf4b14b366ee87a23
  49: 0x7f773d6d2c67 - polars_lazy::physical_plan::state::ExecutionState::record::h1c7d195396666c5e
  50: 0x7f773d4b0e3c - ::execute::h849c49ee0952b7b3
  51: 0x7f773d6326d5 - polars_lazy::frame::LazyFrame::collect::h14915994e86c5f64
  52: 0x7f773d3766a7 - <alloc::vec::Vec<T,A> as alloc::vec::spec_extend::SpecExtend<T,I>>::spec_extend::h9c2b0fc96526ab2d
  53: 0x7f773d3d30f3 - rayon::iter::plumbing::bridge_producer_consumer::helper::h09f1e500e1a7da23
  54: 0x7f773d456bee - rayon_core::join::join_context::{{closure}}::h5ad4011a4b945539
  55: 0x7f773d46247e - rayon_core::registry::in_worker::h07fa3413f506bfba
  56: 0x7f773d3d323b - rayon::iter::plumbing::bridge_producer_consumer::helper::h09f1e500e1a7da23
  57: 0x7f773d39b23a - <rayon_core::job::StackJob<L,F,R> as rayon_core::job::Job>::execute::h6fc318b5e806d99b
  58: 0x7f773f0ed63f - rayon_core::registry::WorkerThread::wait_until_cold::hab7b263a470d21b8
  59: 0x7f773f0eba02 - rayon_core::registry::ThreadBuilder::run::h00a3ece25d87e00b
  60: 0x7f773f0f0bfa - std::sys_common::backtrace::__rust_begin_short_backtrace::h12997ee34c89b431
  61: 0x7f773f0f1cef - core::ops::function::FnOnce::call_once{{vtable.shim}}::hc8252d0f1d62b261
  62: 0x7f773f129305 - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce>::call_once::hc4b69460ab0b58b3
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/alloc/src/boxed.rs:2007:9
  63: 0x7f773f129305 - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce>::call_once::hff9ab78020349a37
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/alloc/src/boxed.rs:2007:9
  64: 0x7f773f129305 - std::sys::unix::thread::Thread::new::thread_start::h7fa77081a8285658
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys/unix/thread.rs:108:17
thread 'tokio-runtime-worker' panicked at src/main.rs:236:15:
Task panicked: Any { .. }
stack backtrace:
  0: 0x7f773f121fcc - std::backtrace_rs::backtrace::libunwind::trace::h370587616c149a45
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
  1: 0x7f773f121fcc - std::backtrace_rs::backtrace::trace_unsynchronized::h4cc55038e4a874cd
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
  2: 0x7f773f121fcc - std::sys_common::backtrace::_print_fmt::hc26cc3c2eddc21d7
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:67:5
  3: 0x7f773f121fcc - ::fmt::hb67f6e94152691e9
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:44:22
  4: 0x7f773f172fcc - core::fmt::rt::Argument::fmt::h1fd35fc4c88687b8
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/fmt/rt.rs:138:9
  5: 0x7f773f172fcc - core::fmt::write::h738793984be7f52c
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/fmt/mod.rs:1094:21
  6: 0x7f773f11d1ee - std::io::Write::write_fmt::h84056616c47209ca
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/io/mod.rs:1714:15
  7: 0x7f773f121db4 - std::sys_common::backtrace::_print::h20a11f09f8216cd4
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:47:5
  8: 0x7f773f121db4 - std::sys_common::backtrace::print::h7b70e260a2475fa3
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:34:9
  9: 0x7f773f1236a3 - std::panicking::default_hook::{{closure}}::h81c979645f368377
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:270:22
  10: 0x7f773f1233bc - std::panicking::default_hook::h157ab9bf73bc8932
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:290:9
  11: 0x7f773f123c99 - std::panicking::rust_panic_with_hook::hec9e488e5d9ea17f
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:707:13
  12: 0x7f773f123b97 - std::panicking::begin_panic_handler::{{closure}}::h83232311f2354a3d
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:599:13
  13: 0x7f773f1224f6 - std::sys_common::backtrace::__rust_end_short_backtrace::h8a82556d18c9cf5a
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys_common/backtrace.rs:170:18
  14: 0x7f773f1238e2 - rust_begin_unwind
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:595:5
  15: 0x7f773f16f993 - core::panicking::panic_fmt::h9fb1cf80aef8bb8a
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/panicking.rs:67:14
  16: 0x7f773f16fd9a - core::result::unwrap_failed::hdeffda1fd287c7d3
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/result.rs:1652:5
  17: 0x7f773d33bb8f - handler::get_data::{{closure}}::h234312d6fe7a79cf
  18: 0x7f773d313753 - <warp::filter::and_then::AndThenFuture<T,F> as core::future::future::Future>::poll::ha32da00390b1ec5c
  19: 0x7f773d4138d6 - hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T>::poll_loop::he3ecbfaf344edcba
  20: 0x7f773d41145d - hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T>::poll_catch::h6eace1f8a1923912
  21: 0x7f773d446753 - <hyper::server::conn::upgrades::UpgradeableConnection<I,S,E> as core::future::future::Future>::poll::h82755f89a33b139b
  22: 0x7f773d3d9f2e - <hyper::server::server::new_svc::NewSvcTask<I,N,S,E,W> as core::future::future::Future>::poll::h7cc97b7df93d5e3a
  23: 0x7f773d40046a - tokio::runtime::task::core::Core<T,S>::poll::h46418eaf7fa91104
  24: 0x7f773d3e9c91 - tokio::runtime::task::harness::Harness<T,S>::poll::hdaf5bdd65c2d3c5a
  25: 0x7f773efde2f4 - tokio::runtime::scheduler::multi_thread::worker::Context::run_task::h625259bf34069285
  26: 0x7f773efdcdf5 - tokio::runtime::scheduler::multi_thread::worker::run::h7931b8a8f08768fd
  27: 0x7f773efe3f7a - tokio::runtime::task::core::Core<T,S>::poll::h5fb35a6579dbe07e
  28: 0x7f773efbf9ee - tokio::runtime::task::harness::Harness<T,S>::poll::hdc6bdbf8dc2116bc
  29: 0x7f773efd3a9a - std::sys_common::backtrace::__rust_begin_short_backtrace::h666c7963d72f0413
  30: 0x7f773efc9f69 - core::ops::function::FnOnce::call_once{{vtable.shim}}::h24d7ae6c73a72280
  31: 0x7f773f129305 - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce>::call_once::hc4b69460ab0b58b3
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/alloc/src/boxed.rs:2007:9
  32: 0x7f773f129305 - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce>::call_once::hff9ab78020349a37
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/alloc/src/boxed.rs:2007:9
  33: 0x7f773f129305 - std::sys::unix::thread::Thread::new::thread_start::h7fa77081a8285658
        at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/sys/unix/thread.rs:108:17

horizondeep commented 8 months ago

I wasn't aware that Polars uses Rayon in the backend for multithreading.
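Reading the short backtrace above from the bottom up suggests one plausible mechanism (an interpretation of the trace, not a confirmed diagnosis). A Rayon worker runs `get_batches`, which enters a tokio runtime through `polars_io::pl_async::RuntimeManager::block_on_potential_spawn` (frames 16 to 20). Inside that `block_on`, `rg_to_dfs` starts a Rayon parallel iterator, and while the worker waits for it in `WorkerThread::wait_until_cold` (frame 7) it executes another queued job (frame 6) that also calls `get_batches` and `Runtime::block_on` (frames 3 and 4). The thread is still inside the first runtime, so tokio panics in `enter_runtime` (frame 2). Whether a waiting worker picks up such a job depends on scheduling, which would be consistent with the failure appearing only in some runs. The std-only toy model below reproduces that shape with a thread-local flag and a job queue; `toy_block_on`, `source_job` and `run` are hypothetical names, and the flag only imitates tokio's re-entrancy check rather than reproducing its implementation.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

thread_local! {
    // Toy stand-in for tokio's per-thread "already inside a runtime" state.
    static IN_RUNTIME: Cell<bool> = Cell::new(false);
}

// Clears the flag when the outermost toy_block_on returns (or unwinds).
struct ExitRuntime;
impl Drop for ExitRuntime {
    fn drop(&mut self) {
        IN_RUNTIME.with(|c| c.set(false));
    }
}

/// Toy `block_on`: refuses to nest on the same thread.
/// Returns an error where tokio would panic.
fn toy_block_on<T>(f: impl FnOnce() -> T) -> Result<T, String> {
    if IN_RUNTIME.with(|c| c.replace(true)) {
        return Err("Cannot start a runtime from within a runtime".to_string());
    }
    let _guard = ExitRuntime;
    Ok(f())
}

type Job = Box<dyn FnOnce() -> Result<(), String>>;
type Queue = Rc<RefCell<VecDeque<Job>>>;

/// Hypothetical "source" job, loosely modelled on `get_batches`: it enters the
/// toy runtime and, if `steal_while_waiting` is set, runs one queued job while
/// "waiting", the way Rayon's `wait_until_cold` may execute pending work.
fn source_job(queue: Queue, steal_while_waiting: bool) -> Job {
    Box::new(move || -> Result<(), String> {
        toy_block_on(|| -> Result<(), String> {
            if steal_while_waiting {
                // Pop first so the RefCell is not borrowed while the job runs.
                let stolen = queue.borrow_mut().pop_front();
                if let Some(job) = stolen {
                    return job();
                }
            }
            Ok(())
        })?
    })
}

/// Runs one source job while a second one is queued on the same thread,
/// then drains the queue sequentially, outside any toy runtime.
fn run(steal: bool) -> Result<(), String> {
    let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));
    queue.borrow_mut().push_back(source_job(queue.clone(), false));
    source_job(queue.clone(), steal)()?;
    loop {
        let next = queue.borrow_mut().pop_front();
        match next {
            Some(job) => job()?,
            None => return Ok(()),
        }
    }
}
```

With this model, `run(false)` returns `Ok(())`, while `run(true)` returns `Err("Cannot start a runtime from within a runtime")`: the same two jobs succeed or fail depending only on whether the waiting thread runs the queued one inside the first `toy_block_on`.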

Veiasai commented 8 months ago

I hit the same issue. A workaround is:

        let df = tokio::task::spawn_blocking(move || {
            // Scan and collect on a blocking thread, off the async worker threads
            LazyFrame::scan_parquet("", args)
                .unwrap()
                .collect()
                .unwrap()
        }).await.unwrap();

It would be better if Polars offered an async API.

It would also help to remove the tokio runtime wrapper in polars-io-0.37.0/src/cloud/glob.rs:

#[tokio::main(flavor = "current_thread")]
/// List files with a prefix derived from the pattern.
pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
ritchie46 commented 2 months ago

Can you confirm this is still an issue on main? We've improved a lot lately.

WbaN314 commented 1 month ago

@ritchie46, yes, the problem still persists. I just encountered exactly the same issue on the current master branch. The workaround from @Veiasai also works for me.

Minimal example that fails:

#[tokio::test(flavor = "multi_thread")]
async fn fails() {
    let path_string = OBJECT_STORE_BASE_URL.to_string() + "sample_parquet.snappy.parquet";
    let path = Path::new(&path_string);
    let lazy_frame = LazyFrame::scan_parquet(path, ScanArgsParquet::default())
        .unwrap()
        .count()
        .collect()
        .unwrap();
    println!("{:?}", lazy_frame)
}