twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License
1.6k stars 158 forks source link

Transaction is not properly aborted #729

Closed dvachaiev closed 1 month ago

dvachaiev commented 1 month ago

Calling Client.EndTransaction(ctx, kgo.TryAbort) doesn't abort the transaction on the broker, and the records from it can be consumed at the read_committed isolation level after the next committed transaction:

consumed record: topic=test, partition=0, offset=115, value=abort 0
consumed record: topic=test, partition=0, offset=116, value=abort 1
consumed record: topic=test, partition=0, offset=117, value=abort 2
consumed record: topic=test, partition=0, offset=118, value=abort 3
consumed record: topic=test, partition=0, offset=119, value=abort 4
consumed record: topic=test, partition=0, offset=120, value=abort 5
consumed record: topic=test, partition=0, offset=121, value=abort 6
consumed record: topic=test, partition=0, offset=122, value=abort 7
consumed record: topic=test, partition=0, offset=123, value=abort 8
consumed record: topic=test, partition=0, offset=124, value=abort 9
consumed record: topic=test, partition=0, offset=125, value=commit 0
consumed record: topic=test, partition=0, offset=126, value=commit 1
consumed record: topic=test, partition=0, offset=127, value=commit 2
consumed record: topic=test, partition=0, offset=128, value=commit 3
consumed record: topic=test, partition=0, offset=129, value=commit 4
consumed record: topic=test, partition=0, offset=130, value=commit 5
consumed record: topic=test, partition=0, offset=131, value=commit 6
consumed record: topic=test, partition=0, offset=132, value=commit 7
consumed record: topic=test, partition=0, offset=133, value=commit 8
consumed record: topic=test, partition=0, offset=134, value=commit 9
consumed record: topic=test, partition=0, offset=135, value=�

Messages with "abort" in the value shouldn't be consumed, and there is no control record between the aborted and the committed records.

To reproduce, I used a slightly modified version of the eos example script:

package main

import (
    "context"
    "flag"
    "fmt"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/twmb/franz-go/pkg/kerr"
    "github.com/twmb/franz-go/pkg/kgo"
)

// Command-line flags that configure the reproduction program.
var (
    // seedBrokers is the comma-delimited list of seed broker addresses.
    seedBrokers = flag.String("brokers", "localhost:9092", "comma delimited list of seed brokers")
    // produceTo is the topic that is produced to transactionally and then
    // consumed from; it has no default, so it must be set explicitly.
    produceTo = flag.String("produce-to", "", "input topic to transactionally produce to")

    // group is the consumer group the consumer joins.
    group = flag.String("group", "example-group", "group to use for consuming")

    // produceTxnID is the transactional ID used by the input producer.
    produceTxnID = flag.String("produce-txn-id", "eos-example-input-producer", "transactional ID to use for the input producer")
)

// die formats msg with args, writes the result to standard error, and then
// terminates the process with exit status 1. No newline is appended.
func die(msg string, args ...any) {
    text := fmt.Sprintf(msg, args...)
    fmt.Fprint(os.Stderr, text)
    os.Exit(1)
}

// main parses the command-line flags, exits if -produce-to was not given,
// and otherwise starts the consumer and the input producer in their own
// goroutines before blocking forever.
func main() {
    flag.Parse()

    // The topic has no default, so refuse to run without one.
    if topic := *produceTo; len(topic) == 0 {
        die("missing -produce-to (%s)", topic)
    }

    go consumer()
    go inputProducer()

    // Park the main goroutine so the two workers keep running.
    select {}
}

