confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0
4.59k stars 652 forks source link

Local: Fatal error - For Messages Sent From Idempotent Producer #830

Open pchang388 opened 2 years ago

pchang388 commented 2 years ago

Description

When sending data to kafka using an async, idempotent producer, we often see delivery report channel receive errors only showing:

"Local: Fatal error"

Our delivery report logging looks like:

func (p *KafkaProducer) Run(doneChan chan bool) {
    defer close(doneChan)

    // Delivery report handler for produced messages
    go func() {
        for e := range p.Producer.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                msg := ev
                if msg.TopicPartition.Error != nil {
                    log.WithError(msg.TopicPartition.Error).Error("Data delivery to kafka failed")
                } /*else {
                    log.Infof("Data delivered successfully to (%s) [%d]. Offset %v\n",
                        *msg.TopicPartition.Topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset)
                }*/
            default:
                log.WithFields(log.Fields{
                    "event": ev,
                }).Info("Ignore event")
            }
        }
    }()
}

I did a little digging and this is just a generic error that the underlying librdkafka gives: https://docs.confluent.io/5.5.1/clients/librdkafka/rdkafka_8h.html#a44c976534da6f3877cc514826c71607c. But it's not clear on what that exactly means:

RD_KAFKA_RESP_ERR__FATAL | Fatal error: see rd_kafka_fatal_error() -- | --

Has anyone seen this specific error before and perhaps any helpful information? In the meantime, I've added debug attribute to the ConfigMap to see if we can spot the issue but it hasn't started happening again yet.

I do not see anything in the Broker logs at an INFO level that but I may be looking for the wrong thing.

See comment: https://github.com/confluentinc/confluent-kafka-go/issues/830#issuecomment-1208442027 on why this is happening to us

How to reproduce

For us, this seems to happen over time, we do put a large volume of messages in but I'm sure others do more. It doesn't seem to happen at first but gradually ends up in that state at times. Unsure at the moment on how to exactly trigger this error.

See comment: https://github.com/confluentinc/confluent-kafka-go/issues/830#issuecomment-1208442027 on how to reproduce.

Checklist

Please provide the following information:

%7|1659595801.133|IDEMPSTATE|rdkafka#producer-1| [thrd:KAFKA_BROKER_2:9092/bootst]: Idempotent producer state change DrainBump -> FatalError


 - [x] Provide broker log excerpts - nothing showing on info level
 - [x] Critical issue
mhowlett commented 2 years ago

Sometimes the producer seems to recover

the producer should never recover from a fatal error.

