IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

Transactional Producer send msg always got ErrOutOfOrderSequenceNumber error when brokers reconnected #2611

Closed songxinjianqwe closed 6 months ago

songxinjianqwe commented 1 year ago
Versions


| Sarama | Kafka | Go   |
|--------|-------|------|
| 1.40.1 | 2.6.2 | 1.19 |

Configuration


    var appName = "tx-id-1"
    var config = sarama.NewConfig()
    config.ClientID = appName
    config.Producer.Idempotent = true
    config.Producer.Return.Errors = true
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewManualPartitioner
    config.Producer.Transaction.Retry.Max = 3
    config.Producer.Transaction.ID = appName
    config.Net.MaxOpenRequests = 1
    config.Producer.Retry.Max = 1
    config.Metadata.Retry.Max = 0
    config.Net.DialTimeout = time.Second * 3
    config.Net.ReadTimeout = time.Second * 3
    config.Net.WriteTimeout = time.Second * 3
Logs


```
2023/08/21 15:36:04 begin txn start
[Sarama] 2023/08/21 15:36:47 txnmgr/transition [tx-id-1] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction
2023/08/21 15:36:53 begin txn done
2023/08/21 15:36:57 send msg: 10 start
[Sarama] 2023/08/21 15:36:58 client/metadata fetching metadata for [slowsql_unitest] from broker alikafka-post-cn-zxu39oke500i-3-vpc.alikafka.aliyuncs.com:9092
[Sarama] 2023/08/21 15:36:58 client/brokers registered new broker #103 at 10.130.232.87:9092
[Sarama] 2023/08/21 15:36:58 client/brokers registered new broker #102 at 10.130.232.86:9092
[Sarama] 2023/08/21 15:36:58 producer/broker/102 starting up
[Sarama] 2023/08/21 15:36:58 producer/broker/102 state change to [open] on slowsql_unitest/0
[Sarama] 2023/08/21 15:36:58 producer/leader/slowsql_unitest/0 selected broker 102
[Sarama] 2023/08/21 15:36:58 Connected to broker at 10.130.232.86:9092 (registered as #102)
[Sarama] 2023/08/21 15:36:58 txnmgr/add-partition-to-txn [tx-id-1] successful to add partitions txn &{ThrottleTime:0s Errors:map[slowsql_unitest:[0xc000022310]]}
[Sarama] 2023/08/21 15:36:58 txnmgr/transition [tx-id-1] transition from ProducerTxnStateInTransaction to ProducerTxnStateInError|ProducerTxnStateAbortableError
2023/08/21 15:37:14 send msg: 10 done
2023/08/21 15:37:14 send msg 10 fail, err: kafka server: The broker received an out of order sequence number
2023/08/21 15:37:14 abort txn start
[Sarama] 2023/08/21 15:37:16 producer/txnmgr [tx-id-1] aborting transaction
[Sarama] 2023/08/21 15:37:16 txnmgr/transition [tx-id-1] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateEndTransaction|ProducerTxnStateAbortingTransaction
[Sarama] 2023/08/21 15:37:16 txnmgr/endtxn [tx-id-1] successful to end txn &{ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}
[Sarama] 2023/08/21 15:37:16 txnmgr/transition [tx-id-1] transition from ProducerTxnStateEndTransaction|ProducerTxnStateAbortingTransaction to ProducerTxnStateReady
[Sarama] 2023/08/21 15:37:16 producer/txnmgr [tx-id-1] transaction aborted
2023/08/21 15:37:18 abort txn done
```

Problem Description

Hi, I am using the transactional API to send multiple messages synchronously within a single transaction, and I hit unexpected behavior when the brokers become unreachable over the network. The code looks like this:

        for {
            select {
            case <-ctx.Done():
                return nil
            default:
                log.Printf("begin txn start")
                if err := producer.BeginTxn(); err != nil {
                    log.Printf("could not begin transaction: %v", err)
                    time.Sleep(time.Second)
                    continue
                }
                log.Printf("begin txn done")
                msgList := make([]*sarama.ProducerMessage, 0, 10)
                for j := 0; j < 10; j++ {
                    message := sarama.ProducerMessage{Topic: topic, Partition: 0, Key: sarama.StringEncoder(strconv.Itoa(i)), Value: sarama.StringEncoder(msgBody)}
                    msgList = append(msgList, &message)
                    i += 1
                }
                var commit = true
                for _, msg := range msgList {
                    log.Printf("send msg: %s start ", msg.Key)
                    _, _, err := producer.SendMessage(msg)
                    log.Printf("send msg: %s done", msg.Key)
                    if err != nil {
                        log.Printf("send msg %s fail, err: %+v", msg.Key, err)
                        log.Printf("abort txn start")
                        if err := producer.AbortTxn(); err != nil {
                            log.Printf("could not abort transaction: %+v", err)
                            time.Sleep(time.Second)
                            commit = false
                            break
                        }
                        log.Printf("abort txn done")
                        time.Sleep(time.Second)
                        commit = false
                        break
                    }
                    time.Sleep(time.Second)
                }
                if !commit {
                    continue
                }
                log.Printf("commit txn start")
                if err := producer.CommitTxn(); err != nil {
                    log.Printf("could not commit transaction: %v\n", err)
                    log.Printf("abort txn start")
                    if err := producer.AbortTxn(); err != nil {
                        log.Printf("could not abort transaction: %v", err)
                        time.Sleep(time.Second)
                        continue
                    }
                    log.Printf("abort txn done")
                    time.Sleep(time.Second)
                    continue
                }
                log.Printf("commit txn done")
                time.Sleep(time.Second)
            }
        }
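
The control flow of the loop above (begin, send each message, then commit, aborting on any failure) can be condensed into one helper with a single abort path. This is only a sketch under stated assumptions: `txnProducer`, `sendInTxn`, `abortAfter` and `fakeProducer` are hypothetical names, and `Send(msg string)` is a simplified stand-in rather than Sarama's `SendMessage` signature.

```go
package main

import (
	"errors"
	"fmt"
)

// txnProducer is a hypothetical, trimmed-down stand-in for the transactional
// methods used in the loop above; it is not Sarama's real interface.
type txnProducer interface {
	BeginTxn() error
	Send(msg string) error
	CommitTxn() error
	AbortTxn() error
}

// sendInTxn sends msgs inside one transaction. A failed send or commit
// aborts the transaction; the original error is always returned.
func sendInTxn(p txnProducer, msgs []string) error {
	if err := p.BeginTxn(); err != nil {
		return fmt.Errorf("begin txn: %w", err)
	}
	for _, m := range msgs {
		if err := p.Send(m); err != nil {
			return abortAfter(p, fmt.Errorf("send %q: %w", m, err))
		}
	}
	if err := p.CommitTxn(); err != nil {
		return abortAfter(p, fmt.Errorf("commit txn: %w", err))
	}
	return nil
}

// abortAfter aborts the transaction and returns cause, annotated with the
// abort error if the abort itself fails.
func abortAfter(p txnProducer, cause error) error {
	if err := p.AbortTxn(); err != nil {
		return fmt.Errorf("%w (abort also failed: %v)", cause, err)
	}
	return cause
}

// fakeProducer is an in-memory test double that fails on one chosen message.
type fakeProducer struct {
	failOn             string
	sent               []string
	committed, aborted bool
}

func (f *fakeProducer) BeginTxn() error { return nil }

func (f *fakeProducer) Send(msg string) error {
	if msg == f.failOn {
		return errors.New("simulated send failure")
	}
	f.sent = append(f.sent, msg)
	return nil
}

func (f *fakeProducer) CommitTxn() error { f.committed = true; return nil }
func (f *fakeProducer) AbortTxn() error  { f.aborted = true; return nil }

func main() {
	ok := &fakeProducer{}
	fmt.Println(sendInTxn(ok, []string{"a", "b"}), ok.committed, ok.aborted)

	bad := &fakeProducer{failOn: "b"}
	fmt.Println(sendInTxn(bad, []string{"a", "b", "c"}), bad.committed, bad.aborted)
}
```

This only tidies the control flow. It does not change the sequence-number behavior reported in this issue, since a successful abort still leaves the producer with the same producer ID and epoch.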

When the brokers die, every subsequent send returns ErrOutOfOrderSequenceNumber (45) until I recreate the sync producer. This looks like the problem reported in https://github.com/IBM/sarama/issues/1430, which was fixed by https://github.com/IBM/sarama/pull/1661. Looking at the commit https://github.com/IBM/sarama/commit/ba2b4bc7c3b10ef7b14f7db63f77214cb113867d, I think the line marked below may be the cause of ErrOutOfOrderSequenceNumber:

    func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
        if p.IsTransactional() {
            _ = p.maybeTransitionToErrorState(err)
        }
        // We need to reset the producer ID epoch if we set a sequence number on it, because the broker
        // will never see a message with this number, so we can never continue the sequence.
        // NOTE (reporter): the condition below is the line in question.
        if !p.IsTransactional() && msg.hasSequence {
            Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
            p.bumpIdempotentProducerEpoch()
        }
        // ... (rest of the function omitted)
    }

The condition `!p.IsTransactional() && msg.hasSequence` bumps the epoch only for non-transactional producers. However, I wrote equivalent code in Java using the Kafka Java client, and there ErrOutOfOrderSequenceNumber does not occur after the brokers reconnect. The Kafka Java client printed this log:

2023-08-21 15:51:11.980 [INFO] [kafka-producer-network-thread | producer-1] [org.apache.kafka.clients.producer.internals.TransactionManager] @@@traceId=TID: N/A@@@ [Producer clientId=producer-1, transactionalId=1] ProducerId set to 42002 with epoch 46

I think this log means that the Java client bumps the epoch after a failed send whether or not transactions are enabled, whereas Sarama bumps it only when idempotence is enabled and transactions are not. Is this a bug in Sarama? Could it be fixed by removing the transactional check from the `if !p.IsTransactional() && msg.hasSequence` condition?
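
To illustrate why a missing epoch bump makes every later send fail, here is a toy model of the sequence check. It is not Sarama or Kafka code: `broker`, `producer`, `send` and `bumpEpoch` are hypothetical names, and the model assumes only what the source comment above states, namely that a sequence number assigned to a message the broker never sees leaves a gap that cannot be continued.

```go
package main

import (
	"errors"
	"fmt"
)

// errOutOfOrder stands in for Kafka's out-of-order sequence number error (code 45).
var errOutOfOrder = errors.New("out of order sequence number")

// broker is a toy model of the per-producer state kept for one partition.
type broker struct {
	epoch   int16
	nextSeq int32
}

// append accepts a record only if it carries the expected sequence number.
// In this model a newer epoch restarts the expected sequence at 0.
func (b *broker) append(epoch int16, seq int32) error {
	if epoch > b.epoch {
		b.epoch, b.nextSeq = epoch, 0
	}
	if seq != b.nextSeq {
		return errOutOfOrder
	}
	b.nextSeq++
	return nil
}

// producer is a toy model of the client-side sequence counter.
type producer struct {
	epoch   int16
	nextSeq int32
}

// send assigns the next sequence number and then tries to deliver.
// delivered=false simulates a network failure: the number is consumed on
// the client, but the broker never sees the record.
func (p *producer) send(b *broker, delivered bool) error {
	seq := p.nextSeq
	p.nextSeq++
	if !delivered {
		return errors.New("simulated network error")
	}
	return b.append(p.epoch, seq)
}

// bumpEpoch models the effect of an epoch bump: new epoch, sequence back to 0.
func (p *producer) bumpEpoch() {
	p.epoch++
	p.nextSeq = 0
}

func main() {
	b, p := &broker{}, &producer{}
	fmt.Println(p.send(b, true))  // sequence 0 reaches the broker
	fmt.Println(p.send(b, false)) // sequence 1 is lost in transit
	fmt.Println(p.send(b, true))  // sequence 2 arrives, but the broker expects 1
	p.bumpEpoch()
	fmt.Println(p.send(b, true)) // new epoch, sequence restarts at 0
}
```

In this model, every send after the lost message is rejected until `bumpEpoch` is called, which matches the reported symptom that sends keep failing until the producer is recreated.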

songxinjianqwe commented 1 year ago

Update: I found a way to fix this: set Version >= 2.5.0. I read the transaction manager code and found this:

    func (p *asyncProducer) maybeTransitionToErrorState(err error) error {
        if errors.Is(err, ErrClusterAuthorizationFailed) ||
            errors.Is(err, ErrProducerFenced) ||
            errors.Is(err, ErrUnsupportedVersion) ||
            errors.Is(err, ErrTransactionalIDAuthorizationFailed) {
            return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
        }
        if p.txnmgr.coordinatorSupportsBumpingEpoch && p.txnmgr.currentTxnStatus()&ProducerTxnFlagEndTransaction == 0 {
            p.txnmgr.epochBumpRequired = true
        }
        return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
    }

and this code:

    if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
        req.Version = 3
        isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
        t.coordinatorSupportsBumpingEpoch = true
        req.ProducerID = t.producerID
        req.ProducerEpoch = t.producerEpoch
    } else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
        req.Version = 2
    }

So if we set Version >= 2.5.0, the producer will bump the epoch after a message fails to send.
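
As a config fragment, the workaround amounts to setting `config.Version` explicitly alongside the transactional settings from the original report. This is a minimal sketch, assuming the brokers run Kafka 2.5.0 or newer (the report lists 2.6.2). `newTxnConfig` is a hypothetical helper name, while the fields and the `sarama.V2_5_0_0` constant are Sarama's.

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

// newTxnConfig is a hypothetical helper that builds a transactional producer
// config with the protocol version set high enough for the version gate
// quoted above (Version.IsAtLeast(V2_5_0_0)) to enable epoch bumping.
func newTxnConfig(txnID string) (*sarama.Config, error) {
	config := sarama.NewConfig()
	config.Version = sarama.V2_5_0_0 // must not exceed the brokers' actual version
	config.ClientID = txnID
	config.Producer.Idempotent = true
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Transaction.ID = txnID
	config.Net.MaxOpenRequests = 1
	return config, config.Validate()
}

func main() {
	config, err := newTxnConfig("tx-id-1")
	if err != nil {
		log.Fatalf("invalid config: %v", err)
	}
	log.Printf("using Kafka protocol version %s", config.Version)
}
```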

github-actions[bot] commented 10 months ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

dnwe commented 10 months ago

@songxinjianqwe yes exactly right, that bit of code is essentially Sarama's client-side implementation of KIP-360: Improve reliability of idempotent/transactional producer / KAFKA-8805: Bump producer epoch following recoverable errors, and it relies upon a broker change (KAFKA-8710) that was made in Kafka 2.5.0
