GreptimeTeam / bunshin

A distributed Write-Ahead log implementation for cloud

Distributed logging for cloud-native databases #2

Open v0y4g3r opened 1 year ago

v0y4g3r commented 1 year ago

Background

A shared-nothing architecture binds dedicated local storage to every compute node, which performs well for small I/O operations such as continuous insertions. However, replication and failover in shared-nothing systems are complicated and error-prone. A shared-data architecture, on the other hand, benefits from the inherent replication and scalability of cloud storage such as S3, but write performance may degrade under continuous I/O due to the overhead of cloud storage operations.

Thus we need a distributed WAL to bridge the gap between shared-data and shared-nothing. Inserted data first goes to the WAL to ensure durability and is then applied to the datanode. Insertions are buffered on the datanode's local storage and written to cloud storage in batches to increase throughput. The data buffered on a datanode is transient: if the datanode crashes, that data can be recovered from the WAL.

Design goals

Proposal

Some systems, such as Redpanda, use Raft to replicate logs across nodes. But running a consensus algorithm for every log record is quite expensive. Given that GreptimeDB already has a metasrv to coordinate datanodes, we can use quorum-based replication on the data plane and leave consensus to metasrv.

Quorum-based replication

Quorum-based systems read from and write to subsets of the copies of the data, namely the read set Vr and the write set Vw. To ensure consistency, Vr and Vw must overlap in at least one node, so that when data is retrieved, at least one node in Vr holds the latest version of the value.
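The overlap requirement can be checked with simple counting: among N copies, any read set of size |Vr| and any write set of size |Vw| are guaranteed to share a node exactly when |Vr| + |Vw| > N. The sketch below is only an illustration of that arithmetic; the function name and parameters are hypothetical and not part of Bunshin.

```python
def quorums_overlap(n: int, read_size: int, write_size: int) -> bool:
    """Return True if every read set must share a node with every write set.

    n          -- total number of copies (assumed name, not from the proposal)
    read_size  -- |Vr|, the number of nodes consulted on a read
    write_size -- |Vw|, the number of nodes a write must reach
    """
    if not (0 < read_size <= n and 0 < write_size <= n):
        raise ValueError("quorum sizes must be between 1 and n")
    # Pigeonhole argument: two subsets of n nodes can be disjoint
    # only if their sizes sum to at most n.
    return read_size + write_size > n
```

For example, with three copies, `quorums_overlap(3, 2, 2)` holds, while reading from a single node after writing to two (`quorums_overlap(3, 1, 2)`) does not guarantee overlap.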

Quorum-based replication is widely used in existing database systems such as Amazon Aurora and Amazon DynamoDB.

Concepts

Quorum, ensemble and data striping

In Bunshin, the ensemble is the pool of all available nodes, while a quorum is the subset of the ensemble that a log entry is actually written to. When the ensemble size is greater than the quorum size, adjacent entries are written to different quorums, which is called "data striping".
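The proposal does not specify how an entry is mapped to its quorum. One common choice, assumed here purely for illustration, is round-robin striping keyed on the sequence number. The function `write_quorum` and the node names are hypothetical.

```python
def write_quorum(ensemble, quorum_size, sequence):
    """Pick the nodes that store the entry with the given sequence number.

    Assumption: round-robin striping. The quorum for an entry starts at
    position `sequence % len(ensemble)` and wraps around the ensemble.
    """
    if not 0 < quorum_size <= len(ensemble):
        raise ValueError("quorum size must be between 1 and the ensemble size")
    start = sequence % len(ensemble)
    return [ensemble[(start + i) % len(ensemble)] for i in range(quorum_size)]


ensemble = ["n0", "n1", "n2"]
# Adjacent entries land on different quorums when the ensemble is larger
# than the quorum.
stripes = [write_quorum(ensemble, 2, seq) for seq in range(3)]
```

With an ensemble of three nodes and a quorum size of two, entries 0, 1 and 2 go to (n0, n1), (n1, n2) and (n2, n0) respectively, so the write load is spread over all nodes.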

Entry and sequence

An entry is the basic unit of the log. Each entry has a monotonically increasing sequence number generated by the writer. Bunshin ensures that once a sequence number is acked, all entries with lower sequence numbers are also acked.
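One way a writer could uphold this guarantee when replication completes out of order is to hold back the ack for an entry until every lower sequence number has also been replicated. The following is a minimal sketch under that assumption; `AckTracker` and its method names are hypothetical.

```python
class AckTracker:
    """Tracks the highest sequence number that can safely be acked."""

    def __init__(self, first_sequence: int = 0) -> None:
        self._next = first_sequence   # lowest sequence not yet ackable
        self._replicated = set()      # replicated, but blocked by a gap

    def on_replicated(self, sequence: int) -> int:
        """Record that `sequence` is replicated; return the last ackable one.

        The result is `first_sequence - 1` while nothing can be acked yet.
        """
        if sequence >= self._next:
            self._replicated.add(sequence)
        # Advance over every contiguous replicated sequence number.
        while self._next in self._replicated:
            self._replicated.remove(self._next)
            self._next += 1
        return self._next - 1
```

If entry 1 finishes replicating before entry 0, the tracker keeps reporting that nothing is ackable until entry 0 completes, at which point both become ackable together.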

Stream and segment

In Bunshin, an unbounded sequence of log entries is called a "stream". A stream can have only one writer but many readers. An unbounded data structure is hard to deal with, so we partition the stream into "segments". A segment is a batch of log entries that accepts append operations only. Once a segment is closed (or sealed), it is immutable and cannot be reopened for writing. Since segments are immutable once closed, we can easily migrate them to other storage systems or simply delete them when it is safe to reclaim disk space.
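The stream and segment lifecycle described above can be sketched as follows. The class names and the rule for rolling over to a new segment (a fixed entry count) are assumptions made for the example; the proposal does not say when a segment is sealed.

```python
class SegmentSealedError(Exception):
    """Raised when appending to a sealed segment."""


class Segment:
    def __init__(self, segment_id: int) -> None:
        self.segment_id = segment_id
        self._entries = []
        self._sealed = False

    @property
    def sealed(self) -> bool:
        return self._sealed

    def __len__(self) -> int:
        return len(self._entries)

    def append(self, payload: bytes) -> None:
        if self._sealed:
            raise SegmentSealedError(f"segment {self.segment_id} is sealed")
        self._entries.append(payload)

    def seal(self) -> None:
        # Sealing is one-way: there is deliberately no way to reopen.
        self._sealed = True


class Stream:
    """Single-writer stream that rolls to a new segment when one fills up."""

    def __init__(self, max_entries_per_segment: int) -> None:
        self._max = max_entries_per_segment  # assumed rollover policy
        self.segments = [Segment(0)]

    def append(self, payload: bytes) -> None:
        active = self.segments[-1]
        if len(active) >= self._max:
            active.seal()
            active = Segment(len(self.segments))
            self.segments.append(active)
        active.append(payload)
```

Only the last segment of a stream is ever writable, so every earlier segment is a candidate for migration or deletion.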

Chunks

Nodes in a quorum may fail, permanently or transiently. When the writer finds that some node in the write quorum is unable to handle a request, instead of hanging and waiting, it initiates a write quorum change and picks another node to write to. A sequence of log entries in a segment that are written to the same write quorum is logically called a "chunk". Different chunks in a segment may consist of different quorum nodes, but the quorum size must be the same.
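Since the segment and chunk metadata layout is still TBD in this proposal, the following is only a guess at what a quorum change might record: each chunk stores the first sequence number it covers and the nodes of its write quorum. All names here (`Chunk`, `SegmentMeta`, `replace_node`) are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Chunk:
    first_sequence: int   # first entry written to this write quorum
    nodes: tuple          # nodes forming the write quorum


@dataclass
class SegmentMeta:
    quorum_size: int
    chunks: list = field(default_factory=list)

    def open_chunk(self, first_sequence: int, nodes) -> None:
        # Chunks may use different nodes, but the quorum size is fixed.
        if len(nodes) != self.quorum_size:
            raise ValueError("all chunks must use the same quorum size")
        if self.chunks and first_sequence <= self.chunks[-1].first_sequence:
            raise ValueError("chunks must start at increasing sequences")
        self.chunks.append(Chunk(first_sequence, tuple(nodes)))

    def replace_node(self, failed: str, spare: str, first_sequence: int) -> None:
        """Quorum change: swap a failed node for a spare and open a new chunk."""
        current = self.chunks[-1].nodes
        if failed not in current or spare in current:
            raise ValueError("invalid quorum change")
        nodes = tuple(spare if n == failed else n for n in current)
        self.open_chunk(first_sequence, nodes)
```

For instance, if node n1 fails while the writer is about to write entry 7, calling `replace_node("n1", "n2", 7)` leaves entries below 7 in the old chunk and routes entry 7 onwards to the new quorum.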

Protocol

Write

The writer opens a stream by creating segment metadata in metasrv. To write an entry, it chooses N nodes to form the write quorum and sends write requests to these nodes in parallel. An entry is considered successfully replicated once the writer has received ACK responses from K of these nodes. Here K is a configurable option for each stream. Increasing K brings higher reliability, but may also result in latency spikes.
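The write path can be sketched with `asyncio`: send to all N nodes concurrently and return as soon as K of them have acknowledged. This is a minimal sketch, not Bunshin's implementation; `replicate` and the `send` callback (standing in for the actual write RPC) are hypothetical names.

```python
import asyncio


async def replicate(entry, nodes, ack_quorum, send):
    """Send `entry` to every node in parallel; return once K nodes have acked.

    `send(node, entry)` is an assumed coroutine that raises on failure.
    Raises RuntimeError if fewer than `ack_quorum` nodes can ack.
    """
    pending = {asyncio.ensure_future(send(node, entry)) for node in nodes}
    acked = 0
    while pending and acked < ack_quorum:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        # A finished task counts as an ACK only if it did not raise.
        acked += sum(1 for task in done if task.exception() is None)
    if acked < ack_quorum:
        # In the protocol above, this is where a quorum change would start.
        raise RuntimeError(f"only {acked} of {ack_quorum} required ACKs received")
    # Requests still pending keep running in the background; a real writer
    # would track them rather than drop them.
    return acked
```

Because the function returns after the K fastest responses, a single slow node does not add to write latency unless K equals N, which matches the latency trade-off noted above.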

When the writer fails to write to some node in the quorum, it initiates a quorum change and opens a new chunk with the new write quorum.

Read

Since every entry carries a sequence number, we can simplify the quorum read to a read from a single node. Readers read entries by entry id. Since a reader already knows the ensemble of a fragment, it can infer the write quorum of an entry and send a read request to any of those nodes to fetch the entry's content.
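A reader following this scheme might look up the write quorum for an entry and then try its nodes one by one. The sketch below assumes the metadata is a sorted list of `(first_sequence, nodes)` pairs, one per chunk, and uses in-memory dictionaries in place of read RPCs; both are illustrative assumptions. It also falls back to the next node if one does not hold the entry, which can happen when a write was acked before reaching every node.

```python
import bisect


def nodes_for(chunks, sequence):
    """Return the write quorum that stored `sequence`.

    `chunks` is an assumed metadata layout: a list of
    (first_sequence, nodes) pairs sorted by first_sequence.
    """
    starts = [first for first, _ in chunks]
    index = bisect.bisect_right(starts, sequence) - 1
    if index < 0:
        raise KeyError(sequence)
    return chunks[index][1]


def read_entry(chunks, stores, sequence):
    """Fetch an entry from any single node of its write quorum.

    `stores` maps node name -> {sequence: payload} and stands in for RPCs.
    """
    for node in nodes_for(chunks, sequence):
        store = stores.get(node)  # None models an unreachable node
        if store is not None and sequence in store:
            return store[sequence]
    raise KeyError(sequence)
```

No version comparison across nodes is needed here, because an entry with a given sequence number is written once and never changes.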

Recovery and repairing

TBD

Metadata storage

Segment meta and chunk meta

TBD

Exclusive writer and epoch

TBD

Underlying Storage

Bunshin uses RocksDB as its entry storage. We won't focus on writing an append-only storage engine from scratch, since RocksDB provides relatively good batch write performance and fast point queries.

evenyag commented 1 year ago

What about putting this into a design-docs directory?

sunng87 commented 1 year ago

How about providing support for configurable consistency level: https://docs.datastax.com/en/cassandra-oss/3.0/cassandra/dml/dmlConfigConsistency.html

It allows users to trade consistency for performance when they have a large amount of data and losing an individual data point is not critical.

v0y4g3r commented 1 year ago

> What about putting this into a design-docs directory?

Will move to design doc when it's finished.

v0y4g3r commented 1 year ago

> How about providing support for configurable consistency level: https://docs.datastax.com/en/cassandra-oss/3.0/cassandra/dml/dmlConfigConsistency.html
>
> It allows users to trade consistency for performance when they have a large amount of data and losing an individual data point is not critical.

With the quorum-write pattern, a configurable consistency level can be easily implemented using a client-side option, ACK_QUORUM: the client confirms a write as soon as it has received the given number of ACKs. That's exactly how Cassandra and Kafka implement this feature.

evenyag commented 1 year ago

The design looks similar to BookKeeper. I'd recommend adding a references or related work section.

> Will move to design doc when it's finished.

Maybe you could open a pull request for this (like an RFC), which should be easier to review.

evenyag commented 1 year ago

> A sequence of log entries in a segment that are written to the same write quorum is logically called a "chunk"

The name "chunk" is easy to confuse with "segment".

killme2008 commented 1 year ago

> > A sequence of log entries in a segment that are written to the same write quorum is logically called a "chunk"
>
> The name "chunk" is easy to confuse with "segment".

Agreed. A diagram illustrating these concepts might help.

v0y4g3r commented 1 year ago

> The design looks similar to BookKeeper. I'd recommend adding a references or related work section.
>
> > Will move to design doc when it's finished.
>
> Maybe you could open a pull request for this (like an RFC), which should be easier to review.

Quorum systems all look much alike, and we have to simplify BookKeeper's design somehow, since its operation and maintenance cause a lot of pain in real-world production environments.

tisonkun commented 6 months ago

If we build such a quorum read/write system in the cloud, it will require a StatefulSet to store multiple replicas. That would introduce a dependency on PVs or something similar.

Instead, projects like AutoMQ build the WAL over single-AZ EBS (perhaps Azure Storage supports cross-region replication out of the box).

I suspect that a BookKeeper-like system does not leverage cloud storage and introduces unnecessary complexity.

What if we read from and write to a cloud block storage service as the WAL? IIRC we can tolerate some data loss in the WAL. And AWS EBS is 99.999% reliable, which may be even better than operating a multi-server quorum system with StatefulSets and PVs.

WDYT?

v0y4g3r commented 6 months ago

You're correct. That's why we are still revisiting the design.

> That would introduce a dependency on PVs or something similar.

That's not the main concern. Persistent Volumes are just the K8s abstraction over durable storage such as EBS.

The problems of BookKeeper-like quorum systems are:

Whether it is quorum-based or not, we still need a standalone WAL service. It serves not only as the WAL for GreptimeDB components, but also as a channel that streams database events to facilitate features like CDC and incremental materialized views.