confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Consumer retrieving wrong offsets after transaction #525

Open roignpar opened 4 years ago

roignpar commented 4 years ago

Description

confluent-kafka-go: 1.4.2
librdkafka: 17040127 1.4.2-dirty
broker: cp-kafka docker image, tried with 5.3.1 (Kafka 2.3.0) and 5.5.1 (Kafka 2.5.0)
OS: Linux 5.7.17-2-MANJARO

When consumer offsets are sent as part of a transaction and then retrieved with .Committed immediately after the transaction is committed, the returned offsets are not the ones sent in the transaction. If there is a small delay between committing and retrieving, the offsets are correct.

How to reproduce

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

var topic = "go_transaction_test"
var timeout = 5000

func main() {
    // consumer
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost",
        "group.id":           "go_transaction_example",
        "enable.auto.commit": "false",
        "isolation.level":    "read_committed",
        "debug":              "consumer",
    })
    if err != nil {
        panic(err)
    }
    defer c.Close()

    if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
        panic(err)
    }

    // producer
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost",
        "message.timeout.ms": "5000",
        "enable.idempotence": "true",
        "transactional.id":   "go_transaction_test_producer",
        "debug":              "eos",
    })
    if err != nil {
        panic(err)
    }
    defer p.Close()

    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
                }
            }
        }
    }()

    if err = p.InitTransactions(nil); err != nil {
        panic(err)
    }
    if err = p.BeginTransaction(); err != nil {
        panic(err)
    }

    // populate topic
    for i := 0; i < 30; i++ {
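        // Produce only enqueues the message; fail fast if enqueueing fails.
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 1},
        }, nil)
        if err != nil {
            panic(err)
        }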
    }

    if err = p.CommitTransaction(nil); err != nil {
        panic(err)
    }

    // commit initial consumer offsets
    _, err =
        c.CommitOffsets(kafka.TopicPartitions{{Topic: &topic, Partition: 1, Offset: 10}})
    if err != nil {
        panic(err)
    }

    printCommitted(c, "before transaction")

    if err = p.BeginTransaction(); err != nil {
        panic(err)
    }

    cgm, err := c.GetConsumerGroupMetadata()
    if err != nil {
        panic(err)
    }

    // send offsets in transaction
    if err = p.SendOffsetsToTransaction(nil,
        kafka.TopicPartitions{{Topic: &topic, Partition: 1, Offset: 20}},
        cgm); err != nil {
        panic(err)
    }

    if err = p.CommitTransaction(nil); err != nil {
        panic(err)
    }

    printCommitted(c, "after transaction")

    time.Sleep(100 * time.Millisecond)

    printCommitted(c, "after sleeping")
}

func printCommitted(c *kafka.Consumer, text string) {
    committed, err :=
        c.Committed(kafka.TopicPartitions{{Topic: &topic, Partition: 1}}, timeout)
    if err != nil {
        panic(err)
    }

    fmt.Printf("\nConsumer committed offsets %s: %v\n\n", text, committed)
}

Output:

%7|1599561516.018|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.4.2-dirty (0x10402ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING CC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS STATIC_LIB_zlib ZLIB STATIC_LIB_libcrypto STATIC_LIB_libssl SSL STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
%7|1599561516.019|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example": subscribe to new subscription of 1 topics (join state init)
%7|1599561516.019|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1599561516.019|INIT|rdkafka#producer-2| [thrd:app]: librdkafka v1.4.2-dirty (0x10402ff) rdkafka#producer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING CC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS STATIC_LIB_zlib ZLIB STATIC_LIB_libcrypto STATIC_LIB_libssl SSL STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x8000)
%7|1599561516.022|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: init_transactions
%7|1599561516.022|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change Init -> WaitPID
%7|1599561516.022|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change Init -> RequestPID
%7|1599561516.022|TXN|rdkafka#producer-2| [thrd:main]: Starting PID FSM timer (fire immediately): Starting idempotent producer
%7|1599561516.024|TXNCOORD|rdkafka#producer-2| [thrd:main]: localhost:9092/0: FindCoordinator response: Transaction coordinator is broker 0 (localhost:9092)
%7|1599561516.024|TXNCOORD|rdkafka#producer-2| [thrd:main]: Transaction coordinator changed from (none) -> localhost:9092/0: FindCoordinator response
%7|1599561516.024|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change RequestPID -> WaitTransport
%7|1599561516.024|TXN|rdkafka#producer-2| [thrd:main]: Starting PID FSM timer: No broker available
%7|1599561516.025|COORD|rdkafka#producer-2| [thrd:main]: TxnCoordinator/0: Transaction coordinator is now up
%7|1599561516.025|GETPID|rdkafka#producer-2| [thrd:main]: TxnCoordinator/0: Acquiring ProducerId
%7|1599561516.025|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change WaitTransport -> WaitPID
%7|1599561516.027|GETPID|rdkafka#producer-2| [thrd:main]: Acquired PID{Id:0,Epoch:6}
%7|1599561516.027|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change WaitPID -> Assigned
%7|1599561516.027|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change WaitPID -> ReadyNotAcked
%7|1599561516.027|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: init_transactions
%7|1599561516.027|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change ReadyNotAcked -> Ready
%7|1599561516.027|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change Ready -> InTransaction
%7|1599561516.027|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: commit_transaction (begin)
%7|1599561516.027|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change InTransaction -> BeginCommit
%7|1599561516.027|TXNCOMMIT|rdkafka#producer-2| [thrd:app]: Flushing 30 outstanding message(s) prior to commit
%7|1599561516.028|DRAIN|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [0] beginning partition drain: wait for outstanding requests to finish before producing to new leader
%7|1599561516.028|DRAIN|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [1] beginning partition drain: wait for outstanding requests to finish before producing to new leader
%7|1599561516.028|DRAIN|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [2] beginning partition drain: wait for outstanding requests to finish before producing to new leader
%7|1599561516.029|ADDPARTS|rdkafka#producer-2| [thrd:main]: TxnCoordinator/0: Adding partitions to transaction
%7|1599561516.031|ADDPARTS|rdkafka#producer-2| [thrd:main]: go_transaction_test [1] registered with transaction
%7|1599561516.031|NEWPID|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [1] changed PID{Invalid} -> PID{Id:0,Epoch:6} with base MsgId 1
%7|1599561516.031|RESETSEQ|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [1] resetting epoch base seq from 0 to 1
%7|1599561516.032|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: commit_transaction
%7|1599561516.033|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change BeginCommit -> CommittingTransaction
%7|1599561516.034|TXNCOMPLETE|rdkafka#producer-2| [thrd:main]: Transaction successfully committed
%7|1599561516.034|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change CommittingTransaction -> Ready
%7|1599561516.035|COMMIT|rdkafka#consumer-1| [thrd:main]: Deferring "manual" offset commit for 1 partition(s) in state query-coord: no coordinator available
%7|1599561517.026|JOIN|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example": postponing join until up-to-date metadata is available
%7|1599561517.029|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example": subscription updated from metadata change: rejoining group
%7|1599561517.029|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1599561517.029|COMMIT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Committing offsets for 1 partition(s): manual
%7|1599561517.031|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)

Consumer committed offsets before transaction: [go_transaction_test[1]@10]

%7|1599561517.032|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change Ready -> InTransaction
%7|1599561517.032|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: send_offsets_to_transaction
%7|1599561517.037|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: commit_transaction (begin)
%7|1599561517.037|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change InTransaction -> BeginCommit
%7|1599561517.037|TXNCOMMIT|rdkafka#producer-2| [thrd:app]: Flushing 0 outstanding message(s) prior to commit
%7|1599561517.037|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: commit_transaction
%7|1599561517.037|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change BeginCommit -> CommittingTransaction
%7|1599561517.038|TXNCOMPLETE|rdkafka#producer-2| [thrd:main]: Transaction successfully committed
%7|1599561517.038|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change CommittingTransaction -> Ready
%7|1599561517.039|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)

Consumer committed offsets after transaction: [go_transaction_test[1]@10]

%7|1599561517.140|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)

Consumer committed offsets after sleeping: [go_transaction_test[1]@20]

%7|1599561517.235|DESTROY|rdkafka#producer-2| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1599561517.235|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change Assigned -> Terminate
%7|1599561517.235|DESTROY|rdkafka#producer-2| [thrd:main]: Destroy internal
%7|1599561517.235|DESTROY|rdkafka#producer-2| [thrd:main]: Removing all topics
%7|1599561517.235|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1599561517.235|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events
%7|1599561517.235|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example" is rebalancing in state up (join-state init) without assignment: unsubscribe
%7|1599561517.235|LEAVE|rdkafka#consumer-1| [thrd:main]: localhost:9092/0: Leaving group
%7|1599561517.239|CLOSE|rdkafka#consumer-1| [thrd:app]: Consumer closed
%7|1599561517.239|DESTROY|rdkafka#consumer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1599561517.239|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1599561517.239|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events
%3|1599561517.239|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 1/1 brokers are down: Local: All broker connections are down
%7|1599561517.239|CLOSE|rdkafka#consumer-1| [thrd:app]: Consumer closed
%7|1599561517.239|DESTROY|rdkafka#consumer-1| [thrd:main]: Destroy internal
%7|1599561517.239|DESTROY|rdkafka#consumer-1| [thrd:main]: Removing all topics

Broker logs are quite verbose; the broker seems to ignore the LOG4J env config vars.

dtheodor commented 4 years ago

@edenhill can you comment here? This issue is in librdkafka and has nothing to do with the Go wrapper.

sneko commented 3 years ago

@roignpar not sure that's your issue, but from what I remember, the transaction adds some "markers" as messages to the topic, so .Committed() won't reflect the truth.

Try to use .Position() with the right TopicPartition{} instead :)

jliunyu commented 2 years ago

Since this is an old version, do you still see this error with the latest version?