delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Regression in Python multiprocessing support #2744

Closed Tom-Newton closed 3 months ago

Tom-Newton commented 3 months ago

Environment

Delta-rs version: 0.18.2
Binding: Python
Environment:


Bug

What happened: Just upgraded deltalake for the first time in a while and we are now getting a deadlock when initialising a deltalake.DeltaTable object when using Python multiprocessing. Sometimes it just hangs and sometimes the OS kills it.

thread 'tokio-runtime-worker' panicked at library/std/src/sys/pal/unix/thread.rs:274:13:
failed to join thread: Resource deadlock avoided (os error 35)

full_log.txt

What you expected to happen: DeltaTable should initialise successfully regardless of multiprocessing.

How to reproduce it:

from multiprocessing import Process
from deltalake import DeltaTable

def load():
    DeltaTable("crates/test/tests/data/simple_table_with_checkpoint")

def main():
    print("start normal")
    load()
    print("Start multiprocessed")
    p = Process(target=load)
    p.start()
    p.join()
    print("DONE")

if __name__ == "__main__":
    main()

Note: to reproduce, deltalake.DeltaTable must be used once in the parent process and then used again in a child Python process started with multiprocessing.

More details: I have done a git bisect and identified that https://github.com/delta-io/delta-rs/pull/2424 is the cause.

ion-elgreco commented 3 months ago

@wjones127 do you have any ideas why this could happen? OnceLock should be thread safe.

wjones127 commented 3 months ago

Are you using fork or spawn? IIRC different platforms have different defaults. tokio doesn't support fork.
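For readers unsure which start method their platform uses, a minimal sketch using only the standard library is below. The helper names are hypothetical. Windows and macOS default to spawn, while Linux has historically defaulted to fork; the exact default depends on the Python version, so it is worth checking rather than assuming.

```python
import multiprocessing


def default_start_method():
    """Return the start method multiprocessing will use in this process."""
    # Note: calling get_start_method() without allow_none=True also fixes the
    # default context for the rest of the process.
    return multiprocessing.get_start_method()


def available_start_methods():
    """Return every start method supported on this platform."""
    return multiprocessing.get_all_start_methods()
```

Printing `default_start_method()` at the top of the affected program should be enough to answer the question above.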

Tom-Newton commented 3 months ago

Good question. We are using fork, since that is the default on Linux. multiprocessing.set_start_method("spawn") does indeed avoid the problem. I'll try to find out if this will be a viable solution for us.
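As an alternative to changing the process-wide default with set_start_method, a spawn context can be requested locally. This is only a sketch: `make_spawn_process` is a hypothetical helper, and the target would be whatever function constructs the DeltaTable (for example `load` from the reproduction above).

```python
import multiprocessing


def make_spawn_process(target, *args):
    """Build (but do not start) a child process that uses the "spawn" method."""
    # get_context returns a context object bound to one start method and
    # leaves the global default untouched, so other code is unaffected.
    ctx = multiprocessing.get_context("spawn")
    return ctx.Process(target=target, args=args)
```

Usage would be `p = make_spawn_process(load)`, followed by `p.start()` and `p.join()` inside an `if __name__ == "__main__":` guard. With spawn the child starts from a fresh interpreter, so it does not inherit runtime state created in the parent.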

Tom-Newton commented 3 months ago

I would definitely feel more comfortable if the default behaviour (fork) worked with deltalake. I don't have much control over how people use this.

"tokio doesn't support fork"

I think this can be worked around and previously was worked around by creating new runtimes in new processes. What do you think of this monstrosity? It would allow plenty of forking to work.... :sweat_smile:

use std::process;
use std::sync::OnceLock;
use tokio::runtime::Runtime;

const NUMBER_OF_RUNTIME_SLOTS: usize = 10;
const PID_LOCK: OnceLock<u32> = OnceLock::new();
const RUNTIME_LOCK: OnceLock<Runtime> = OnceLock::new();

#[inline]
pub fn rt() -> &'static Runtime {
    static RUNTIMES: [OnceLock<Runtime>; NUMBER_OF_RUNTIME_SLOTS] =
        [RUNTIME_LOCK; NUMBER_OF_RUNTIME_SLOTS];
    static PIDS: [OnceLock<u32>; NUMBER_OF_RUNTIME_SLOTS] = [PID_LOCK; NUMBER_OF_RUNTIME_SLOTS];

    let current_pid = process::id();
    let (found_pid, idx) = PIDS
        .iter()
        .enumerate()
        .find_map(|(i, lock)| match lock.get() {
            Some(pid) if pid == &current_pid => Some((true, i)), // Found the current PID
            Some(_pid) => None,                                  // Found a different PID, keep searching
            None => Some((false, i)),                            // Found an empty slot
        })
        .expect("No available slot for tokio runtime. The process was forked too many times");

    match found_pid {
        true => RUNTIMES[idx].get().expect("Failed to get tokio runtime"),
        false => {
            PIDS[idx]
                .set(current_pid)
                .expect("Failed to record PID for new tokio runtime.");
            RUNTIMES[idx].get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
        }
    }
}

wjones127 commented 3 months ago

If that fixes it, I think that would be very cool. I think for simplicity you could probably just make it a HashMap<u32, Runtime>, although TBH I'm not sure I know how fork() works enough in detail to know if that's the best way. One concern I have is whether it will garbage collect properly. Will forking recursively cause a huge number of threads to be spawned? (I guess having a fixed buffer is part of what addressed that?)

Tom-Newton commented 3 months ago

I don't really understand why, but my attempt above still deadlocks after a couple of layers of recursive forking. For my current test I think it should be effectively the same as creating a new runtime on every call, but it seems there is something more subtle going on.

From reading https://github.com/tokio-rs/tokio/issues/4301 it sounds like there is probably no solution that is completely safe with fork, and there are probably some subtleties that cause my attempt above to break. I'm starting to think the best solution might be to just fail fast if there is a fork. My main concern currently is difficult-to-debug deadlocks, so if users get an error telling them to use spawn or forkserver instead of fork, that probably solves the problem for me.
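The fail-fast idea can be sketched in a few lines of Python. This illustrates the general technique only and is not the code from any delta-rs change; `ForkGuard` and its error message are hypothetical. The guard records the PID of the process that created a shared resource and refuses to be used from any other PID.

```python
import os


class ForkGuard:
    """Remembers which process created a resource and rejects use from others."""

    def __init__(self):
        # PID of the process that owns the resource (e.g. a runtime).
        self.owner_pid = os.getpid()

    def check(self):
        """Raise if called from a process other than the one that created us."""
        current = os.getpid()
        if current != self.owner_pid:
            raise RuntimeError(
                f"Forked process detected: current PID is {current} but the "
                f"resource was created by PID {self.owner_pid}. "
                "Use the 'spawn' or 'forkserver' start method instead of 'fork'."
            )
```

Calling `check()` before every use of the shared resource turns a silent deadlock in a forked child into an immediate error that names the workaround.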

Tom-Newton commented 3 months ago

I implemented https://github.com/delta-io/delta-rs/pull/2765 as my proposed solution.

wjones127 commented 3 months ago

Thanks for trying @Tom-Newton. 👍

Tom-Newton commented 3 months ago

Thanks for the help and the reviews everyone :slightly_smiling_face:.