// next reads fragments from r.rdr until one complete record has been
// assembled, i.e. until a recFull or recLast fragment has been consumed.
// Page-termination bytes and the zero padding that follows them are skipped.
//
// Uncompressed payloads are accumulated in r.rec. Payloads whose header byte
// has the snappy bit set are accumulated in r.snappyBuf and decoded into
// r.rec once the final fragment has been read. r.total is advanced by every
// byte consumed and r.curRecTyp holds the type of the last fragment read.
//
// A non-nil error is returned when a read fails, when a fragment declares a
// length larger than a page can hold, when the checksum does not match, when
// validateRecord rejects the fragment, or when snappy decoding fails.
//
// Each fragment has the following layout:
//
// ┌───────────┬──────────┬────────────┬──────────────┐
// │ type <1b> │ len <2b> │ CRC32 <4b> │ data <bytes> │
// └───────────┴──────────┴────────────┴──────────────┘
func (r *Reader) next() (err error) {
// We have to use r.buf since allocating byte arrays here fails escape
// analysis and ends up on the heap, even though it seemingly should not.
hdr := r.buf[:recordHeaderSize]
// Payload area of the scratch buffer: everything after the header bytes.
buf := r.buf[recordHeaderSize:]
// Reset the record buffer to zero length while keeping its capacity.
r.rec = r.rec[:0]
// Reset the buffer that collects snappy-compressed fragments.
r.snappyBuf = r.snappyBuf[:0]
// i counts the fragments already accepted for the current record; it is
// passed to validateRecord and is not advanced for page-termination bytes.
i := 0
for {
// Read the first header byte, which carries the fragment type.
if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
return errors.Wrap(err, "read first header byte")
}
r.total++
r.curRecTyp = recTypeFromHeader(hdr[0])
// Whether the snappy bit is set in the header byte.
compressed := hdr[0]&snappyMask != 0
// Gobble up zero bytes.
if r.curRecTyp == recPageTerm {
// recPageTerm is a single byte that indicates the rest of the page is padded.
// If it's the first byte in a page, buf is too small and
// needs to be resized to fit pageSize-1 bytes.
buf = r.buf[1:]
// We are pedantic and check whether the zeros are actually up
// to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (r.total % pageSize)
if k == pageSize {
continue // Initial 0 byte was last page byte.
}
n, err := io.ReadFull(r.rdr, buf[:k])
if err != nil {
return errors.Wrap(err, "read remaining zeros")
}
r.total += int64(n)
for _, c := range buf[:k] {
if c != 0 {
return errors.New("unexpected non-zero byte in padded page")
}
}
continue
}
// Read the rest of the header. Note that n and err are declared here in
// the loop body's scope, so this err shadows the named result for the
// remainder of the iteration.
n, err := io.ReadFull(r.rdr, hdr[1:])
if err != nil {
return errors.Wrap(err, "read remaining header")
}
// Account for the header bytes just read.
r.total += int64(n)
var (
// Payload length: 2 bytes, big-endian.
length = binary.BigEndian.Uint16(hdr[1:])
// Expected checksum of the payload: 4 bytes, big-endian.
crc = binary.BigEndian.Uint32(hdr[3:])
)
// A fragment's payload can never exceed a page minus the header.
if length > pageSize-recordHeaderSize {
return errors.Errorf("invalid record size %d", length)
}
// Read the payload.
n, err = io.ReadFull(r.rdr, buf[:length])
if err != nil {
return err
}
// Account for the payload bytes actually read.
r.total += int64(n)
// Defensive check: io.ReadFull already reports an error on a short read.
if n != int(length) {
return errors.Errorf("invalid size: expected %d, got %d", length, n)
}
// Verify the payload against the checksum stored in the header.
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
if compressed {
// Compressed: collect the raw bytes; they are decoded once the record is complete.
r.snappyBuf = append(r.snappyBuf, buf[:length]...)
} else {
// Uncompressed: append the payload directly to the record.
r.rec = append(r.rec, buf[:length]...)
}
// Check that this fragment type is acceptable given how many fragments
// of the current record have been seen so far.
if err := validateRecord(r.curRecTyp, i); err != nil {
return err
}
if r.curRecTyp == recLast || r.curRecTyp == recFull {
if compressed && len(r.snappyBuf) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
r.rec = r.rec[:cap(r.rec)]
r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
return err
}
return nil
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
i++
}
}
每条 WAL 记录都包含头信息和数据信息,头中包含了类别、长度和 CRC 校验和。
注意一个 WAL 文件,默认是128MB,也就是一个 segment 的大小,
WAL 按页(page)写入,一页是 32KB。
在存 record 的时候,一页之内放得下的记录按照一条完整的记录来存;超过一页的记录,那就拆成多个分片(fragment)分到多页中存放,这多个分片
包含的头 type 也不同,具体的 type 取值为:
0: rest of page will be empty
1: a full record encoded in a single fragment
2: first fragment of a record
3: middle fragment of a record
4: final fragment of a record
recType
// Fragment types of a WAL record. Reader.next compares the type extracted
// from the first header byte of each fragment against these values.
// The numeric values are spelled out explicitly rather than derived.
const (
recPageTerm recType = 0 // Rest of page is empty.
recFull recType = 1 // Full record.
recFirst recType = 2 // First fragment of a record.
recMiddle recType = 3 // Middle fragments of a record.
recLast recType = 4 // Final fragment of a record.
)
series 类型的 record 的格式:首先是 1 个字节的类别(type),然后是 series 的 id 号,接着是该 series 对应的 label 的数量,最后是各个 label 的 name 和 value。
其中 type 的可能取值为:
// Unknown is returned for unrecognised WAL record types.
Unknown Type = 255
// Series is used to match WAL records of type Series.
Series Type = 1
// Samples is used to match WAL records of type Samples.
Samples Type = 2
// Tombstones is used to match WAL records of type Tombstones.
Tombstones Type = 3
// Exemplars is used to match WAL records of type Exemplars.
Exemplars Type = 4
其中,如果有多条,那么接下来的还是有 id, label 等同样的数据结构
// Series decodes all series entries found in rec, appends them to series and
// returns the extended slice.
//
// The record has to start with the Series type byte. Every entry after it is
// read as a series reference (Be64), a label count (Uvarint) and that many
// name/value string pairs (UvarintStr). The labels of each entry are sorted
// before the entry is appended.
//
// A nil slice and an error are returned if the type byte does not match, if
// the decode buffer reports an error, or if bytes are left over afterwards.
func (d *Decoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
	dec := encoding.Decbuf{B: rec}

	if typ := Type(dec.Byte()); typ != Series {
		return nil, errors.New("invalid record type")
	}

	// Consume entries until the buffer is drained or decoding has failed.
	for {
		if len(dec.B) == 0 || dec.Err() != nil {
			break
		}

		id := storage.SeriesRef(dec.Be64())
		count := dec.Uvarint()

		lbls := make(labels.Labels, count)
		for j := 0; j < len(lbls); j++ {
			lbls[j].Name = dec.UvarintStr()
			lbls[j].Value = dec.UvarintStr()
		}
		sort.Sort(lbls)

		entry := RefSeries{
			Ref:    chunks.HeadSeriesRef(id),
			Labels: lbls,
		}
		series = append(series, entry)
	}

	if err := dec.Err(); err != nil {
		return nil, err
	}
	// Trailing bytes mean the record did not end on an entry boundary.
	if rest := len(dec.B); rest > 0 {
		return nil, errors.Errorf("unexpected %d bytes left in entry", rest)
	}
	return series, nil
}
// Series writes the given series as a single record onto the end of b and
// returns the resulting slice.
//
// The output begins with the Series type byte. Each series then contributes
// its reference (PutBE64), its number of labels (PutUvarint), and the name
// and value of every label (PutUvarintStr), in slice order.
func (e *Encoder) Series(series []RefSeries, b []byte) []byte {
	enc := encoding.Encbuf{B: b}

	// The record type always comes first.
	enc.PutByte(byte(Series))

	for i := range series {
		entry := series[i]

		enc.PutBE64(uint64(entry.Ref))
		enc.PutUvarint(len(entry.Labels))

		for j := range entry.Labels {
			lbl := entry.Labels[j]
			enc.PutUvarintStr(lbl.Name)
			enc.PutUvarintStr(lbl.Value)
		}
	}

	return enc.Get()
}
WAL disk 磁盘格式
┌───────────┬──────────┬────────────┬──────────────┐
│ type <1b> │ len <2b> │ CRC32 <4b> │ data <bytes> │
└───────────┴──────────┴────────────┴──────────────┘
参考资料
chunk
读 WAL 文件
0: rest of page will be empty
1: a full record encoded in a single fragment
2: first fragment of a record
3: middle fragment of a record
4: final fragment of a record
recType
series 类型的 record
其中,如果有多条,那么接下来的还是有 id, label 等同样的数据结构
WAL disk 磁盘格式
index 文件格式
部分都以 len 字段开始。
symbol table
series
格式
label index
posting
posting offset table
TOC
type/mysql #type/golang #public