Is the cluster stable? There is a known issue whereby if a broker is rolled, the producer can get into this state (KIP-360 implementation isn't quite right).

pchang388 commented 2 years ago

Sometimes the producer seems to recover

the producer should never recover from a fatal error.

Is the cluster stable? There is a known issue whereby if a broker is rolled, the producer can get into this state (KIP-360 implementation isn't quite right).

Thanks @mhowlett for the correction, I have not yet updated this ticket (I will edit my original comment to reflect what you've corrected as well to not confuse future readers) with my latest findings and it was created at first before I did a deeper dive and yes what you said appears correct:

Fatal errors: While the underlying idempotent producer will typically only raise fatal errors for unrecoverable cluster errors where the idempotency guarantees can't be maintained, most of these are treated as abortable by the transactional producer since transactions may be aborted and retried in their entirety; The transactional producer on the other hand introduces a set of additional fatal errors which the application needs to handle by shutting down the producer and terminate. There is no way for a producer instance to recover from fatal errors. Whether an error is fatal or not is detected by calling err.(kafka.Error).IsFatal() on the returned error object or by checking the global GetFatalError().

Reference: https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html

And yes this is due to Kafka broker nodes being restarted (due to infrastructure issues, usually they aren't all restarted at the same time, just a node at a time)

pchang388 commented 2 years ago

What I've found so far on our side

Right before the errors start to appear, our debug producer logs showed the following Broker out of Sequence errors:

%0|1659595801.132|FATAL|rdkafka#producer-1| [thrd:KAFKA_BROKER_2:9092/bootst]: Fatal error: Broker: Broker received an out of order sequence number: ProduceRequest for REDACT [0] with 9 message(s) failed due to sequence desynchronization with broker 2 (PID{Id:155002,Epoch:0}, base seq 110234700, idemp state change 52807639ms ago, last partition error NOT_LEADER_FOR_PARTITION (actions Refresh,MsgNotPersisted, base seq 110234700..110234708, base msgid 110234701, 127ms ago)
%7|1659595801.133|IDEMPSTATE|rdkafka#producer-1| [thrd:KAFKA_BROKER_2:9092/bootst]: Idempotent producer state change DrainBump -> FatalError

Based off my reading/ideas of Kafka, this is what appears to be happening:

  1. 3 Kafka broker nodes with producers continually producing messages
  2. 1 Kafka broker goes down (due to node infrastructure issue)
  3. Kafka does a reassignment of Partition leaders (Producers have to be able to write to leaders and they replicate to followers X times based off replication factor)
  4. Producers are able to continue to write with only 2 nodes
  5. The down Kafka broker comes back up a few min later on another node (K8 auto restarts failed pods)
  6. Broker rejoins the cluster and now since we use Idempotent Kafka Producer (it allows exactly-once delivery because it uses a Producer Id assigned by Broker when producer connects and sequence number - assigned to message by Producer when it tries to send - to identify possible duplicates for retried/failed messages), the sequence number no longer matches on the Partition Leader on the Node that just came back up. The other nodes that took over have the previous sequence numbers but not the node that just came back up and took leader again
  7. Idempotent Producer is now running in an failed state since it's unable to guarantee the exactly-once delivery like it's supposed to due to the sequence number not being +1 increment

In our case, we did see the producers still push some messages for some topics during the "Local: fatal error". Most likely due to the downed node that came back up not being a leader for all Topics only some.

This is something that other users will also experience since the actual error comes from the C lib used by confluent-kafka:

pchang388 commented 2 years ago

@mhowlett thanks for the response btw,

I have a few ideas on how we can handle this on our side (in the end, we need to ensure no data loss and producers are restarted) but was wondering how this community is generally handling this scenario for a constant data stream application use case? Do you happen to have any known workarounds/fixes while ensuring no messages are lost? Or are most users avoiding Idempotent producers because of this issue or switching to synchronous idempotent producer to catch and resolve the issue instead at the potential cost of lower throughput? Or using a dlq implementation with a different producer?

We are working on migrating our kafka brokers to a more stable infrastructure/compute, but brokers going down happen regardless of where you run things. We would still need a way to deal with this

mhowlett commented 2 years ago

you'd need to consume from the topic to see what's there. i don't know off the top of my head, but you might need to read the last [max in flight] messages to be sure of what was actually persisted. using idempotent producer is recommended.

i'm actually wondering if librdkafka is failing in a scenario where it doesn't need to here. i'm looking into another bug atm where i roll a broker, and see the same issue (leading to another issue with transactions). on recreating the same setup with the java producer, and i'm not seeing an out of order sequence error.

mhowlett commented 1 year ago

so just confirming you're not using the transactional producer, right?

@emasab has a bunch of related fixes (PRs currently open) on librdkafka, though my understanding is none of these relate to the idempotent producer outside the context of using transactions.

we are seeing the problem you are in our testing there, however I think the fixes at this point just enable the transactional producer to recover, rather than addressing what is causing the out of order sequence number. we did related testing with the java client to see how it behaves and my understanding is we aren't seeing the out of order sequence number issue there.

pchang388 commented 1 year ago

Hi @mhowlett

Sorry for the delay on my part, my knowledge here is admittingly limited but we are using the idempotent producer and we are not setting a transactional.id so based off my light reading, we are not using atransactional producer currently.

Idempotent Guarantee: 
This restricts Exactly-Once Processing on a single Topic-Partition and within a single producer session. 
Exactly-Once Processing is not guaranteed when the producer is restarted.

Transactional Guarantee: 
This ensures Exactly-Once processing on multiple Topic-Partitions and also supports Exactly-Once Processing across multiple producer sessions even when the producer is restarted multiple times.

Based off the basic info, seems as though if we specify a transactional.id (which enforces/implies idempotent=enabled) for each producer as you mentioned, we should get the exactly-once behavior but also the producer Id won't be randomly generated from the broker and instead be consistent to avoid issues with "out of order sequence number" since the transactional.id is consistent per producer in a group and is used in the message sequencing. Does my understanding align with your advice?

drmbui commented 1 year ago

Hi Matt - we're looking for any further recommendations on using the .net sdk version 1.9 in regard to the issue of desynchronization....and any advice on the producer code for recovering; aside from resetting the connection.

we're going thru the redtape of opening a ticket should this helps on the topic of discussion

milindl commented 1 year ago

Hello @pchang388 , thanks for the detailed steps to reproduce.

I've been trying to reproduce this for a while using 3 brokers setup, with replication across all of them. I've tried quite a few combinations of times between the broker being killed and restarted, and changes in the number of topics the producer is producing etc.

Unfortunately I wasn't able to reproduce it -- is it possible for you to provide complete debug logs for a case where one of your producers is facing this issue?

In the ConfigMap you're passing to the producer, "debug" should be set to "eos,msg,broker,protocol" for this.

Also, regarding the last thing you mentioned about transactional producer, I understand that you're currently not using one, but are you considering to use one?

pchang388 commented 1 year ago

Hi @milindl,

I also had trouble reproducing consistently, I haven't looked into this for a while but it may be worth to look into how the other similar issues reproduced this issue:

https://github.com/confluentinc/confluent-kafka-python/issues/1128 https://github.com/edenhill/librdkafka/issues/3584

I don't have the debug logs anymore unfortunately, the only log snippets on here are from this older comment: https://github.com/confluentinc/confluent-kafka-go/issues/830#issuecomment-1208442027

For debug I set it to all to try and capture everything in the past I believe.

And yes I am considering using one after reading @mhowlett's comment here: https://github.com/confluentinc/confluent-kafka-go/issues/830#issuecomment-1305809672 but I wanted to ask for confirmation on that my understanding is correct first on why it could fix the issue before I implement into our prod environments. Just to be sure of what I am changing and why

pchang388 commented 1 year ago

@milindl - looks someone was able to recently reproduce here in a comment: https://github.com/edenhill/librdkafka/issues/3584

From that comment, it appears that the broker was disconnected and when it came back up it was assigned it's old ProducerId and since it was started over with the message sequence Id but using same old PID, it failed since it is not a +1 increment of the existing ProducerId and sequence numbers that came in previously

Based off the above condition that causes this error and my limited understanding (I did a bit more reading and added details below about the potential trade off of using a transactional producer) - I do think that transactional producer should fix this issue since it uses a transactional.id which is consistent and designed to handle producer restarts/disconnects, at the cost of guarantee/speed. The transactional producer offers more guarantees but can have an impact on performance since it groups requests and offset commits and does an all or nothing batch action to be atomic.

Also since it needs to atomic, this means it would probably be a synchronous action which could dramatically slow down throughput and cause data to lag behind real-time but to what degree I am unsure. The all or nothing mechanism of a transactional producer could prevent this scenario though since the offsets that committed are also stored on Kafka and should pick up from the next offset after it sees the existing one.

So in theory, it should be able to re-connect and start producing from the correct sequence number again (by getting the latest offset committed) but at the cost of speed/performance? Is that what you are thinking as well?

For my specific use case, we process about ~150-250k messages per second and try to be a near real-time data stream so a transactional producer could introduce enough latency to cause us to fall behind in our near real-time guarantees. Not sure it would be usable for us until we did some bench marking with our current volume.

milindl commented 1 year ago

@pchang388 , Yep, I looked at both the attached bugs (the python client, and the librdkafka). Both of them seem to outline a roughly similar path to reproducing the bug as yours, but I still failed to reproduce it locally (since it doesn't happen everytime). The logs (if any) attached to them also turned out not to be helpful, since the most detailed logs are with broker v2.2.

You are right in your understanding, all of that sounds correct. You would also need to add some handling for abortable errors and redo the transaction in that case. The transactional producer would indeed fix the handling of this error through the transactional.id at the cost of a slowdown, and you would indeed need to do some benchmarking to find out what is the ideal number of messages/time after which you should commit the transaction for the best results. I was trying to dig and find out if there are already numbers available for the latency impact, but unfortunately I couldn't find any.

Right now, I'm going through the code and trying to find the issue, but at a glance, it doesn't seem like a very straightforward issue. I'm also doing a long-running test to see if I can reproduce this.

Also, if it is possible in your environment, could you once again turn on "debug" for this with the values "all" or "eos,msg,broker,protocol" to capture it in case it happens in future? Client (and also broker) logs would be helpful if my long-running test doesn't yield any results.