immux / immux1

https://immux.com
0 stars 0 forks source link

ImmuxDB Atomicity #161

Open poppindouble opened 4 years ago

poppindouble commented 4 years ago

What is the issue

Our DB has some 'out of sync' write issue. Let's have a look at our current implementation in vkv.rs:


1. DataWriteInstruction::SetMany(set_many) => {
2.     for target in set_many.targets.iter() {
3. // We save the target's key and target's value 
4.         self.execute_versioned_set(&target.key, &target.value, next_height)?
5.     }
6.     let record: InstructionRecord = instruction.to_owned().into();
7. // We save instruction record
8.     if let Err(_) = self.save_instruction_record(&next_height, &record) {
9.         return Err(VkvError::SaveInstructionFail.into());
10.     }
11.     let count = set_many.targets.len();
12.     return Ok(Answer::DataAccess(DataAnswer::Write(
13.         DataWriteAnswer::SetOk(SetOkAnswer { count }),
14.     )));
15. }

In the above code, we save our target's key and target's value first, and then we save our instruction record. We are using two separate write here, if there is a power outage between these two writes, let's say at line 6 in the above code, this will cause a problem that the target's key and target's value have been already written to the disk, but the instruction record is lost. This will cause the database stay in a broken state, our revert and revert_all feature will be broken.

LevelDB/RocksDB Atomicity

Since our DB is based on RocksDB, we can take the advantage of the atomicity of RocksDB. Similar to LevelDB, RocksDB's atomicity is achieved by Write Ahead Log(WAL), let's have a look about its source code.


1. Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
2.   Writer w(&mutex_);
3.   w.batch = updates;
4.   w.sync = options.sync;
5.   w.done = false;
6. 
7.   MutexLock l(&mutex_);
8.   writers_.push_back(&w);
9.   while (!w.done && &w != writers_.front()) {
10.     w.cv.Wait();
11.   }
12.   if (w.done) {
13.     return w.status;
14.   }
15. 
16.   // May temporarily unlock and wait.
17.   Status status = MakeRoomForWrite(updates == nullptr);
18.   uint64_t last_sequence = versions_->LastSequence();
19.   Writer* last_writer = &w;
20.   if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
21.     WriteBatch* write_batch = BuildBatchGroup(&last_writer);
22.     WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
23.     last_sequence += WriteBatchInternal::Count(write_batch);
24. 
25.     // Add to log and apply to memtable.  We can release the lock
26.     // during this phase since &w is currently responsible for logging
27.     // and protects against concurrent loggers and concurrent writes
28.     // into mem_.
29.     {
30.       mutex_.Unlock();
31.       status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
32.       bool sync_error = false;
33.       if (status.ok() && options.sync) {
34.         status = logfile_->Sync();
35.         if (!status.ok()) {
36.           sync_error = true;
37.         }
38.       }
39.       if (status.ok()) {
40.         status = WriteBatchInternal::InsertInto(write_batch, mem_);
41.       }
42.       mutex_.Lock();
43.       if (sync_error) {
44.         // The state of the log file is indeterminate: the log record we
45.         // just added may or may not show up when the DB is re-opened.
46.         // So we force the DB into a mode where all future writes fail.
47.         RecordBackgroundError(status);
48.       }
49.     }
50.     if (write_batch == tmp_batch_) tmp_batch_->Clear();
51. 
52.     versions_->SetLastSequence(last_sequence);
53.   }
54. 
55.   while (true) {
56.     Writer* ready = writers_.front();
57.     writers_.pop_front();
58.     if (ready != &w) {
59.       ready->status = status;
60.       ready->done = true;
61.       ready->cv.Signal();
62.     }
63.     if (ready == last_writer) break;
64.   }
65. 
66.   // Notify new head of write queue
67.   if (!writers_.empty()) {
68.     writers_.front()->cv.Signal();
69.   }
70. 
71.   return status;
72. }

