segmentio / kafka-go

Kafka library in Go
MIT License

Significant load on Kafka Cluster when consuming from (almost) empty topics #313

Closed complex64 closed 3 years ago

complex64 commented 5 years ago

We are running into some weird behavior with kafka-go and would appreciate your help with the following issue.

Thanks in advance!


Describe the bug

Kafka Version

To Reproduce

func (s *S) readAndScheduleLoop() error {
    // ...
    for {
        err := s.fetchAndSchedule()
        if err != nil && err != io.EOF {
            return err
        }
        if err == nil {
            // reset exponential back-off
            continue
        }
        // io.EOF -> At HEAD, we retry with exponential back-off here.
    }
}

func (s *S) fetchAndSchedule() error {
    // s.reader is a *kafka.Reader
    msg, err := s.reader.FetchMessage(s.self)
    if err != nil {
        return err
    }
    // doing something with msg here.
    return nil
}

Expected behavior

Additional context

complex64 commented 5 years ago

I suspect the following chain of events (somewhat supported by running pprof):

  1. run is called and attempt is 0: https://github.com/segmentio/kafka-go/blob/1aea3407772b14fc9e7f04655be7d5c3a54e0a3f/reader.go#L1764
  2. initialize is called, no error is returned, we enter the nil case with implicit break: https://github.com/segmentio/kafka-go/blob/1aea3407772b14fc9e7f04655be7d5c3a54e0a3f/reader.go#L1775
  3. attempt is set "back" to 0: https://github.com/segmentio/kafka-go/blob/1aea3407772b14fc9e7f04655be7d5c3a54e0a3f/reader.go#L1801-L1804
  4. readLoop is entered and we end up in the case that matches the error message: https://github.com/segmentio/kafka-go/blob/1aea3407772b14fc9e7f04655be7d5c3a54e0a3f/reader.go#L1895-L1905
  5. The break out of the read loop brings us back to step 1, except that attempt == 1 now, so we sleep for backoffDelayMin, which in our case is the default of 100ms.

We'd end up with 60 dials/s in this scenario, could the number of partitions account for >= 600/s? I.e. is the code above run for each partition?

I'll try to confirm my suspicion tomorrow morning by setting the back-off to a higher value than the default, and then report back to you folks.

achille-roussel commented 5 years ago

@complex64 Thanks a lot for the detailed bug report, we're gonna look into this as soon as possible 👍

complex64 commented 5 years ago

I'll try to confirm my suspicion tomorrow morning by setting the back-off to a higher value than the default, and then report back to you folks.

Setting the back-off to a higher value (2s) in fact reduces the load!

Do you see a path forward where the back-off is applied per topic rather than per partition? Or should EOF be handled differently?

stevevls commented 5 years ago

Hi @complex64. By any chance do you have logs from the brokers? When a broker gets a request that it doesn't understand, its behavior is to close the connection. That manifests on the client side as an EOF. Since it looks like you're getting the EOFs immediately, it makes me suspect that something on the wire isn't quite right and the broker is closing the connection.

complex64 commented 5 years ago

Silence so far. I'll look into log levels tomorrow morning to see if we can get more introspection as to what is happening.

stevevls commented 5 years ago

Hello again. I was able to reproduce some of this behavior today, and I believe I tracked down at least part of the issue. I think in the case of empty batches, we're erroneously returning an io.EOF when the max wait time is low (e.g. < 1 second). If you get a moment, could you take #315 for a spin and see if it helps with your problem? I think that it should help by keeping the connections to the broker alive when there are no new messages to consume. Thanks!

complex64 commented 5 years ago

@stevevls, thanks for the suggestion, unfortunately I haven't had time yet to try it out. Might be able to get it in this week though.

Thanks for your help so far!

paulqin commented 5 years ago

@complex64, @achille-roussel

I also discovered this problem this week while using kafka-go in our product. Error log:
2019/07/30 21:46:42 the kafka reader got an unknown error reading partition 0 of sink_19 at offset 21433436: EOF
2019/07/30 21:47:00 the kafka reader got an unknown error reading partition 1 of sink_19 at offset 21433437: EOF
2019/07/30 21:47:00 the kafka reader got an unknown error reading partition 0 of sink_19 at offset 21433436: EOF
2019/07/30 21:47:09 the kafka reader got an unknown error reading partition 1 of sink_19 at offset 21433437: EOF

I changed the MaxWait parameter, but that did not help. I then captured the traffic between kafka-go and the Kafka server with tcpdump and stepped through the kafka-go code many times. I found that the problem happens when the topic is idle.

This is one exchange in which kafka-go sends a fetch request with fetchVersion=v10. kafka-request:

00000200  00 00 00 92 00 01 00 0a  00 00 00 06 00 3e 74 6c       
00000210  69 6e 6b 5f 63 6b 76 5f  63 6f 6e 73 75 6d 65 72       
00000220  40 68 61 64 6f 6f 70 6e  6f 64 65 31 20 28 67 69       
00000230  74 68 75 62 2e 63 6f 6d  2f 73 65 67 6d 65 6e 74      
00000240  69 6f 2f 6b 61 66 6b 61  2d 67 6f 29 ff ff ff ff       
00000250  00 00 23 27 00 00 00 01  00 a0 00 41 00 00 00 00     
00000260  00 ff ff ff ff 00 00 00  01 00 07 73 69 6e 6b 5f      
00000270  32 39 00 00 00 01 00 00  00 00 ff ff ff ff 00 00       
00000280  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 a0      
00000290  00 41 00 00 00 00                                    

kafka-server response:

    000001D8  00 00 00 45 00 00 00 06                              
    000001E0  00 00 00 00 00 00 00 00  00 00 00 00 00 01            
    000001EE  00 07 73 69 6e 6b 5f 32  39 00 00 00 01               
    000001FB  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00      
    0000020B  00 00 00 00 00 00 00 00  00 00 00 00 00 00 ff ff     
    0000021B  ff ff 00 00 00 00                                   

Let's go to the code:

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
    r.stats.fetches.observe(1)
    r.stats.offset.observe(offset)

    t0 := time.Now()
    conn.SetReadDeadline(t0.Add(r.maxWait))

    batch := conn.ReadBatchWith(ReadBatchConfig{
        MinBytes:       r.minBytes,
        MaxBytes:       r.maxBytes,
        IsolationLevel: r.isolationLevel,
    })
    highWaterMark := batch.HighWaterMark()

    t1 := time.Now()
    r.stats.waitTime.observeDuration(t1.Sub(t0))

    var msg Message
    var err error
    var size int64
    var bytes int64

    const safetyTimeout = 10 * time.Second
    deadline := time.Now().Add(safetyTimeout)
    conn.SetReadDeadline(deadline)

    for {
        if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
            deadline = now.Add(safetyTimeout)
            conn.SetReadDeadline(deadline)
        }

        if msg, err = batch.ReadMessage(); err != nil {
            batch.Close()
            break
        }
        // ... (rest of the loop omitted)
    }
    // ... (rest of the function omitted)
}

When conn.ReadBatchWith returns, all the data in the Kafka server's response has already been read (the topic has no new data to consume). batch.ReadMessage() then tries to read more data to build a new message, and that is where the error occurs.

In this scenario, I suggest we detect this condition, handle it specially, and return directly to avoid the problem. I also tried modifying the code like this:

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
    r.stats.fetches.observe(1)
    r.stats.offset.observe(offset)

    t0 := time.Now()
    conn.SetReadDeadline(t0.Add(r.maxWait))

    batch := conn.ReadBatchWith(ReadBatchConfig{
        MinBytes:       r.minBytes,
        MaxBytes:       r.maxBytes,
        IsolationLevel: r.isolationLevel,
    })

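    // Added check: no error and nothing left to read means the topic is idle.
    if batch.Err() == nil && batch.msgs.empty && batch.msgs.remaining() <= 0 {
        return batch.Offset(), nil
    }
    // ... (rest of the function unchanged)
}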

I check for the idle-topic condition and return directly; with this change the problem did not appear again. This may not be a comprehensive fix and the context needs a closer look, but I wanted to share what I found.

I think this is a serious bug that we should fix as quickly as possible, because an EOF error causes kafka-go to close a perfectly good connection and reconnect to the Kafka server. That leaves a lot of TCP sockets in TIME_WAIT and wastes system resources.

complex64 commented 5 years ago

Hello again. I was able to reproduce some of this behavior today, and I believe I tracked down at least part of the issue. I think in the case of empty batches, we're erroneously returning an io.EOF when the max wait time is low (e.g. < 1 second). If you get a moment, could you take #315 for a spin and see if it helps with your problem? I think that it should help by keeping the connections to the broker alive when there are no new messages to consume. Thanks!

It helped somewhat, throughput and load dropped.

Additionally we now have load on the system and still see hundreds (600+) of connection attempts per second from a total of 24 clients and about 1k messages per second across those 24 clients.

stevevls commented 5 years ago

I'm glad that #315 helped some. I was not convinced it was going to be a magic bullet, but I did hope that it would improve the situation. I'm going to dig in a bit on the info that @paulqin posted to see what I can find.

stevevls commented 5 years ago

@paulqin I decoded the response that you posted, and it looks like you were requesting a topic that does not exist. The response contained 0 for the number of partition responses. I was able to reproduce that kind of request when creating a kafka.Reader configured with a non-existent topic. It's not great that we reset the connection in that case, but I also don't think that requesting a missing topic is a case that we really need to optimize for. ;)

stevevls commented 4 years ago

Picking this issue back up, I think that https://github.com/segmentio/kafka-go/pull/409 will fix the issue with the constant reconnects.

achille-roussel commented 3 years ago

Closing since #409 should have solved the issue. Feel free to reopen if you are still experiencing the issue 👍