IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

sarama does not correctly use s.SaramaConfig.Net.MaxOpenRequests #2052

Closed baoxc-shopee closed 1 year ago

baoxc-shopee commented 3 years ago
Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Component  Version
Sarama     1.30.0 (2021-09-29)
Kafka      2.6
Go         1.16
Configuration

What configuration values are you using for Sarama and Kafka?

    s.SaramaConfig = sarama.NewConfig()
    s.SaramaConfig.Version = sarama.V1_1_0_0
    s.SaramaConfig.Metadata.Timeout = time.Second * 30

    s.SaramaConfig.Producer.RequiredAcks = sarama.WaitForAll

    s.SaramaConfig.Producer.Return.Successes = true
    s.SaramaConfig.Producer.Return.Errors = true

    s.SaramaConfig.Producer.Retry.Max = 0 // disable retries; retry at the application level
    s.SaramaConfig.Producer.Flush.Bytes = 1024 * 1024
    s.SaramaConfig.Producer.Flush.Frequency = time.Millisecond
    s.SaramaConfig.Net.MaxOpenRequests = 5 // expected: up to 5 in-flight requests per broker

    s.SaramaConfig.Producer.Flush.MaxMessages = 10 // at most 10 messages per produce request
    s.SaramaConfig.Producer.Flush.Messages = 10
    s.SaramaConfig.Producer.MaxMessageBytes = 10 * 1024 * 1024

    // s.Send, Message, RandStringBytes and s.Done are application helpers (not shown).
    bytes := RandStringBytes(1024)
    go func() {
        for i := 0; i < 1000*80; i++ {
            s.Send(&Message{
                Meta: map[string]interface{}{
                    "flag": strconv.Itoa(i),
                },
                Data: bytes,
            })
        }
    }()

    <-time.After(10 * time.Second)

    fmt.Println("qps", len(s.Done))
Problem Description

Like the Java client, which limits concurrency with "max.in.flight.requests.per.connection" (default 5), Sarama should allow several produce requests to be in flight per broker; Net.MaxOpenRequests looks like the equivalent setting. However, the async producer sends all requests strictly one by one:

In "async_producer.go" line 694:


    go withRecover(func() {
        for set := range bridge {
            request := set.buildRequest()

            response, err := broker.Produce(request)

            responses <- &brokerProducerResponse{
                set: set,
                err: err,
                res: response,
            }
        }
        close(responses)
    })

The "Produce" function ultimately calls "sendAndReceive" in "broker.go" line 853, which waits for the response before returning:

func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
    responseHeaderVersion := int16(-1)
    if res != nil {
        responseHeaderVersion = res.headerVersion()
    }

    promise, err := b.send(req, res != nil, responseHeaderVersion)
    if err != nil {
        return err
    }

    if promise == nil {
        return nil
    }

    select {
    case buf := <-promise.packets:
        return versionedDecode(buf, res, req.version())
    case err = <-promise.errors:
        return err
    }
}

With this design, each broker's goroutine sends one request, waits for its response, then sends the next, so only one produce request is ever in flight per broker regardless of Net.MaxOpenRequests. When the network has noticeable latency, this scheme greatly reduces client throughput.

I recommend allowing multiple produce requests to be sent concurrently, up to Net.MaxOpenRequests per broker.

rtreffer-fita commented 2 years ago

This was fixed in Version 1.31.0 (2022-01-18) https://github.com/Shopify/sarama/blob/main/CHANGELOG.md#version-1310-2022-01-18

feat: allow AsyncProducer to have MaxOpenRequests inflight produce requests per broker by @xujianhai666 in https://github.com/Shopify/sarama/pull/1686

github-actions[bot] commented 1 year ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

dnwe commented 1 year ago

Closing as fixed in Version 1.31.0