slatedb / slatedb

A cloud native embedded storage engine built on object storage.
https://slatedb.io
Apache License 2.0
1.32k stars 59 forks source link

Panic in the compactor when writing to SlateDB from a DataFusion operator #252

Open ameyc opened 1 week ago

ameyc commented 1 week ago

Describe the bug We are attempting to use SlateDb to checkpoint the state of our operators of our Stream Processing system built atop DataFusion.

On a high level here's how our system works -

  1. We are executing a compute graph that is running on DataFusions tokio based runtime. DataFusion operators implement that ReceiverStream pattern and produce data asynchronously that is routed through the graph by the runtime. (See: https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/stream.rs )

  2. In some of our stateful operators, we want to checkpoint the state

    fn check_point_state_for_epoch(&mut self, epoch_ts: u128) {
        debug!(
            "initiating checkpoint for {} at epoch {}",
            self.channel_tag, epoch_ts
        );
        ...
        let serialized_checkpoint = bincode::serialize(&checkpointed_state).unwrap();
        let key = self.channel_tag.as_bytes();
        let _ = block_on(self.state_backend.put(key, &serialized_checkpoint));
        debug!(
            "finished checkpoint for {} at epoch {}",
            self.channel_tag, epoch_ts
        );
    }
  1. The self.state_backend here is an Arc<Db> instance that is shared between the tasks.

Ever so often, we see a panic while checkpoint that ends up killing the runtime.

For purposes of testing we are using the LocalFilesystem

async fn get_slate_db_backend(path_str: &str) -> Db {
    let object_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let options = DbOptions::default();
    debug!("initialising slate db at {}", path_str);
    Db::open_with_opts(Path::from(path_str), options, object_store)
        .await
        .unwrap()
}

And here is the backtrace

thread '<unnamed>' panicked at /Users/amey/.cargo/registry/src/index.crates.io-6f17d22bba15001f/slatedb-0.2.0/src/compactor.rs:159:46:
fatal error loading manifest: ObjectStoreError(JoinError { source: JoinError::Cancelled(Id(109)) })
stack backtrace:
   0:        0x1068a0ef8 - <std::sys::backtrace::BacktraceLock::print::DisplayBacktrace as core::fmt::Display>::fmt::h14dcb4794e811c9f
   1:        0x1068b12d4 - core::fmt::write::h49a9cf68bfb4a321
   2:        0x1068a431c - std::io::Write::write_fmt::h00e4f1b82d71ad85
   3:        0x10688cf74 - std::panicking::default_hook::{{closure}}::hda52b8046522f561
   4:        0x10688cbb0 - std::panicking::default_hook::h9566cb73367aef3e
   5:        0x10688d92c - std::panicking::rust_panic_with_hook::hfa41181adbcf532d
   6:        0x1068a12dc - std::panicking::begin_panic_handler::{{closure}}::ha417e66bf9de7acf
   7:        0x1068a1120 - std::sys::backtrace::__rust_end_short_backtrace::hc7b890f013fa9efb
   8:        0x10688d068 - _rust_begin_unwind
   9:        0x1069eb128 - core::panicking::panic_fmt::h29609029db4d41f4
  10:        0x1069eb0f8 - core::result::unwrap_failed::h1a030347a5d7de67
  11:        0x102ad14d8 - core::result::Result<T,E>::expect::he3596e049c212a86
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/core/src/result.rs:1059:23
  12:        0x102bf6608 - slatedb::compactor::CompactorOrchestrator::run::he520b96bb88ca345
                               at /Users/amey/.cargo/registry/src/index.crates.io-6f17d22bba15001f/slatedb-0.2.0/src/compactor.rs:159:25
  13:        0x1024d87d4 - slatedb::compactor::Compactor::new::{{closure}}::{{closure}}::h37fe734dfc21b1c4
                               at /Users/amey/.cargo/registry/src/index.crates.io-6f17d22bba15001f/slatedb-0.2.0/src/compactor.rs:67:13
  14:        0x1024dadc4 - std::sys::backtrace::__rust_begin_short_backtrace::h68fbd1990538055a
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/std/src/sys/backtrace.rs:152:18
  15:        0x10249b704 - std::thread::Builder::spawn_unchecked_::{{closure}}::{{closure}}::hc04c79247f926931
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/std/src/thread/mod.rs:538:17
  16:        0x10249f2d0 - <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once::h666de1a3662eea42
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/core/src/panic/unwind_safe.rs:272:9
  17:        0x1024e7498 - std::panicking::try::do_call::hc5aad5f651f779b4
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/std/src/panicking.rs:557:40
  18:        0x10249b918 - ___rust_try
  19:        0x10249b3b4 - std::panicking::try::h50bcaf7edf39fa2a
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/std/src/panicking.rs:521:19
  20:        0x10249b3b4 - std::panic::catch_unwind::hcf164ee567674fd1
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/std/src/panic.rs:350:14
  21:        0x10249b3b4 - std::thread::Builder::spawn_unchecked_::{{closure}}::ha82d38afd7a6518f
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/std/src/thread/mod.rs:537:30
  22:        0x1024ab908 - core::ops::function::FnOnce::call_once{{vtable.shim}}::h134233046647c648
                               at /private/tmp/rust-20240905-9990-tmdsun/rustc-1.81.0-src/library/core/src/ops/function.rs:250:5
  23:        0x10688e484 - std::sys::pal::unix::thread::Thread::new::thread_start::h3d45bacb8043a502
  24:        0x18d622f94 - __pthread_joiner_wake

Additional context Unclear if this is related to slatedb internals or particularly how SlateDB's threading is interacting with DataFusion's runtime.

ameyc commented 1 week ago

