Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

refactor: GlobScanOperator do not block runtime! #3423

Closed: andrewgazelka closed this 2 days ago

andrewgazelka commented 3 days ago

https://github.com/Eventual-Inc/Daft/blob/7e89850b8b276a32119f2d59ccbeeb39dc009557/src/daft-scan/src/glob.rs#L141-L302

I'm getting the following panic:

thread 'tokio-runtime-worker' panicked at /Users/andrewgazelka/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.1/src/runtime/scheduler/multi_thread/mod.rs:86:9:
Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
stack backtrace:
   0: rust_begin_unwind
             at /rustc/28a58f2fa7f0c46b8fab8237c02471a915924fe5/library/std/src/panicking.rs:662:5
   1: core::panicking::panic_fmt
             at /rustc/28a58f2fa7f0c46b8fab8237c02471a915924fe5/library/core/src/panicking.rs:74:14
   2: enter_runtime<tokio::runtime::scheduler::multi_thread::{impl#0}::block_on::{closure_env#0}<core::pin::Pin<alloc::boxed::Box<daft_scan::glob::run_glob::{async_block_env#0}, alloc::alloc::Global>>>, core::result::Result<core::pin::Pin<alloc::boxed::Box<(dyn futures_core::stream::Stream<Item=core::result::Result<daft_io::object_io::FileMetadata, daft_io::Error>> + core::marker::Send), alloc::alloc::Global>>, daft_io::Error>>
             at /Users/andrewgazelka/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.1/src/runtime/context/runtime.rs:68:5
   3: block_on<core::pin::Pin<alloc::boxed::Box<daft_scan::glob::run_glob::{async_block_env#0}, alloc::alloc::Global>>>
             at /Users/andrewgazelka/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.1/src/runtime/scheduler/multi_thread/mod.rs:86:9
   4: block_on_inner<core::pin::Pin<alloc::boxed::Box<daft_scan::glob::run_glob::{async_block_env#0}, alloc::alloc::Global>>>
             at /Users/andrewgazelka/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.1/src/runtime/runtime.rs:370:45
   5: block_on<daft_scan::glob::run_glob::{async_block_env#0}>
             at /Users/andrewgazelka/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.1/src/runtime/runtime.rs:340:13
   6: block_on_current_thread<daft_scan::glob::run_glob::{async_block_env#0}>
             at ./src/common/runtime/src/lib.rs:123:9
   7: run_glob
             at ./src/daft-scan/src/glob.rs:89:21
   8: try_new
             at ./src/daft-scan/src/glob.rs:164:25
   9: finish
             at ./src/daft-scan/src/builder.rs:108:33
  10: data_source
             at ./src/daft-connect/src/translation/logical_plan/read/data_source.rs:39:19
  11: read
             at ./src/daft-connect/src/translation/logical_plan/read.rs:26:13
  12: to_logical_plan
             at ./src/daft-connect/src/translation/logical_plan.rs:31:29
  13: {async_block#0}
             at ./src/daft-connect/src/op/execute/root.rs:34:28
  14: {async_block#0}
             at ./src/daft-connect/src/op/execute/root.rs:55:43

This happens when executing from an async runtime. Can we make it so we don't block? I'm thinking

fn try_new(...) should become async fn try_new(...)

This is similar to https://github.com/Eventual-Inc/Daft/pull/3378
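To illustrate why the trace above ends in a panic, here is a minimal, std-only sketch. It is a toy model, not tokio's or Daft's actual code: `IN_RUNTIME`, `EnterGuard`, this `block_on`, `run_glob`, and `try_new_blocking` are all hypothetical stand-ins. The only assumption it encodes is that a runtime tracks, per thread, whether it is already driving tasks, and refuses to block that thread a second time.

```rust
use std::cell::Cell;
use std::future::Future;
use std::panic;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

thread_local! {
    // Toy stand-in for a runtime's "this thread is already driving tasks" flag.
    static IN_RUNTIME: Cell<bool> = Cell::new(false);
}

// Marks the thread as inside the toy runtime; clears the mark on drop,
// including when a panic unwinds through `block_on`.
struct EnterGuard;

impl EnterGuard {
    fn new() -> Self {
        if IN_RUNTIME.with(|flag| flag.replace(true)) {
            panic!("Cannot start a runtime from within a runtime.");
        }
        EnterGuard
    }
}

impl Drop for EnterGuard {
    fn drop(&mut self) {
        IN_RUNTIME.with(|flag| flag.set(false));
    }
}

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: drives one future on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let _guard = EnterGuard::new();
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical async listing step (stands in for the real glob logic).
async fn run_glob(pattern: &str) -> Vec<String> {
    vec![format!("{pattern}/part-0.parquet")]
}

// Synchronous constructor that blocks internally: the shape that panics.
fn try_new_blocking(pattern: &str) -> Vec<String> {
    block_on(run_glob(pattern))
}

fn main() {
    // Fine from a plain thread: nothing else is driving tasks here.
    let files = try_new_blocking("data");
    println!("outside the runtime: {files:?}");

    // Called from inside a running task, the inner block_on trips the guard.
    let nested = panic::catch_unwind(|| block_on(async { try_new_blocking("data") }));
    println!("nested call panicked: {}", nested.is_err());
}
```

The blocking constructor works from an ordinary thread but fails as soon as an async task calls it, which matches the call path in the backtrace (an async block reaching `try_new`, which reaches `block_on`).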

andrewgazelka commented 3 days ago

Assigning @colin-ho, but I'm unsure who is best suited to complete this. The other contender, based on blame, is @jaychia.

jaychia commented 3 days ago

Feel free to make a refactor to unblock, I can review.

Is the canonical approach something like async fn async_try_new(...) + fn try_new(...) {block_on(async_try_new)}? I'm not entirely sure here, but that could maybe work
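A rough sketch of that two-entry-point shape, using only the standard library. `GlobScan`, its `files` field, the fake listing logic, and this `block_on` helper are hypothetical illustrations, not Daft's real types or its runtime wrapper.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor standing in for a real runtime's block_on.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical simplified operator: just the list of matched files.
struct GlobScan {
    files: Vec<String>,
}

// Hypothetical async listing step for a single pattern.
async fn list_files(pattern: &str) -> Vec<String> {
    vec![format!("{pattern}/part-0.parquet")]
}

impl GlobScan {
    // Async constructor: safe to await from code already running on a runtime.
    async fn async_try_new(patterns: &[&str]) -> Result<Self, String> {
        if patterns.is_empty() {
            return Err("no glob patterns given".to_string());
        }
        let mut files = Vec::new();
        for pattern in patterns {
            files.extend(list_files(pattern).await);
        }
        Ok(Self { files })
    }

    // Blocking wrapper: only for callers that are NOT inside a runtime.
    fn try_new(patterns: &[&str]) -> Result<Self, String> {
        block_on(Self::async_try_new(patterns))
    }
}

fn main() {
    let scan = GlobScan::try_new(&["s3://bucket/a", "s3://bucket/b"]).unwrap();
    println!("{:?}", scan.files);
}
```

One caveat with this shape: the blocking wrapper keeps the original hazard alive, because any async caller that reaches for `try_new` instead of `async_try_new` would block inside the runtime again.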

andrewgazelka commented 3 days ago

> Feel free to make a refactor to unblock, I can review.
>
> Is the canonical approach something like async fn async_try_new(...) + fn try_new(...) {block_on(async_try_new)}? I'm not entirely sure here, but that could maybe work

Canonically, you would just remove the blocking try_new and leave the decision of whether to block up to the caller. For instance, if the caller wanted to block (as Python should), it could call block_on(try_new(...)), where try_new is async.
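As a sketch of that arrangement, assuming a single async `try_new` and two kinds of callers: `sync_entry_point` (standing in for something like a Python binding) and `async_handler` (standing in for code already on a runtime) are hypothetical names, and `block_on` is a std-only stand-in for whatever the real runtime provides.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor standing in for a real runtime's block_on.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// The only constructor: async, and it never blocks on its own.
async fn try_new(pattern: &str) -> Result<Vec<String>, String> {
    if pattern.is_empty() {
        return Err("empty glob pattern".to_string());
    }
    Ok(vec![format!("{pattern}/part-0.parquet")])
}

// Hypothetical synchronous caller: blocks once, at the outermost edge.
fn sync_entry_point(pattern: &str) -> Result<Vec<String>, String> {
    block_on(try_new(pattern))
}

// Hypothetical async caller: already on a runtime, so it simply awaits.
async fn async_handler(pattern: &str) -> Result<usize, String> {
    let files = try_new(pattern).await?;
    Ok(files.len())
}

fn main() {
    println!("{:?}", sync_entry_point("data"));
    println!("{:?}", block_on(async_handler("data")));
}
```

With only the async constructor exposed, blocking happens in exactly one place per call chain, and code that is already async has no blocking variant to call by mistake.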

andrewgazelka commented 3 days ago

ok I will work on this