twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.8+. Producing, consuming, transacting, administrating, etc.

BrokerResponse too large #809

Closed · sakib-malik-7417 closed this 1 month ago

sakib-malik-7417 commented 2 months ago

Hi, we were using the code below to fetch from a topic, assuming all of our payloads are small enough.

Given fetch_max_bytes = 512 B and broker_max_read_bytes = 1 KB,

we are not able to consume and see the DEBUG logs below. The reported response size (131108, roughly 130 KB) is much bigger than both fetch_max_bytes and broker_max_read_bytes, and a single record of ours is nowhere near that size.

Can you help me understand why this is happening?

example[DEBUG] wrote Fetch v16; broker: 0, bytes_written: 94, write_wait: 345.577375ms, time_to_write: 31.334µs, err: <nil>
example[DEBUG] read Fetch v16; broker: 0, bytes_read: 4, read_wait: 30.25µs, time_to_read: 83.000166ms, err: invalid large response size 131108 > limit 1024
example[DEBUG] read from broker errored, killing connection; req: Fetch, addr: <broker_1>:9092, broker: 0, successful_reads: 0, err: invalid large response size 131108 > limit 1024
package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "os"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/sasl/plain"
)

func main() {
    client, err := kgo.NewClient(
        kgo.SeedBrokers("<broker_1:9092>"),
        kgo.ConsumeTopics("topic"),
        kgo.ConsumerGroup("consumer-x"),
        kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd().Relative(-1)),
        kgo.FetchMaxBytes(1024/2),
        kgo.BrokerMaxReadBytes(1024),
        kgo.FetchMaxPartitionBytes(1024/2),
        kgo.FetchMaxWait(5000*time.Millisecond),
        kgo.DialTLSConfig(
            &tls.Config{
                InsecureSkipVerify: true,
                ClientAuth:         tls.NoClientCert,
            },
        ),
        kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, func() string { return "example" })),
        kgo.SASL(
            plain.Auth{
                User: "<User>",
                Pass: "<Password>",
            }.AsMechanism(),
        ),
    )
    if err != nil {
        panic(err)
    }
    err = client.Ping(context.TODO())
    if err != nil {
        panic(err)
    }
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    for {
        fetches := client.PollRecords(ctx, 1)
        for _, fetch := range fetches {
            for _, record := range fetch.Topics {
                for _, f := range record.Records() {
                    fmt.Println("Received record")
                    fmt.Println(f.Value)
                    client.MarkCommitRecords(f)
                }
            }
        }
    }
}
sakib-malik-7417 commented 2 months ago

Also, on another note: shouldn't the line below be at Warn log level? example[DEBUG] read from broker errored, killing connection; req: Fetch, addr: <broker_1>:9092, broker: 0, successful_reads: 0, err: invalid large response size 131108 > limit 1024

twmb commented 2 months ago

Was the batch produced at the offset you're consuming from very large? Kafka does not return individual records, only batches.
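For anyone sizing limits around this, a minimal sketch of consumer options that would comfortably fit the ~131 KB batch from the logs above. The 1 MiB / 8 MiB values are illustrative assumptions, not recommendations; derive them from the largest batch your producers actually write:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
    // FetchMaxBytes / FetchMaxPartitionBytes are soft limits: the broker still
    // returns at least one complete batch so the consumer can make progress.
    // BrokerMaxReadBytes is a client-side hard cap; a response larger than it
    // kills the connection, which is the error shown in this issue.
    client, err := kgo.NewClient(
        kgo.SeedBrokers("<broker_1:9092>"),
        kgo.ConsumeTopics("topic"),
        kgo.FetchMaxBytes(1<<20),            // 1 MiB per fetch response (assumed value)
        kgo.FetchMaxPartitionBytes(512<<10), // 512 KiB per partition (assumed value)
        kgo.BrokerMaxReadBytes(8<<20),       // 8 MiB cap on any single response read (assumed value)
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()
    _ = client
}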

kmrgirish commented 2 months ago

Was the batch produced at the offset you're consuming from very large? Kafka does not return individual records, only batches.

If I understand correctly, if the producer created a batch of 100 records with a total size of 10MB at offset 0, and I issue a fetch request with a maximum byte size of 1MB at the same offset, would Kafka still return all 100 records in that batch? Is it possible for Kafka to return just the first few records up to the 1MB limit, or does it always return the entire batch?

kmrgirish commented 2 months ago

@twmb We were able to reproduce the issue, and it seems to be caused by the larger batch size. Thanks for the help—learned something new today! :)
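For future readers, a hedged sketch of the other lever: if you cannot raise the consumer-side limits, you can cap how large a batch the producer creates with kgo.ProducerBatchMaxBytes. The 64 KiB figure below is purely illustrative:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
    // Keep produced batches at or under 64 KiB (assumed size) so consumers with
    // small FetchMaxBytes / BrokerMaxReadBytes settings can still read them.
    producer, err := kgo.NewClient(
        kgo.SeedBrokers("<broker_1:9092>"),
        kgo.ProducerBatchMaxBytes(64<<10),
    )
    if err != nil {
        panic(err)
    }
    defer producer.Close()
    _ = producer
}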

sakib-malik-7417 commented 2 months ago

@twmb Thanks for the quick response. Could you also please comment on this:

Also, on another note: shouldn't the line below be at Warn log level? example[DEBUG] read from broker errored, killing connection; req: Fetch, addr: <broker_1>:9092, broker: 0, successful_reads: 0, err: invalid large response size 131108 > limit 1024

twmb commented 1 month ago

Connections can die regularly -- e.g., internet weather, temporary network outages. IMO warn is reserved for things that shouldn't happen but that we can recover from -- things you should really know about and probably look into if you have a logger set up. A connection being cut is pretty common and very retryable, so I don't log that at warn -- or even info.
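If you do want those messages surfaced at warn in your own logs, you can wrap the logger yourself. A minimal sketch follows; the promoteLogger type and the "killing connection" substring match are my own illustration, not part of franz-go:

package main

import (
    "os"
    "strings"

    "github.com/twmb/franz-go/pkg/kgo"
)

// promoteLogger wraps another kgo.Logger and bumps connection-teardown
// messages from debug to warn before forwarding them.
type promoteLogger struct {
    inner kgo.Logger
}

func (p promoteLogger) Level() kgo.LogLevel { return p.inner.Level() }

func (p promoteLogger) Log(level kgo.LogLevel, msg string, keyvals ...any) {
    if level == kgo.LogLevelDebug && strings.Contains(msg, "killing connection") {
        level = kgo.LogLevelWarn
    }
    p.inner.Log(level, msg, keyvals...)
}

func main() {
    base := kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, func() string { return "example" })
    client, err := kgo.NewClient(
        kgo.SeedBrokers("<broker_1:9092>"),
        kgo.WithLogger(promoteLogger{inner: base}),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()
    _ = client
}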