travisjeffery / jocko

Kafka implemented in Golang with built-in coordination (No ZK dep, single binary install, Cloud Native)
https://twitter.com/travisjeffery
MIT License

Log truncation and committing #123

Open tylertreat opened 6 years ago

tylertreat commented 6 years ago

My understanding of Kafka is that a follower truncates its log up to the high watermark on recovery to remove any potentially uncommitted messages (this might no longer be the case with the leader epoch changes).

https://github.com/travisjeffery/jocko/blob/e2a8d10c648b5a558711dcfa20e166c4ad2d7c73/jocko/broker.go#L1166-L1169

However, the implementation of Truncate here looks like it deletes messages from the oldest offset up to the given offset rather than from the newest.

func (l *CommitLog) Truncate(offset int64) error {
    l.mu.Lock()
    defer l.mu.Unlock()
    var segments []*Segment
    for _, segment := range l.segments {
        if segment.BaseOffset < offset {
            if err := segment.Delete(); err != nil {
                return err
            }
        } else {
            segments = append(segments, segment)
        }
    }
    l.segments = segments
    return nil
}

From looking through some of the code, it looks like Jocko isn't implementing the normal ISR commit "flow", i.e. in Kafka, when a message is written to the log, it's not considered committed until every replica in the ISR has replicated it.
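For reference, the commit rule described here can be sketched as taking the minimum log end offset across the in-sync replicas. This is only an illustration: `highWatermark` and its input slice are hypothetical names, not Jocko or Kafka APIs, and it assumes a log end offset means "the offset the next message would get".

```go
package main

import "fmt"

// highWatermark returns the minimum log end offset across the in-sync
// replicas. Under the assumption above, every message with an offset
// below this value has been replicated by the whole ISR and can be
// treated as committed. (Hypothetical helper, not a Jocko or Kafka API.)
func highWatermark(isrLogEndOffsets []int64) int64 {
	if len(isrLogEndOffsets) == 0 {
		return 0
	}
	hw := isrLogEndOffsets[0]
	for _, leo := range isrLogEndOffsets[1:] {
		if leo < hw {
			hw = leo
		}
	}
	return hw
}

func main() {
	// The leader's log ends at 10; two followers have fetched up to 7 and 9.
	fmt.Println(highWatermark([]int64{10, 7, 9})) // prints 7
}
```

In this example offsets 7 through 9 exist on the leader but are not yet committed, which is the range a recovering follower would want to discard.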

Is this truncation in Jocko serving a different purpose, am I misunderstanding Kafka, or am I just misreading the code?

Thanks!

kempjeng commented 6 years ago

From what I understand, you're talking about the WAL, which as you note is used to remove uncommitted entries.

I believe this is message log truncation so that you don't fill up your hdd and crash.

travisjeffery commented 6 years ago

Yeah, @kempjeng has it. The recovery may not match up 100% yet, and I'm working on the replication now. But that truncation is needed to periodically remove old segments.

tylertreat commented 6 years ago

Isn't the Cleaner interface responsible for compacting/truncating the log for purposes of preventing unbounded disk usage, i.e. DeleteCleaner? It appears this is what uses MaxLogBytes to control the log size: https://github.com/travisjeffery/jocko/blob/e2a8d10c648b5a558711dcfa20e166c4ad2d7c73/commitlog/commitlog.go#L56
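To make the distinction concrete, size-based retention of the kind described above could look roughly like the sketch below. It is not Jocko's actual `DeleteCleaner`; the `segment` type, `cleanBySize`, and the `maxLogBytes` parameter are hypothetical stand-ins used only to show the idea of dropping the oldest segments until the log fits a byte budget.

```go
package main

import "fmt"

// segment is a hypothetical stand-in for a log segment; only the fields
// needed to illustrate retention are modelled.
type segment struct {
	baseOffset int64
	sizeBytes  int64
}

// cleanBySize drops segments from the oldest end until the total size is
// at most maxLogBytes. The newest segment is always kept so the log still
// has somewhere to append.
func cleanBySize(segments []segment, maxLogBytes int64) []segment {
	var total int64
	for _, s := range segments {
		total += s.sizeBytes
	}
	i := 0
	for i < len(segments)-1 && total > maxLogBytes {
		total -= segments[i].sizeBytes
		i++
	}
	return segments[i:]
}

func main() {
	segs := []segment{{0, 100}, {100, 100}, {200, 100}}
	kept := cleanBySize(segs, 250)
	fmt.Println(len(kept), kept[0].baseOffset) // prints 2 100
}
```

Retention like this only ever removes data from the oldest end, which is the opposite direction from what recovery needs.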

The problem I see is that upon becoming a follower, the broker is deleting all of its log messages because it truncates to its newest offset starting from the beginning of the log.

hw := replica.Log.NewestOffset()
if err := replica.Log.Truncate(hw); err != nil {
    return protocol.ErrUnknown.WithErr(err)
}

This happens inside becomeFollower, so it seemed like the intention was to mimic what Kafka does here, which is to truncate uncommitted messages from the log.
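The effect can be reproduced with a small in-memory model. The sketch below is a simplification under stated assumptions: `commitLog`, `segment`, `newestOffset`, and `truncate` are hypothetical stand-ins, `truncate` copies only the `BaseOffset < offset` comparison from the quoted code, and `newestOffset` is assumed to return the next offset of the last segment.

```go
package main

import "fmt"

// segment models just the offset range a segment covers:
// [baseOffset, nextOffset).
type segment struct {
	baseOffset int64
	nextOffset int64
}

type commitLog struct {
	segments []segment
}

// newestOffset is assumed to be the next offset of the last segment.
func (l *commitLog) newestOffset() int64 {
	if len(l.segments) == 0 {
		return 0
	}
	return l.segments[len(l.segments)-1].nextOffset
}

// truncate mirrors the comparison in the quoted Truncate: every segment
// whose base offset is below the given offset is dropped.
func (l *commitLog) truncate(offset int64) {
	var kept []segment
	for _, s := range l.segments {
		if s.baseOffset >= offset {
			kept = append(kept, s)
		}
	}
	l.segments = kept
}

func main() {
	l := &commitLog{segments: []segment{{0, 100}, {100, 200}, {200, 250}}}
	l.truncate(l.newestOffset())
	fmt.Println(len(l.segments)) // prints 0
}
```

Every segment starts below the newest offset, so all of them are removed.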

travisjeffery commented 6 years ago

Ahhh. Yes, we need to truncate from the head here rather than from the tail.
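A minimal sketch of truncating from the newest end, using a hypothetical in-memory model rather than Jocko's real `CommitLog` (the `segment` type and `truncateFrom` are illustrative names only, and a real implementation would also have to trim the segment's file and index on disk):

```go
package main

import "fmt"

// segment covers the offset range [baseOffset, nextOffset).
type segment struct {
	baseOffset int64
	nextOffset int64
}

type commitLog struct {
	segments []segment
}

// truncateFrom discards every entry at or after offset, keeping the older
// part of the log intact.
func (l *commitLog) truncateFrom(offset int64) {
	var kept []segment
	for _, s := range l.segments {
		switch {
		case s.nextOffset <= offset:
			// Segment lies entirely below the cut: keep it unchanged.
			kept = append(kept, s)
		case s.baseOffset < offset:
			// Segment straddles the cut: keep only the entries below it.
			s.nextOffset = offset
			kept = append(kept, s)
		default:
			// Segment starts at or after the cut: drop it.
		}
	}
	l.segments = kept
}

func main() {
	l := &commitLog{segments: []segment{{0, 100}, {100, 200}, {200, 250}}}
	l.truncateFrom(150) // pretend the high watermark is 150
	fmt.Println(len(l.segments), l.segments[1].nextOffset) // prints 2 150
}
```

With this direction, truncating to the log's own newest offset leaves the log unchanged instead of emptying it.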