fyi... trying this against InMemory and Localstack does not seem to reproduce the issue. Might be strictly related to LocalFilesystem impl of the ObjectStore.

KaguraMilet commented 1 week ago

Conducted a simple tracking based on the error report. It seems likely that there is something wrong with manifest_store::read_latest_manifest But continuing to track it lacks more context.

criccomini commented 1 week ago

Perhaps it's this? https://github.com/apache/arrow-rs/issues/6375

Listing files out of order will cause read latest manifest to work in an unexpected way. I'm not sure if it'd cause a complete failure, as this issue illustrates, though.

criccomini commented 1 week ago

Ah, nm. We sort the files in list_manifests

criccomini commented 1 week ago

@ameyc Can you share your complete DbOptions config for the DB?

ameyc commented 1 week ago

Using DBOptions::default()

Here they are -

[2024-10-08T19:34:20Z DEBUG denormalized::state_backend::slatedb] DBOptions DbOptions {
        flush_interval: 100ms,
        manifest_poll_interval: 1s,
        min_filter_keys: 1000,
        filter_bits_per_key: 10,
        l0_sst_size_bytes: 67108864,
        l0_max_ssts: 8,
        max_unflushed_memtable: 2,
        compactor_options: CompactorOptions { poll_interval: 5s, max_sst_size: 1073741824, max_concurrent_compactions: 4, compaction_runtime: None }
        compression_codec: None
        object_store_cache_options: ObjectStoreCacheOptions { root_folder: None, part_size_bytes: 4194304 }
        block_cache: Some(Arc<dyn DbCache>)
        garbage_collector_options: GarbageCollectorOptions { manifest_options: Some(GarbageCollectorDirectoryOptions { poll_interval: 300s, min_age: 86400s }), wal_options: Some(GarbageCollectorDirectoryOptions { poll_interval: 60s, min_age: 60s }), compacted_options: Some(GarbageCollectorDirectoryOptions { poll_interval: 300s, min_age: 86400s }), gc_runtime: None }
    }

Added a few debug statements to list_manifests and read_manifest as suggested in the discord. Here is the trace

[2024-10-08T19:43:13Z DEBUG slatedb::manifest_store] entering read_latest_manifest
[2024-10-08T19:43:13Z DEBUG slatedb::manifest_store] entered list_manifests
[2024-10-08T19:43:13Z DEBUG slatedb::manifest_store] about to parse manifest files
thread '<unnamed>' panicked at /Users/amey/pnl/3rdparty/slatedb/src/compactor.rs:159:46:
fatal error loading manifest: ObjectStoreError(JoinError { source: JoinError::Cancelled(Id(576)) })
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

amey_debug_changes.patch

ameyc commented 1 week ago

Accidentally opened a PR with my patch. But nevertheless in case it is easier to follow - https://github.com/slatedb/slatedb/pull/253/files

criccomini commented 1 week ago

Based on the debug statements, it appears to be dying here:

        while let Some(file) = match files_stream.next().await.transpose() {
            Ok(file) => file,
            Err(e) => return Err(SlateDBError::ObjectStoreError(e)),
        } {

Which suggests a file_stream.next() is returning an error. The nested JoinError is a bit puzzling.

I suppose it could die inside the loop as well (in parse_id, for example). But given the SlateDBError::ObjectStoreError in the error message, I think it's happening in the files_stream.next().await.transpose() call (parse_id throws InvalidDBState, not ObjectStoreError).

criccomini commented 1 week ago

Some useful tidbits from ChatGPT:

The error you're seeing, ObjectStoreError(JoinError { source: JoinError::Cancelled(Id(576)) }), indicates that a join operation (likely from futures or tokio) was canceled. In async Rust, a JoinError::Cancelled usually happens when a future is aborted before it completes. There are a few potential reasons for this:

  1. Timeout or Cancelation: If you're running this in an environment where there's a timeout or some form of future cancelation mechanism, the future running the files_stream.next() could be getting canceled before it completes. Check if there are any timeouts in your code or external timeouts being enforced.

  2. Task Abort due to Other Conditions: If the code calling list_manifests depends on other futures, they might be failing or aborting, which could cascade and cause the join error in your current task. For example, if there's another task that fails and causes the task running this function to be canceled, you would see this error.

  3. Misuse of Streams: Ensure that the files_stream.next().await isn't causing a deadlock or other issue that could result in cancellation. If the stream implementation has bugs or interacts badly with the environment (such as exhausting system resources), it might cause an unexpected abort.

criccomini commented 1 week ago

@ameyc What are you setting for path_str?

ameyc commented 1 week ago

it is being set to /tmp/checkpoints/ . currently seeing if how i am trying to use blocking calls to slatedb (perhaps running into timeout issues) from DafaFusion's streams is the issue but at loss as to why this seems to be an issue with using LFS specifically.

fwiw i do see the manifest and the wal files in the checkpoint directory.

ameyc commented 5 days ago

added a few debugs to the while loop in the list_manifests and heres the full trace. Funnily adding these debugs, make the failure far rarer (though it still fails if i leave the pipeline running long enough). I will try and produce a repro that isnt super embedded in our code base over the weekend.

[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] entering read_latest_manifest
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] entered list_manifests
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] about to parse manifest files
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] parsing 00000000000000000002.manifest
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] pushing id 2 to manifests
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] parsing 00000000000000000003.manifest
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] pushing id 3 to manifests
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] parsing 00000000000000000001.manifest
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] pushing id 1 to manifests
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] manifests successfully loaded
[2024-10-10T00:03:20Z DEBUG slatedb::manifest_store] successfully read the manifest_metadatas_list
criccomini commented 5 days ago

@ameyc thanks. Do you have a debug log for when it failed with the debug! statements?