datafusion-contrib / datafusion-objectstore-hdfs

HDFS based on Java implementation as a remote ObjectStore for DataFusion
Apache License 2.0

Concurrent write issue #13

Open yjshen opened 1 year ago

yjshen commented 1 year ago

Seen while running delta-rs tests:

cargo test --features hdfs,integration_test --test integration_concurrent_writes test_concurrent_writes_hdfs

Failure seen:


---- test_concurrent_writes_hdfs stdout ----

test test_concurrent_writes_hdfs ... FAILED

failures:

---- test_concurrent_writes_hdfs stdout ----
thread 'test_concurrent_writes_hdfs' panicked at 'called `Result::unwrap()` on an `Err` value: ObjectStore { source: Generic { store: "HadoopFileSystem", source: Generic("Fail to read contents from /test-delta-table-1679575230/concurrent_workers/_delta_log/00000000000000000003.json with return code 0") } }', rust/tests/integration_concurrent_writes.rs:166:37
stack backtrace:
   0: rust_begin_unwind
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/panicking.rs:64:14
   2: core::result::unwrap_failed
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/result.rs:1790:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/result.rs:1112:23
   4: integration_concurrent_writes::Worker::commit_file::{{closure}}
             at ./tests/integration_concurrent_writes.rs:166:9
   5: integration_concurrent_writes::Worker::commit_sequence::{{closure}}
             at ./tests/integration_concurrent_writes.rs:145:44
   6: integration_concurrent_writes::run_test::{{closure}}::{{closure}}
             at ./tests/integration_concurrent_writes.rs:97:71
   7: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/future/future.rs:125:9
   8: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/core.rs:223:17
   9: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/loom/std/unsafe_cell.rs:14:9
  10: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/core.rs:212:13
  11: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/harness.rs:476:19
  12: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/panic/unwind_safe.rs:271:9
  13: std::panicking::try::do_call
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/panicking.rs:483:40
  14: ___rust_try
  15: std::panicking::try
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/panicking.rs:447:19
  16: std::panic::catch_unwind
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/panic.rs:140:14
  17: tokio::runtime::task::harness::poll_future
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/harness.rs:464:18
  18: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/harness.rs:198:27
  19: tokio::runtime::task::harness::Harness<T,S>::poll
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/harness.rs:152:15
  20: tokio::runtime::task::raw::poll
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/raw.rs:255:5
  21: tokio::runtime::task::raw::RawTask::poll
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/raw.rs:200:18
  22: tokio::runtime::task::LocalNotified<S>::run
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/task/mod.rs:394:9
  23: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:584:25
  24: tokio::runtime::coop::with_budget
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/coop.rs:107:5
  25: tokio::runtime::coop::budget
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/coop.rs:73:5
  26: tokio::runtime::scheduler::current_thread::Context::run_task::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:285:29
  27: tokio::runtime::scheduler::current_thread::Context::enter
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:350:19
  28: tokio::runtime::scheduler::current_thread::Context::run_task
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:285:9
  29: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:583:34
  30: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:615:57
  31: tokio::macros::scoped_tls::ScopedKey<T>::set
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/macros/scoped_tls.rs:61:9
  32: tokio::runtime::scheduler::current_thread::CoreGuard::enter
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:615:27
  33: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:530:19
  34: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:154:24
  35: tokio::runtime::runtime::Runtime::block_on
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/runtime.rs:302:47
  36: integration_concurrent_writes::test_concurrent_writes_hdfs
             at ./tests/integration_concurrent_writes.rs:36:5
  37: integration_concurrent_writes::test_concurrent_writes_hdfs::{{closure}}
             at ./tests/integration_concurrent_writes.rs:34:43
  38: core::ops::function::FnOnce::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/ops/function.rs:250:5
  39: core::ops::function::FnOnce::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
thread 'test_concurrent_writes_hdfs' panicked at 'called `Result::unwrap()` on an `Err` value: JoinError::Panic(Id(14), ...)', rust/tests/integration_concurrent_writes.rs:103:28
stack backtrace:
   0: rust_begin_unwind
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/panicking.rs:64:14
   2: core::result::unwrap_failed
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/result.rs:1790:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/result.rs:1112:23
   4: integration_concurrent_writes::run_test::{{closure}}
             at ./tests/integration_concurrent_writes.rs:103:20
   5: integration_concurrent_writes::test_concurrent_writes::{{closure}}
             at ./tests/integration_concurrent_writes.rs:42:51
   6: integration_concurrent_writes::test_concurrent_writes_hdfs::{{closure}}
             at ./tests/integration_concurrent_writes.rs:35:53
   7: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/future/future.rs:125:9
   8: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/future/future.rs:125:9
   9: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:541:57
  10: tokio::runtime::coop::with_budget
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/coop.rs:107:5
  11: tokio::runtime::coop::budget
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/coop.rs:73:5
  12: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:541:25
  13: tokio::runtime::scheduler::current_thread::Context::enter
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:350:19
  14: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:540:36
  15: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:615:57
  16: tokio::macros::scoped_tls::ScopedKey<T>::set
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/macros/scoped_tls.rs:61:9
  17: tokio::runtime::scheduler::current_thread::CoreGuard::enter
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:615:27
  18: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:530:19
  19: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:154:24
  20: tokio::runtime::runtime::Runtime::block_on
             at /Users/yijie/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.26.0/src/runtime/runtime.rs:302:47
  21: integration_concurrent_writes::test_concurrent_writes_hdfs
             at ./tests/integration_concurrent_writes.rs:36:5
  22: integration_concurrent_writes::test_concurrent_writes_hdfs::{{closure}}
             at ./tests/integration_concurrent_writes.rs:34:43
  23: core::ops::function::FnOnce::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/ops/function.rs:250:5
  24: core::ops::function::FnOnce::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
wjones127 commented 1 year ago

Not sure if this is the root cause, but I'll also note that your copy_if_not_exist implementation isn't quite right. It doesn't handle the race between two concurrent calls: the check for whether the destination exists happens separately from the write, with no locking in between.

https://github.com/datafusion-contrib/datafusion-objectstore-hdfs/blob/a88fbc8c186310a0bbb81392406657816229c8a3/hdfs/src/object_store/hdfs.rs#L385-L396

Perhaps you should implement rename_if_not_exist instead, since it seems like the HDFS rename operation should error if the destination already exists.
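
To make the race concrete, here is a minimal sketch that uses the local filesystem and only the Rust standard library as a stand-in for HDFS. The function names `copy_if_not_exists_racy` and `rename_if_not_exists` are hypothetical and are not this crate's API. The first function mirrors the check-then-write pattern described above. The second relies on a single operation that itself fails when the destination exists (here `std::fs::hard_link`), which is the property the suggestion assumes HDFS rename provides.

```rust
use std::fs;
use std::io::{self, ErrorKind};
use std::path::Path;

/// Racy variant (hypothetical): the existence check and the copy are two
/// separate steps, so another writer can create `to` in between them.
fn copy_if_not_exists_racy(from: &Path, to: &Path) -> io::Result<()> {
    if to.exists() {
        return Err(io::Error::new(ErrorKind::AlreadyExists, "destination exists"));
    }
    // A concurrent caller that passed the same check can create `to` here,
    // and both callers would then report success.
    fs::copy(from, to).map(|_| ())
}

/// Atomic variant (hypothetical): `hard_link` fails with `AlreadyExists`
/// when `to` is already present, so the check and the publish happen in
/// one filesystem operation. The source is removed only after the link
/// succeeded.
fn rename_if_not_exists(from: &Path, to: &Path) -> io::Result<()> {
    fs::hard_link(from, to)?;
    fs::remove_file(from)
}
```

With the second variant, several workers can each write a private temporary file and then try to publish it under the same commit name, such as `00000000000000000003.json`. Only one call returns `Ok`, and the others get `ErrorKind::AlreadyExists` and can retry with the next version number.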