elastic / beats

:tropical_fish: Beats - Lightweight shippers for Elasticsearch & Logstash
https://www.elastic.co/products/beats

A possible bug about Kafka output ErrBreakerOpen #35755

Open Ferrari248 opened 1 year ago

Ferrari248 commented 1 year ago

See https://discuss.elastic.co/t/a-possible-bug-about-kafka-output-errbreakeropen/335816

I found an issue that is possibly a bug in the Kafka output of Filebeat 8.7:

libbeat/outputs/kafka/client.go:

func (r *msgRef) fail(msg *message, err error) {
    switch err {
    case sarama.ErrInvalidMessage:
        ...
    case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
        ...
    case breaker.ErrBreakerOpen:
        // Add this message to the failed list, but don't overwrite r.err since
        // all the breaker error means is "there were a lot of other errors".
        r.failed = append(r.failed, msg.data)

    default:
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at the end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }
    }
    r.dec()
}

func (r *msgRef) dec() {
    ...
    err := r.err
    if err != nil {
        failed := len(r.failed)
        success := r.total - failed
        r.batch.RetryEvents(r.failed)

        stats.Failed(failed)
        if success > 0 {
            stats.Acked(success)
        }

        r.client.log.Debugf("Kafka publish failed with: %+v", err)
    } else {
        r.batch.ACK()
        stats.Acked(r.total)
    }
}

That means that when messages fail with ErrBreakerOpen, msgRef.err is never set, so (unless some other error in the batch sets it) r.batch.ACK() is executed in dec(). The result: the log records are not delivered to Kafka, but the file offset has already been advanced.
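
To make that failure path concrete, here is a minimal, self-contained Go sketch. The types and error value below are simplified stand-ins, not the real libbeat/sarama types; it only mirrors the shape of the logic quoted above, where the breaker-open case records the failure but never sets r.err:

package main

import (
    "errors"
    "fmt"
)

// errBreakerOpen stands in for breaker.ErrBreakerOpen; fakeBatch stands in
// for the publisher batch. None of these are the real Beats types.
var errBreakerOpen = errors.New("circuit breaker is open")

type fakeBatch struct{ acked, retried bool }

func (b *fakeBatch) ACK()              { b.acked = true }
func (b *fakeBatch) RetryEvents(n int) { b.retried = true; _ = n }

type msgRef struct {
    total  int
    failed int
    err    error
    batch  *fakeBatch
}

// fail mirrors the current client.go switch: the breaker-open case records
// the failure but never sets r.err.
func (r *msgRef) fail(err error) {
    switch err {
    case errBreakerOpen:
        r.failed++
    default:
        r.failed++
        if r.err == nil {
            r.err = err
        }
    }
}

// dec mirrors the decision in the real dec(): retry the batch if r.err is
// set, otherwise ACK it.
func (r *msgRef) dec() {
    if r.err != nil {
        r.batch.RetryEvents(r.failed)
        return
    }
    r.batch.ACK()
}

func main() {
    r := &msgRef{total: 3, batch: &fakeBatch{}}
    for i := 0; i < r.total; i++ {
        r.fail(errBreakerOpen) // every message in the batch fails with breaker open
    }
    r.dec()
    // Prints: acked=true retried=false — the batch is ACKed even though
    // nothing was delivered, so the registry offset moves forward.
    fmt.Printf("acked=%v retried=%v\n", r.batch.acked, r.batch.retried)
}

Running it prints acked=true retried=false, which matches the observed behaviour: nothing reached Kafka, yet the batch is acknowledged and the input's offset is persisted.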

Here is an example. First, prepare a working Kafka environment on localhost and configure filebeat.yml:

output.kafka:
  hosts: ["localhost:9093"]
  username: "xxxx"
  password: "yyyy"
  topic: 'test'
  sasl.mechanism: SCRAM-SHA-512
  # ssl.verification_mode: none

Filebeat works well; the log records are delivered to Kafka.

Second, change the config:

output.kafka:
  hosts: ["localhost:9093"]
  username: "xxxx"
  password: "yyyy"
  topic: 'test'
  sasl.mechanism: SCRAM-SHA-512
  ssl.verification_mode: none

My Kafka port 9093 does not support the SSL protocol, and I configured an SSL setting on purpose to make it fail. As expected, it no longer works correctly: the new log records are not delivered to Kafka. But we can see from data/registry/filebeat/log.json that the offset has still moved forward... So I think this is possibly a bug.
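
For anyone who wants to check the offsets themselves, here is a rough, hypothetical helper sketch (not part of Beats; the exact registry layout varies between Filebeat versions). It treats the registry log file as one JSON document per line and prints every numeric field literally named "offset" that it finds:

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "log"
    "os"
)

// findOffsets walks an arbitrary decoded JSON value and collects every
// numeric field named "offset".
func findOffsets(v interface{}, out *[]float64) {
    switch t := v.(type) {
    case map[string]interface{}:
        for k, val := range t {
            if k == "offset" {
                if n, ok := val.(float64); ok {
                    *out = append(*out, n)
                }
            }
            findOffsets(val, out)
        }
    case []interface{}:
        for _, val := range t {
            findOffsets(val, out)
        }
    }
}

func main() {
    // Hypothetical default path; adjust it to your installation.
    path := "data/registry/filebeat/log.json"
    if len(os.Args) > 1 {
        path = os.Args[1]
    }
    f, err := os.Open(path)
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    sc := bufio.NewScanner(f)
    for sc.Scan() {
        var doc interface{}
        if err := json.Unmarshal(sc.Bytes(), &doc); err != nil {
            continue // skip non-JSON or partial lines
        }
        var offsets []float64
        findOffsets(doc, &offsets)
        for _, o := range offsets {
            fmt.Printf("offset: %.0f\n", o)
        }
    }
    if err := sc.Err(); err != nil {
        log.Fatal(err)
    }
}

Watching these values climb while no new messages appear in the topic is exactly the mismatch described above.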

I think msgRef.err should be set when the error is ErrBreakerOpen. A possible solution:

func (r *msgRef) fail(msg *message, err error) {
    switch err {
    case sarama.ErrInvalidMessage:
        ...
    case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
        ...
    case breaker.ErrBreakerOpen:
        // Add this message to the failed list and, if no error has been
        // recorded yet, remember the breaker error so that dec() retries
        // the batch instead of ACKing it.
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at the end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }

    default:
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at the end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }
    }
    r.dec()
}
botelastic[bot] commented 1 year ago

This issue doesn't have a Team:<team> label.

monotone commented 1 year ago

I found this too.
It causes logs not to actually be published, while the registry still records the offset.

Maybe the easiest fix is to remove the case breaker.ErrBreakerOpen and let the default case handle it.
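
For clarity, the change described above would roughly look like the sketch below (untested, written against the snippet quoted earlier): the breaker.ErrBreakerOpen case is dropped, so those errors fall through to the default branch, which records the error and triggers a retry.

func (r *msgRef) fail(msg *message, err error) {
    switch err {
    case sarama.ErrInvalidMessage:
        ...
    case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
        ...
    // breaker.ErrBreakerOpen case removed: breaker errors now fall through to
    // default, which appends to r.failed and sets r.err, so the batch is retried.
    default:
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at the end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }
    }
    r.dec()
}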

botelastic[bot] commented 2 months ago

Hi! We just realized that we haven't looked into this issue in a while. We're sorry!

We're labeling this issue as Stale to make it hit our filters and make sure we get back to it as soon as possible. In the meantime, it'd be extremely helpful if you could take a look at it as well and confirm its relevance. A simple comment with a nice emoji will be enough :+1:. Thank you for your contribution!