// inputProducer creates a transactional producer client and then loops
// forever. Each iteration begins a transaction, produces ten records to the
// default produce topic, ends the transaction, and sleeps for ten seconds.
// Iterations alternate between intending to commit (records valued
// "commit N") and intending to abort (records valued "abort N"), starting
// with a commit. Any client or transaction error terminates the process.
func inputProducer() {
    cl, err := kgo.NewClient(
        kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
        kgo.DefaultProduceTopic(*produceTo),
        kgo.TransactionalID(*produceTxnID),
        kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
            return "[input producer] "
        })),
    )
    if err != nil {
        die("unable to create input producer: %v", err)
    }

    ctx := context.Background()

    // doCommit flips on every iteration: commit, abort, commit, ...
    for doCommit := true; ; doCommit = !doCommit {
        if err := cl.BeginTransaction(); err != nil {
            // We are unable to start a transaction if the client
            // is not transactional or if we are already in a
            // transaction. A proper transactional loop will never
            // encounter either error.
            die("unable to start transaction: %v", err)
        }

        // The record value prefix records which outcome was intended.
        msg := "commit "
        if !doCommit {
            msg = "abort "
        }

        e := kgo.AbortingFirstErrPromise(cl)
        for i := 0; i < 10; i++ {
            cl.Produce(ctx, kgo.StringRecord(msg+strconv.Itoa(i)), e.Promise())
        }
        // NOTE(review): && short-circuits, so e.Err() is not called when
        // doCommit is false. Per the maintainer's reply in this thread,
        // e.Err() is the call that waits for the records to be produced, so
        // on abort iterations the transaction is ended before any record is
        // produced and the "abort" records land in the next (committed)
        // transaction. Calling e.Err() unconditionally before building this
        // expression avoids that.
        commit := kgo.TransactionEndTry(doCommit && e.Err() == nil)

        switch err := cl.EndTransaction(ctx, commit); err {
        case nil:
        case kerr.OperationNotAttempted:
            // Ending the transaction was not attempted; end it with an
            // explicit abort instead.
            if err := cl.EndTransaction(ctx, kgo.TryAbort); err != nil {
                die("abort failed: %v", err)
            }
        default:
            die("commit failed: %v", err)
        }

        // Pause between transactions.
        time.Sleep(10 * time.Second)
    }
}

// consumer creates a group consumer for the produce topic that fetches with
// the read-committed isolation level, then polls forever. Fetch errors are
// printed to standard output and every polled record is printed to standard
// error. Failure to create the client terminates the process.
func consumer() {
    logPrefix := func() string {
        return "[consumer] "
    }

    cl, err := kgo.NewClient(
        kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
        kgo.FetchIsolationLevel(kgo.ReadCommitted()),
        kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, logPrefix)),
        kgo.ConsumerGroup(*group),
        kgo.ConsumeTopics(*produceTo),
        kgo.RequireStableFetchOffsets(),
        kgo.DisableAutoCommit(),
        kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
        kgo.KeepControlRecords(),
    )
    if err != nil {
        die("unable to create eos consumer/producer: %v", err)
    }
    defer cl.Close()

    // printRecord reports a single consumed record on standard error.
    printRecord := func(rec *kgo.Record) {
        fmt.Fprintf(os.Stderr, "consumed record: topic=%s, partition=%d, offset=%d, value=%s\n", rec.Topic, rec.Partition, rec.Offset, rec.Value)
    }

    ctx := context.Background()

    for {
        polled := cl.PollFetches(ctx)

        // Report fetch errors but keep going: an error may be fatal for
        // its partition (auth problems), yet any records that did arrive
        // can still be processed.
        for _, fe := range polled.Errors() {
            fmt.Printf("error consuming from topic: topic=%s, partition=%d, err=%v",
                fe.Topic, fe.Partition, fe.Err)
        }

        polled.EachRecord(printRecord)
    }
}
twmb commented 1 month ago

Well, you owe me an hour back, but I suppose I owe you that time back as well because my own example is wrong. The code block above is failing because doCommit && e.Err() == nil short circuits when doCommit is false, meaning e.Err() == nil is never evaluated, meaning the AbortingFirstErrPromise never actually waits for records to be produced, meaning the transaction is actually ended before a single record is produced. The abort records actually are produced in a new transaction -- the one that actually will commit.

I spent far too long trying to figure out why this worked when I moved e.Err() out and made the code:

perr := e.Err()
commit := kgo.TransactionEndTry(doCommit && perr == nil)

I'll fixup the example.