The above code is from LevelDB. From line 7 to line 14, we know that there will be on one thread will enter the critical section in the following code


1.       mutex_.Unlock();
2.       status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
3.       bool sync_error = false;
4.       if (status.ok() && options.sync) {
5.         status = logfile_->Sync();
6.         if (!status.ok()) {
7.           sync_error = true;
8.         }
9.       }
10.       if (status.ok()) {
11.         status = WriteBatchInternal::InsertInto(write_batch, mem_);
12.       }
13.       mutex_.Lock();
14.       if (sync_error) {
15.         // The state of the log file is indeterminate: the log record we
16.         // just added may or may not show up when the DB is re-opened.
17.         // So we force the DB into a mode where all future writes fail.
18.         RecordBackgroundError(status);
19.       }

Pay attention to line 2 and line 11 in the above code.

line 2 log ->AddRecord(WriteBatchInternal::Contents(write_batch)); is writing the new write_batch into the log file,

line 11 WriteBatchInternal::InsertInto(write_batch, mem_); is writing to mem_table.

since there will be only one thread enter this critical section, there will be no race condition, So it means that for every write happening in LevelDB/RocksDB, we write it to the log file first and then we put it into the mem_table(skip_list).

Let's have a look about AddRecrod function:

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

How LevelDB/RocksDB are really interact with our operating system? What happen when we call AddRecord in the above code. Let's have a look at its implementation regarding to POSIX. EmitPhysicalRecord will eventually call the following function on a POSIX.

Status WriteUnbuffered(const char* data, size_t size) {
    while (size > 0) {
      ssize_t write_result = ::write(fd_, data, size);
      if (write_result < 0) {
        if (errno == EINTR) {
          continue;  // Retry
        }
        return PosixError(filename_, errno);
      }
      data += write_result;
      size -= write_result;
    }
    return Status::OK();
  }

Actually all the Append, Flush methods in LevelDB/RocksDB will call this WriteUnbuffered essentially. However, Is this function really reliable?

This function is heavily based on a system function: write(int fildes, const void *buf, size_t nbyte), here is the link regarding to this kernel function.

ssize_t write(int fd, const void *buf, size_t count);

The write() function shall attempt to write nbyte bytes from the buffer pointed to by buf to the file associated with the open file descriptor, fildes.

The explanation from the document is blur, what does it actually mean by "write nbyte bytes"? Where the data actually being written to?

Write function under the hood

Before we step further, we need to understand what our goal is.

let's understand the background that what actually happened when the client sends a data to ImmuxDB/RocksDB/LevelDB:

