google / leveldb

LevelDB is a fast key-value storage library written at Google that provides an ordered mapping from string keys to string values.
BSD 3-Clause "New" or "Revised" License

Question: the safety of implementing a background compaction thread pool (1.20). #913

Open · minggrim opened this issue 3 years ago

minggrim commented 3 years ago

Hi all, I have a question about the background compaction thread. Currently, there is only one: the default Env runs all scheduled background work on a single thread.

But in my use case, a single process holds many DB instances, and I need compactions of different DBs to run concurrently instead of queuing behind one another.
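For context, here is a minimal sketch of the single-thread model, assuming the default POSIX Env in 1.20 works roughly like this: one lazily started background thread pops work items from a FIFO queue. `SingleThreadScheduler`, `Job` and `RunThreeJobs` are hypothetical names, and unlike the real Env (which never stops its thread) this sketch adds a shutdown path so it can be exercised.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch, modelled loosely on the default POSIX Env in 1.20:
// one background thread pops items from one FIFO queue, so items run one at
// a time, in the order they were scheduled.
class SingleThreadScheduler {
 public:
  SingleThreadScheduler() : worker_([this] { Loop(); }) {}

  ~SingleThreadScheduler() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stop_ = true;
    }
    cv_.notify_one();
    worker_.join();  // The worker drains any queued items before exiting.
  }

  void Schedule(void (*function)(void*), void* arg) {
    assert(function != nullptr);
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.emplace_back(function, arg);
    }
    cv_.notify_one();
  }

 private:
  void Loop() {
    std::unique_lock<std::mutex> lock(mu_);
    for (;;) {
      cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
      if (queue_.empty()) return;  // Stopping and nothing left to run.
      std::pair<void (*)(void*), void*> item = queue_.front();
      queue_.pop_front();
      lock.unlock();
      item.first(item.second);  // Run the work item without holding mu_.
      lock.lock();
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::pair<void (*)(void*), void*>> queue_;
  bool stop_ = false;
  std::thread worker_;  // Declared last, so it starts after the members above.
};

// Usage: three "compactions" from different DBs; each waits for the previous.
struct Job {
  std::vector<int>* log;  // Only touched by the single worker thread.
  int id;
};

std::vector<int> RunThreeJobs() {
  std::vector<int> log;
  Job jobs[3] = {{&log, 1}, {&log, 2}, {&log, 3}};
  {
    SingleThreadScheduler scheduler;
    for (Job& job : jobs) {
      scheduler.Schedule([](void* p) {
        Job* j = static_cast<Job*>(p);
        j->log->push_back(j->id);
      }, &job);
    }
  }  // Destructor drains the queue and joins the worker.
  return log;
}
```

With one worker, a long compaction of one DB delays every item queued behind it, which is exactly the queuing I want to avoid.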

Is it safe to override Env::Schedule so that it dispatches each work item to one of several job queues, with a dedicated background thread dequeuing from each?

As in the diagram below:

                               +--> [queue 1] -> bg thread 1 dequeues -> calls func(arg)
                               |
Schedule() dispatch & signal --+--> [queue 2] -> bg thread 2 dequeues -> calls func(arg)
                               |
                               +--> [queue 3] -> bg thread 3 dequeues -> calls func(arg)
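A minimal sketch of that dispatcher, assuming plain std::thread workers. `MultiQueueScheduler`, `Counter` and `RunEightItems` are hypothetical; in a real program this logic would sit behind an override of `Schedule` in a subclass of `leveldb::EnvWrapper`, which forwards every other Env method to a target Env. Picking the queue by hashing `arg` is also an assumption: as far as I can tell, 1.20 schedules compactions with the DB object as `arg`, so each DB would stay on one thread while different DBs spread across threads.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical dispatcher matching the diagram: N FIFO queues, one worker
// thread per queue. Schedule() picks a queue by hashing `arg`.
class MultiQueueScheduler {
 public:
  explicit MultiQueueScheduler(std::size_t num_queues) {
    assert(num_queues > 0);
    for (std::size_t i = 0; i < num_queues; ++i) {
      lanes_.push_back(std::unique_ptr<Lane>(new Lane()));
    }
    for (auto& lane : lanes_) {
      Lane* l = lane.get();
      l->worker = std::thread([l] { l->Loop(); });
    }
  }

  ~MultiQueueScheduler() {
    for (auto& lane : lanes_) {
      {
        std::lock_guard<std::mutex> lock(lane->mu);
        lane->stop = true;
      }
      lane->cv.notify_one();
    }
    for (auto& lane : lanes_) lane->worker.join();  // Each drains its queue first.
  }

  void Schedule(void (*function)(void*), void* arg) {
    Lane* lane = lanes_[std::hash<void*>()(arg) % lanes_.size()].get();
    {
      std::lock_guard<std::mutex> lock(lane->mu);
      lane->queue.emplace_back(function, arg);
    }
    lane->cv.notify_one();  // Wake only the thread that owns this queue.
  }

 private:
  struct Lane {
    std::mutex mu;
    std::condition_variable cv;
    std::deque<std::pair<void (*)(void*), void*>> queue;
    bool stop = false;
    std::thread worker;

    // Runs items from this queue in FIFO order until stopped and empty.
    void Loop() {
      std::unique_lock<std::mutex> lock(mu);
      for (;;) {
        cv.wait(lock, [this] { return stop || !queue.empty(); });
        if (queue.empty()) return;
        std::pair<void (*)(void*), void*> item = queue.front();
        queue.pop_front();
        lock.unlock();
        item.first(item.second);  // Run without holding the lock.
        lock.lock();
      }
    }
  };

  std::vector<std::unique_ptr<Lane>> lanes_;
};

// Usage: 8 items with distinct args, spread over 3 queues.
struct Counter {
  std::atomic<int>* ran;
};

int RunEightItems() {
  std::atomic<int> ran{0};
  Counter items[8];
  for (Counter& c : items) c.ran = &ran;
  {
    MultiQueueScheduler scheduler(3);
    for (Counter& c : items) {
      scheduler.Schedule([](void* p) { ++*static_cast<Counter*>(p)->ran; }, &c);
    }
  }  // Destructor drains every queue and joins the workers.
  return ran.load();
}
```

Hashing by `arg` rather than round-robin keeps all items for one DB on one queue; round-robin would balance load better but would rely on each DB never having two compactions in flight.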

From the source code, it looks like a DB's compaction jobs are mutually exclusive: each DB has at most one compaction job scheduled at a time. But I am not sure about this.
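Here is a simplified sketch of the pattern I think I'm seeing, assuming DBImpl in 1.20 guards scheduling with a `background_compaction_scheduled_` flag that `MaybeScheduleCompaction()` checks and `BackgroundCall()` clears. `FakeDB`, `RequestCompaction` and `schedule_calls` are hypothetical stand-ins, and the real code releases its mutex during compaction I/O. If this reading is correct, a second compaction of the same DB is not handed to `Schedule` until the first one finishes, however many threads the Env uses.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical, simplified stand-in for the per-DB scheduling guard.
// Not LevelDB code: there is no Env here and no real compaction work.
class FakeDB {
 public:
  // Called whenever compaction work appears.
  void RequestCompaction() {
    std::lock_guard<std::mutex> lock(mu_);
    ++pending_work_;
    MaybeScheduleCompactionLocked();
  }

  // Entry point a background thread would run: Schedule(&BGWork, db).
  static void BGWork(void* db) { static_cast<FakeDB*>(db)->BackgroundCall(); }

  // How many times this DB handed a compaction to the scheduler.
  int schedule_calls() {
    std::lock_guard<std::mutex> lock(mu_);
    return schedule_calls_;
  }

 private:
  void MaybeScheduleCompactionLocked() {
    if (scheduled_ || pending_work_ == 0) return;  // At most one in flight.
    scheduled_ = true;
    ++schedule_calls_;
    // A real DB would call env->Schedule(&FakeDB::BGWork, this) here.
  }

  void BackgroundCall() {
    std::lock_guard<std::mutex> lock(mu_);
    assert(scheduled_);
    pending_work_ = 0;  // Pretend one compaction clears all pending work.
    scheduled_ = false;
    // Re-check, as LevelDB does after a compaction (a no-op in this sketch).
    MaybeScheduleCompactionLocked();
  }

  std::mutex mu_;
  bool scheduled_ = false;
  int pending_work_ = 0;
  int schedule_calls_ = 0;
};
```

Usage: two requests before the background call produce a single scheduled item; a request after it produces the next one.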

I'd be grateful for any advice.

minggrim commented 3 years ago

Also, the comment on Schedule says "the caller may not assume that background work items are serialized":

include/leveldb/env.h

// Arrange to run "(*function)(arg)" once in a background thread.
//
// "function" may run in an unspecified thread.  Multiple functions
// added to the same Env may run concurrently in different threads.
// I.e., the caller may not assume that background work items are
// serialized.
virtual void Schedule(
    void (*function)(void* arg),
    void* arg) = 0;

But EnvTest::RunMany still tests the execution order:

util/env_test.cc

TEST_F(EnvTest, RunMany) {
  struct RunState {
    port::Mutex mu;
    port::CondVar cvar{&mu};
    int last_id = 0;
  };

  struct Callback {
    RunState* state_;  // Pointer to shared state.
    const int id_;  // Order# for the execution of this callback.

    Callback(RunState* s, int id) : state_(s), id_(id) {}

    static void Run(void* arg) {
      Callback* callback = reinterpret_cast<Callback*>(arg);
      RunState* state = callback->state_;

      MutexLock l(&state->mu);
      ASSERT_EQ(state->last_id, callback->id_ - 1);
      state->last_id = callback->id_;
      state->cvar.Signal();
    }
  };

  RunState state;
  Callback callback1(&state, 1);
  Callback callback2(&state, 2);
  Callback callback3(&state, 3);
  Callback callback4(&state, 4);
  env_->Schedule(&Callback::Run, &callback1);
  env_->Schedule(&Callback::Run, &callback2);
  env_->Schedule(&Callback::Run, &callback3);
  env_->Schedule(&Callback::Run, &callback4);

  MutexLock l(&state.mu);
  while (state.last_id != 4) {
    state.cvar.Wait();
  }
}
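With the default Env this passes because, as far as I can tell, all four callbacks run on the same background thread in FIFO order, so the `ASSERT_EQ(state->last_id, callback->id_ - 1)` check relies on that implementation detail rather than on the documented contract. A minimal sketch of an order-independent variant, which a multi-threaded Env should also pass: it waits until every callback has run and checks only which ids were seen. It uses std::mutex and std::condition_variable instead of port::Mutex and port::CondVar so it compiles on its own; `ThreadPerItemScheduler` and `RunManyUnordered` are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical scheduler that honours only the documented contract of
// Env::Schedule: each item runs once, possibly concurrently, in any order.
class ThreadPerItemScheduler {
 public:
  void Schedule(void (*function)(void*), void* arg) {
    std::lock_guard<std::mutex> lock(mu_);
    threads_.emplace_back(function, arg);  // One new thread per item.
  }

  ~ThreadPerItemScheduler() {
    for (std::thread& t : threads_) t.join();
  }

 private:
  std::mutex mu_;
  std::vector<std::thread> threads_;
};

struct RunState {
  std::mutex mu;
  std::condition_variable cv;
  std::set<int> seen;  // Ids of callbacks that have run, in any order.
};

struct Callback {
  RunState* state;
  int id;

  static void Run(void* arg) {
    Callback* callback = static_cast<Callback*>(arg);
    std::lock_guard<std::mutex> lock(callback->state->mu);
    callback->state->seen.insert(callback->id);
    callback->state->cv.notify_all();
  }
};

// Order-independent variant of RunMany: waits until all four callbacks have
// run and returns the ids seen, without asserting any execution order.
std::set<int> RunManyUnordered() {
  RunState state;
  Callback callbacks[4] = {{&state, 1}, {&state, 2}, {&state, 3}, {&state, 4}};
  // Declared after state/callbacks so it is destroyed (and joins) first.
  ThreadPerItemScheduler scheduler;
  for (Callback& callback : callbacks) {
    scheduler.Schedule(&Callback::Run, &callback);
  }
  std::unique_lock<std::mutex> lock(state.mu);
  state.cv.wait(lock, [&state] { return state.seen.size() == 4; });
  return state.seen;
}
```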