segmentio / kafka-go

Kafka library in Go
MIT License

WriteMessages memory leak #1038

Open gokhankoc90 opened 1 year ago

gokhankoc90 commented 1 year ago

Describe the bug: creating a &kafka.Writer, calling w.WriteMessages and then w.Close() leaves goroutines running, which leaks memory.

To verify the issue, run the code in VS Code in debug mode and check the CALL STACK panel: multiple kafka-go.(*conn).run goroutines will appear. Alternatively, run the code in Docker and check its memory usage.

Kafka Version

  • What version(s) of Kafka are you testing against? confluentinc/cp-kafka:7.2.1 / 3.2-IV0
  • What version of kafka-go are you using? latest release

To Reproduce

Resources to reproduce the behavior: VS Code, any environment



// Imports used by the snippets below. AssetPairBase, Address and NewTLSConfig
// are defined elsewhere in the application and are not shown here.
import (
    "context"
    "crypto/tls"
    "encoding/json"
    "fmt"
    "strconv"

    "github.com/segmentio/kafka-go"
    log "github.com/sirupsen/logrus" // assumed: log.WithField matches logrus
)

// Write marshals every kline of assetPair into a kafka.Message and writes
// them to assetPair.TopicName with a freshly created writer.
func Write(assetPair AssetPairBase, tlsConfig *tls.Config) {

    var testMessage []kafka.Message

    for _, value := range assetPair.Klines {
        byteValue, errJSON := json.Marshal(value)
        if errJSON != nil {
            fmt.Println(errJSON)
            continue // skip values that cannot be encoded
        }
        key := strconv.FormatInt(value.StartTime, 10) // s == "97" (decimal)

        testMessage = append(testMessage, kafka.Message{
            Key:   []byte(key),
            Value: byteValue, // json.Marshal already returns []byte
        })
    }

    w := &kafka.Writer{
        //AllowAutoTopicCreation: true,
        Addr:     kafka.TCP(Address),
        Topic:    assetPair.TopicName,
        Balancer: &kafka.LeastBytes{},
        Transport: &kafka.Transport{
            TLS: tlsConfig,
        },
    }

    // ISSUE APPEARS AFTER RUNNING THIS LINE ######
    err := w.WriteMessages(context.TODO(), testMessage...)
    if err != nil {
        fmt.Println("failed to write messages:", err)
    } else {
        fmt.Println("write completed")
    }

    // ISSUE APPEARS AFTER THIS LINE ######
    if err := w.Close(); err != nil {
        fmt.Println("failed to close writer:", err)
        //log.WithField("Reason", err.Error()).Fatal("failed to close writer")
    } else {
        fmt.Println("closed writer")
    }
}

// GetTLSConfig builds the client TLS configuration from the PEM files below.
func GetTLSConfig() *tls.Config {

    tlsConfig, err := NewTLSConfig("api/keys/client.cer.pem",
        "api/keys/client.key.pem",
        "api/keys/server.cer.pem")
    if err != nil {
        log.WithField("Reason", err.Error()).Fatal("connection failed")
    }
    // This can be used on a test server if the domain does not match the cert:
    tlsConfig.InsecureSkipVerify = true

    return tlsConfig
}
gokhankoc90 commented 1 year ago

I debugged the issue further. w.WriteMessages creates goroutines that keep running func (p *connPool) discover(ctx context.Context, wake <-chan event), so they stay alive after the writer is closed. If discover has to run for each topic, there should be some option to turn it off. I have 1,800 topics, each updated every 10 minutes, and keeping that many discovery goroutines alive with SSL certificates simply eats up 5 GB of RAM.
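Here is a toy, stdlib-only model of the mechanism described above. It assumes, based on the goleak trace later in the thread (discover is started from (*Transport).grabPool), that a transport keeps one pool per broker address and that each pool runs its own discover goroutine until the transport closes its connections. transport, connPool, grabPool and closeIdleConnections are hypothetical simplifications, not kafka-go's code. The point is the count: writers that share one transport share one pool, while a new &kafka.Transport per writer, as in the reproduction, means one pool per writer.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// connPool is a hypothetical stand-in for a per-broker connection pool.
// Each pool owns one background discover goroutine driven by a ticker.
type connPool struct {
	stop chan struct{}
	done chan struct{}
}

func newConnPool(refresh time.Duration) *connPool {
	p := &connPool{stop: make(chan struct{}), done: make(chan struct{})}
	go p.discover(refresh)
	return p
}

// discover loops until stopped; a real client would refresh metadata on each tick.
func (p *connPool) discover(refresh time.Duration) {
	defer close(p.done)
	ticker := time.NewTicker(refresh)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-p.stop:
			return
		}
	}
}

// transport caches one pool per broker address (the assumption stated above).
type transport struct {
	mu    sync.Mutex
	pools map[string]*connPool
}

func (t *transport) grabPool(addr string) *connPool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.pools == nil {
		t.pools = make(map[string]*connPool)
	}
	p, ok := t.pools[addr]
	if !ok {
		p = newConnPool(time.Minute)
		t.pools[addr] = p
	}
	return p
}

func (t *transport) poolCount() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.pools)
}

// closeIdleConnections stops every pool's discover goroutine and waits for it to exit.
func (t *transport) closeIdleConnections() {
	t.mu.Lock()
	defer t.mu.Unlock()
	for addr, p := range t.pools {
		close(p.stop)
		<-p.done
		delete(t.pools, addr)
	}
}

func main() {
	// One transport shared by 1,800 topic writers: a single pool.
	shared := &transport{}
	for topic := 0; topic < 1800; topic++ {
		shared.grabPool("broker:9093")
	}
	fmt.Println("pools on shared transport:", shared.poolCount())
	shared.closeIdleConnections()

	// One transport per writer: one pool (and discover goroutine) each.
	var perWriter []*transport
	total := 0
	for topic := 0; topic < 1800; topic++ {
		tr := &transport{}
		tr.grabPool("broker:9093")
		perWriter = append(perWriter, tr)
		total += tr.poolCount()
	}
	fmt.Println("pools across per-writer transports:", total)
	for _, tr := range perWriter {
		tr.closeIdleConnections()
	}
}
```

Under that assumption, reusing a single kafka.Transport across the topic writers and closing it once on shutdown would keep the discovery work constant instead of growing with the number of writers.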

Abdulsametileri commented 1 year ago

When I started using Uber's goleak, I ran into a similar issue 🤔

integration_test.go:41: found unexpected goroutines:
        [Goroutine 58 in state select, with github.com/segmentio/kafka-go.(*connPool).discover on top of the stack:
        goroutine 58 [select]:
        github.com/segmentio/kafka-go.(*connPool).discover(0x140002fa240, {0x10511b290, 0x140002d1d40}, 0x14000352360)
            /Users/samet.ileri/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.38/transport.go:620 +0x330
        created by github.com/segmentio/kafka-go.(*Transport).grabPool
            /Users/samet.ileri/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.38/transport.go:254 +0x5d8

         Goroutine 83 in state chan receive, with github.com/segmentio/kafka-go.(*conn).run on top of the stack:
        goroutine 83 [chan receive]:
        github.com/segmentio/kafka-go.(*conn).run(0x140005a4000, 0x14000500140, 0x0?)
            /Users/samet.ileri/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.38/transport.go:1246 +0x74
        created by github.com/segmentio/kafka-go.(*connGroup).connect
            /Users/samet.ileri/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.38/transport.go:1224 +0xa3c

         Goroutine 25 in state sync.Cond.Wait, with sync.runtime_notifyListWait on top of the stack:
        goroutine 25 [sync.Cond.Wait]:
        sync.runtime_notifyListWait(0x140005c2250, 0x1)
            /opt/homebrew/Cellar/go/1.19.2/libexec/src/runtime/sema.go:517 +0x16c
        sync.(*Cond).Wait(0x140005c2240)
            /opt/homebrew/Cellar/go/1.19.2/libexec/src/sync/cond.go:70 +0xd0
        github.com/segmentio/kafka-go.(*batchQueue).Get(0x140005a67f8)
            /Users/samet.ileri/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.38/writer.go:951 +0x8c
        github.com/segmentio/kafka-go.(*partitionWriter).writeBatches(0x140005a67e0)
            /Users/samet.ileri/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.38/writer.go:1011 +0x40
        github.com/segmentio/kafka-go.(*Writer).spawn.func1()
            /Users/samet.ileri/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.38/writer.go:549 +0x5c
        created by github.com/segmentio/kafka-go.(*Writer).spawn
            /Users/samet.ileri/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.38/writer.go:547 +0x8c
        ]
Abdulsametileri commented 1 year ago

@achille-roussel @yolken-segment @stevevls

achille-roussel commented 1 year ago

Hello @Abdulsametileri

As far as I can tell, this is expected behavior: you need to call CloseIdleConnections on the kafka.Transport to stop the goroutines that manage client connections.

Laurel-rao commented 1 year ago

Has this already been fixed?

achille-roussel commented 1 year ago

Hello @Laurel-rao, do you mind fleshing out your question a bit more? I'm not sure what you mean by "fixed" since it appears the application is behaving as expected here.

maga-lak commented 1 year ago

Hello @achille-roussel, there is a bug here. When the writer is not created via NewWriter, w.transport is not set (kafka-go/writer.go:573), so writer.Close() does not call transport.CloseIdleConnections(). As a result, the goroutines are not removed after writer.Close().
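The ownership rule described in this comment can be sketched with the standard library alone: the writer releases a transport only if it created that transport itself, so a transport supplied through a struct literal stays open until the caller closes it. writer, newWriter and fakeTransport are hypothetical stand-ins that mirror the behaviour described here, not kafka-go's types.

```go
package main

import "fmt"

// fakeTransport is a hypothetical stand-in that counts how often its idle
// connections were closed.
type fakeTransport struct{ closed int }

func (t *fakeTransport) CloseIdleConnections() { t.closed++ }

// writer mimics the rule described above: it remembers a transport as its
// own only when it created that transport (the NewWriter path).
type writer struct {
	Transport *fakeTransport // may be set by the caller; not owned
	owned     *fakeTransport // set only by newWriter; owned by the writer
}

func newWriter() *writer {
	t := &fakeTransport{}
	return &writer{Transport: t, owned: t}
}

// Close releases only the transport the writer owns.
func (w *writer) Close() error {
	if w.owned != nil {
		w.owned.CloseIdleConnections()
	}
	return nil
}

func main() {
	a := newWriter()
	_ = a.Close()
	fmt.Println(a.owned.closed) // 1: the owned transport was released

	shared := &fakeTransport{}
	b := &writer{Transport: shared} // struct literal, like &kafka.Writer{...}
	_ = b.Close()
	fmt.Println(shared.closed) // 0: the writer leaves it open
	shared.CloseIdleConnections()
	fmt.Println(shared.closed) // 1: closed by the caller
}
```

For the reproduction, this is consistent with the maintainer's advice: keep a reference to the &kafka.Transport and call its CloseIdleConnections method once the writer is closed.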

abh commented 1 year ago

@maga-lak it sounds like it's an intended feature that the library keeps keep-alive connections ready in the background (similar to http.Client) until the application closes them.
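The http.Client comparison can be seen with the standard library alone. An http.Transport keeps a background goroutine per idle keep-alive connection after the response is finished, until CloseIdleConnections is called. This sketch uses httptest instead of a real server. persistConnGoroutines matches the internal frame name net/http.(*persistConn).readLoop, which is an implementation detail of net/http and may change between Go versions; demo, waitFor and persistConnGoroutines are hypothetical helpers. Per the maintainer's reply, kafka.Transport's CloseIdleConnections method plays the same role for the kafka-go goroutines in the goleak output above.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"runtime"
	"strings"
	"time"
)

// persistConnGoroutines counts client-side keep-alive read loops currently alive.
func persistConnGoroutines() int {
	buf := make([]byte, 1<<20)
	buf = buf[:runtime.Stack(buf, true)]
	return strings.Count(string(buf), "net/http.(*persistConn).readLoop(")
}

// waitFor polls cond for up to two seconds, since goroutines exit asynchronously.
func waitFor(cond func() bool) bool {
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return cond()
}

// demo makes one request, then reports whether a keep-alive goroutine was
// alive before CloseIdleConnections and whether it was gone afterwards.
func demo() (aliveBefore, goneAfter bool) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	defer srv.Close()

	tr := &http.Transport{}
	client := &http.Client{Transport: tr}
	resp, err := client.Get(srv.URL)
	if err != nil {
		panic(err)
	}
	io.Copy(io.Discard, resp.Body) // drain so the connection returns to the idle pool
	resp.Body.Close()

	aliveBefore = waitFor(func() bool { return persistConnGoroutines() == 1 })
	tr.CloseIdleConnections()
	goneAfter = waitFor(func() bool { return persistConnGoroutines() == 0 })
	return aliveBefore, goneAfter
}

func main() {
	before, after := demo()
	fmt.Println("keep-alive goroutine after request:", before)
	fmt.Println("gone after CloseIdleConnections:", after)
}
```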