1: The client sends a write command to the database (data is in client's memory). 2: The database receives the write (data is in server's memory). 3: The database calls the system call that writes the data on disk (data is in the kernel's buffer). 4: The operating system transfers the write buffer to the disk controller (data is in the disk cache). 5: The disk controller actually writes the data into a physical media (a magnetic disk, a Nand chip, ...).

From the steps above, the data is "safe"(it is not 100% guarantee, we don't consider data corruption here), if and only if step 5 can be reached. Step 5 is our goal.

We can summarize that the important stages in data safety are the 3, 4, and 5. The step 3 and 4 is something like a black box for us now, let's put three questions here first.

  1. What actually happen after we call write function.
  2. How often the kernel will flush the buffers to the disk controller?
  3. And how often the disk controller will write data to the physical media?

Answer to Q1:

We use the write system call to transfer data from user space to the kernel buffers(kernel space). I don't want to discuss file position lock or O_APPEND model here, nor concurrent write to the same file descriptor. I will give the following abstraction regarding to write function.

Two scenarios here:

  1. the data we are transferring is less than the kernel's buffer.

In this case, data are just copied from user-space to kernel-space.

  1. the data we are transferring is larger than the kernel's buffer.

If kernel write buffer will reach it's maximum size and the kernel will block our write. When the disk will be able to receive more data, the write system call will finally return.

From above, we don't have much control about how much time this system call will take before returning successfully. But we knows, it is Buffer to Buffer

Answer to Q2:

In this step 4 the kernel transfers data to the disk controller. Linux by default will actually commit writes after 30 seconds. This means that if there is a failure, all the data written in the latest 30 seconds can get potentially lost.

Answer to Q3:

Once the data reaches disk cache, we are out of control! This depends on the drive, I don't know the answer for this.

Since step3, everything is really unreliable. If bad things(I will discuss bad things in the later of this post) happened, data could be lost in any of the steps of 3, 4, 5.

How about fsync

Using fsync the database system has a way to force the kernel to actually commit data on disk, but this is a very expensive operation. I need a further research to compare the fsync performance with the write system call.

fsync (boolean), If this parameter is on, the PostgreSQL server will try to make sure that updates are physically written to disk, by issuing fsync() system calls or various equivalent methods (see wal_sync_method). This ensures that the database cluster can recover to a consistent state after an operating system or hardware crash.

Definition of "bad things"

I mentioned "bad things" above, but I did not give a definition of what are "bad things". Different "bad things" will have a great impact on the reliability of the database.

Bad things are those unexpected event which might cause our database to lost data.

Levels of bad things:

Level 1: Process failure; User-space memory error; Some one hack into our server, and terminate our db process, etc;

Level 2: Power outage; Hard disk error.

These is a great difference between level 1 and level 2 "bad things". The main difference is that If our kernel is still alive or not. We will use this conclusion in the following section.

Put all the dots together

The whole post is about atomicity regarding to our storage engine. However as the more research I did, the more I find out it is not merely a problem of atomicity, it is also a problem of durability as well. To make this clear, again, let's forget the blur definition of atomicity and durability, we read the code, and we get our conclusion.

Let's clear our logic a bit.

  1. When our engine gets a write operation, before the data actually was put into the LSM(skip list) in the engine's main memory, we will call a write system call to record down the current write operation(WAL).

  2. However write system call is not reliable, it first goes from user-space to kernel's buffer, then POXIS is taking care of it since this stage, it will flush our kernel write buffer to disk cache time by time.

  3. disk cache will eventually put the data into our disk track.

  4. If level 1 bad things happened, which means our kernel is still alive, till the point that the last time we call write system call, those bytes have been copied to kernel's buffer are still able to write to the disk. Then in this case, when our db process is rebooted, we are able to recover those data, Which has been already put into the skip list in RocksDB/LevelDB, but yet to be written to Level 0 file.

  5. If level 2 bad things happened, which means our kernel is dead, all the write buffer is lost, and our disk cache is not working, then even the write function is returned successfully, our data might be lost already.

Let's put everything together. From the above description, we know that, WAL itself is not reliable(When Level2 bad things happened). If our fault tolerance mechanism is not reliable, then how can we prove that our db is reliable? Why we still need the WAL? I think the design philosophy behind this is that, WAL at least can prevent Level 1 bad things, meanwhile level 2 bad things is out of control. However, there is one kind of opinion is that, as a database developer, we should be clear about the boundary of our duty, once I give to OS, the rest of the work is none of my business. I don't think this is a good excuse to ignore what actually happen under the hood of storage engine.

So, now it is not even a problem of "atomic" but also a problem of "durability". Because our data is not 100% guarantee to reach the disk at certain situation. This is decided by the hardware design and the OS design as well. Hardware and OS are all trying to sacrificed reliability for performance.

Let's go back to our issue in ImmuxDB, at the very beginning I point out 'out of sync' write issue in our db. We can not fully solve this issue, since how OS and hardware works, but I think we still can combine saving the target's key and target's value with saving instruction record into one batch_write for under level storage engine. In this way, at least we can make sure that the operation of saving the target's key and target's value and saving instruction record are actually one write operation for our storage engine. If level 1 bad things happened, the instruction and the target's key and value can be recover at the same time, and level 2 bad things happened, these two piece of information are lost together.