IBM / sarama

Sarama is a Go library for Apache Kafka.

AsyncProducer fails with topic does not exist when consecutive messages are published to different topics in the same millisecond #2438

Closed dineshudayakumar closed 1 year ago

dineshudayakumar commented 1 year ago
Versions
| Sarama | Kafka | Go     |
|--------|-------|--------|
| 1.37.2 | 2.0.0 | 1.20.1 |
Configuration
```go
import (
    "time"

    "github.com/Shopify/sarama"
)

func ProducerConfig() *sarama.Config {
    conf := sarama.NewConfig()
    conf.Producer.Compression = sarama.CompressionSnappy
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Return.Successes = true
    conf.Producer.Return.Errors = true
    conf.Producer.Retry.Max = 5
    conf.Producer.Retry.Backoff = 500 * time.Millisecond
    conf.Metadata.Full = false
    conf.Metadata.Retry.Max = 50
    conf.Metadata.Retry.Backoff = 100 * time.Millisecond
    conf.ClientID = "clientID"
    conf.Version = sarama.V2_0_0_0
    return conf
}
```
Logs

When I make the logger print directly to stdout, the extra few milliseconds of latency make the issue unreproducible, so I route the log output to stdout through a goroutine instead; as a result, the log lines below may not be in exact order. I also updated the source code to print a stack trace at every place where ErrUnknownTopicOrPartition is thrown.
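For illustration, a goroutine-backed logger along these lines could look like the sketch below. This is not the reporter's actual code, just a minimal example of feeding `sarama.Logger` through a buffered channel so writes to stdout happen off the hot path; the buffer size and prefix are arbitrary assumptions.

```go
package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
)

// chanWriter buffers log lines in a channel; a background goroutine drains
// them to stdout, so the goroutines being observed are barely slowed down.
type chanWriter struct {
	lines chan []byte
}

func newChanWriter() *chanWriter {
	w := &chanWriter{lines: make(chan []byte, 1024)}
	go func() {
		for line := range w.lines {
			os.Stdout.Write(line)
		}
	}()
	return w
}

func (w *chanWriter) Write(p []byte) (int, error) {
	// Copy p because the log package may reuse the buffer after Write returns.
	buf := make([]byte, len(p))
	copy(buf, p)
	w.lines <- buf
	return len(p), nil
}

func main() {
	sarama.Logger = log.New(newChanWriter(), "[sarama] ", log.LstdFlags)
	// ... create the producer/consumer and run the test as usual ...
}
```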

The test below uses a consumer that consumes from 2 topics and an async producer that publishes to the same 2 topics.
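For reference, the publish side of such a test looks roughly like the following sketch. It is not the actual test: it assumes the `ProducerConfig` function from the Configuration section above, and the broker address and topic names are taken from the logs.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// ProducerConfig is the configuration function shown above.
	producer, err := sarama.NewAsyncProducer([]string{"kafka:9092"}, ProducerConfig())
	if err != nil {
		log.Fatalf("failed to create async producer: %v", err)
	}
	defer producer.Close()

	// Two messages to different topics are queued back-to-back, so both topic
	// producers can hit the metadata path within the same millisecond.
	topics := []string{"test_rn.pos_transaction.pb", "test_rn.pos_transaction_historical.pb"}
	for _, topic := range topics {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder("payload"),
		}
	}

	// Return.Successes and Return.Errors are enabled in the config, so both
	// channels must be drained; the intermittent "topic or partition does not
	// exist" failure shows up on the Errors channel.
	for i := 0; i < len(topics); i++ {
		select {
		case <-producer.Successes():
		case perr := <-producer.Errors():
			log.Printf("publish failed: %v", perr)
		}
	}
}
```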

Logs:

```
=== RUN TestRoundTrip
Successfully initialized new client
Initializing new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Initializing new client
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Successfully initialized new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
client/metadata fetching metadata for [test_rn.pos_transaction.pb test_rn.pos_transaction_historical.pb] from broker kafka:9092
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Connected to broker at kafka:9092 (unregistered)
client/brokers registered new broker #0 at 44fe8f289aff:29092
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Connected to broker at 44fe8f289aff:29092 (registered as #0)
client/metadata fetching metadata for [test_rn.pos_transaction.pb test_rn.pos_transaction_historical.pb] from broker kafka:9092
client/coordinator requesting coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker from kafka:9092
client/coordinator coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker is #0 (44fe8f289aff:29092)
client/coordinator requesting coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker from kafka:9092
client/coordinator coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker is #0 (44fe8f289aff:29092)
Successfully initialized new client
Initializing new client
client/metadata fetching metadata for [test_rn.pos_transaction.pb] from broker kafka:9092
goroutine 179222 [running]:
runtime/debug.Stack()
        /usr/local/go/src/runtime/debug/stack.go:24 +0x64
runtime/debug.PrintStack()
        /usr/local/go/src/runtime/debug/stack.go:16 +0x1c
github.com/Shopify/sarama.(*client).Partitions(0x0?, {0x4006b660c0, 0x25})
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:348 +0xb4
github.com/Shopify/sarama.(*topicProducer).partitionMessage.func1()
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:534 +0x98
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1(0x40062b0678?, 0x177a4?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/go-resiliency@v1.3.0/breaker/breaker.go:85 +0x54
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork(0x0?, 0x0, 0x0?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/go-resiliency@v1.3.0/breaker/breaker.go:86 +0x30
github.com/eapache/go-resiliency/breaker.(*Breaker).Run(...)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/go-resiliency@v1.3.0/breaker/breaker.go:55
github.com/Shopify/sarama.(*topicProducer).partitionMessage(0x4007d9fe40?, 0x4005c70b40)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:525 +0x90
github.com/Shopify/sarama.(*topicProducer).dispatch(0x4007d9fe40)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:502 +0x60
github.com/Shopify/sarama.withRecover(0x0?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/utils.go:43 +0x40
created by github.com/Shopify/sarama.(*asyncProducer).newTopicProducer
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:495 +0x1fc
err=kafka: Failed to produce message to topic test_rn.pos_transaction_historical.pb: kafka server: Request was for a topic or partition that does not exist on this broker
client/metadata fetching metadata for [test_rn.pos_transaction_historical.pb] from broker kafka:9092
Flush Start
consumer/broker/0 accumulated 6 new subscriptions
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/0
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/2
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/1
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/0
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/2
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/1
Connected to broker at kafka:9092 (unregistered)
client/brokers registered new broker #0 at 44fe8f289aff:29092
producer/broker/0 state change to [open] on test_rn.pos_transaction.pb/2
producer/broker/0 starting up
Connected to broker at 44fe8f289aff:29092 (registered as #0)
utility.go:61: lightspeed_fetcher_test.go:314 Got error: kafka producer errors: [kafka: Failed to produce message to topic test_rn.pos_transaction_historical.pb: kafka server: Request was for a topic or partition that does not exist on this broker]
--- FAIL: TestRoundTrip (0.15s)
```

Problem Description

We are using the AsyncProducer to publish messages to Kafka. In one of our test cases we create the AsyncProducer and publish multiple messages, and intermittently the publish fails with "topic or partition does not exist on this broker". By looking at the Kafka request logs and the Sarama source code, I could identify that when the first 2 messages are published within the same millisecond, the refreshMetadata method returns nil without refreshing and the partitionMessage method then throws the error.

This could be due to the following lines in the tryRefreshMetadata method in client.go:

```go
        t := atomic.LoadInt64(&client.updateMetaDataMs)
        if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
            return nil
        }
```
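The race this guard can create is sketched below. This is a standalone illustration, not sarama's actual code path: `updateMs` stands in for `client.updateMetaDataMs`, and a barrier forces both callers to load the old timestamp before either swaps in the new one, so exactly one of them skips the (simulated) refresh and returns nil even though its own topic was never refreshed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// updateMs plays the role of client.updateMetaDataMs in the snippet above.
var updateMs int64

func main() {
	var start, loaded, done sync.WaitGroup
	start.Add(1)
	loaded.Add(2)
	done.Add(2)

	for _, topic := range []string{"topic-a", "topic-b"} {
		go func(topic string) {
			defer done.Done()
			start.Wait()

			// Both goroutines read the old timestamp before either swaps it,
			// mimicking two topic producers hitting tryRefreshMetadata at once.
			t := atomic.LoadInt64(&updateMs)
			loaded.Done()
			loaded.Wait()

			if !atomic.CompareAndSwapInt64(&updateMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
				// The loser bails out with a nil error even though it never
				// refreshed metadata for its own topic.
				fmt.Printf("%s: CAS lost, returning nil without refreshing\n", topic)
				return
			}
			fmt.Printf("%s: CAS won, refreshing metadata\n", topic)
		}(topic)
	}

	start.Done()
	done.Wait()
}
```

In sarama itself the interleaving is not forced, which is why the failure only shows up when two publishes to different topics land within the same millisecond.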
github-actions[bot] commented 1 year ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

dnwe commented 1 year ago

@dineshudayakumar please can you test with latest sarama to see if the concurrency fixes have resolved this issue?

dineshudayakumar commented 1 year ago

Sure, will test and confirm in a day or two (and thanks for fixing)

dineshudayakumar commented 1 year ago

Looks like I am not seeing this issue anymore, happy to close it. Thank you!!