IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

It took 5 seconds for a consumer group to start #2262

Open: Siyu-silvia opened this issue 2 years ago

Siyu-silvia commented 2 years ago
Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

| Sarama  | Kafka | Go   |
|---------|-------|------|
| v1.34.0 |       | 1.16 |
Configuration

What configuration values are you using for Sarama and Kafka?

default configuration
Logs

When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.

Problem Description

Hi, I'm using the Sarama consumer group in our service. In the unit test, I create a mock broker and one consumer group object. The test function starts a goroutine in which the consumer group consumes messages from the mock broker, while the original goroutine sleeps for a while and then cancels the context to stop consuming.

```go
func (suite *KafkaConsumerTestSuite) TestConsumeMessage() {
	handler := func(ctx context.Context, message string) error {
		suite.logger.Printf("handle: %s", message)
		suite.True(message == "foo" || message == "bar")
		return nil
	}

	ctx, cancel := context.WithCancel(suite.ctx)

	// ConsumeMessage is a blocking call, so run it in a goroutine.
	go func() {
		suite.logger.Infof("start consuming")
		// ConsumeMessage is a wrapper function around consumerGroup.Consume()
		suite.Nil(suite.consumer.ConsumeMessage(ctx, handler))
	}()

	// Wait 5 seconds to let ConsumeMessage run, then cancel.
	// Testing on a Mac against a local Kafka cluster shows it takes at least
	// 5 seconds for a Sarama consumer group to be up and running.
	time.Sleep(5 * time.Second)
	cancel()
}
```

After running the test, I found that it takes at least 5 seconds for the consumer group to start. If I set the sleep to less than 5 seconds, I get this error:

```
time="2022-07-11T14:29:00-07:00" level=error msg="Error from consumer: kafka: tried to use a consumer group that was closed"
```

Conversely, if the sleep is longer than 5 seconds:

```
time="2022-07-11T14:39:04-07:00" level=info msg="start consuming"
time="2022-07-11T14:39:08-07:00" level=info msg="Sarama consumer up and running!..."
time="2022-07-11T14:39:08-07:00" level=info msg="handle: foo"
time="2022-07-11T14:39:08-07:00" level=info msg="handle: bar"
```

We need to add many consumer group unit tests to our service, covering the handler logic, and each topic is handled differently. Since there is no mock for the consumer group, we create a mock broker for each consumer for simplicity. However, because of this 5-second delay, our consumer unit tests take around 1 minute in total, which is quite long compared with the rest of the modules in our service, whose tests take at most 1 second.

Is there any way to get the consumer group to start faster, or is there anything I missed? I really appreciate your help. Thanks!

Siyu-silvia commented 2 years ago

Adding timestamps to the log makes this clearer:

```
time="2022-07-11T16:40:00-07:00" level=info msg="Start start at 2022-07-11 16:40:00.558992 -0700 PDT m=+0.005016313"
time="2022-07-11T16:40:00-07:00" level=info msg="start consuming"
time="2022-07-11T16:40:04-07:00" level=info msg="Sarama consumer up and running!..."
time="2022-07-11T16:40:04-07:00" level=info msg="Start consume at 2022-07-11 16:40:04.565406 -0700 PDT m=+4.011459949"
time="2022-07-11T16:40:04-07:00" level=info msg="handle: foo"
time="2022-07-11T16:40:04-07:00" level=info msg="handle: bar"
```

tuliogomesbarbosa commented 2 years ago

The same thing is happening here: several integration tests hang because a slow consumer group is stuck in `loopCheckPartitionNumbers`. I tried setting `saramaCfg.Metadata.RefreshFrequency = 30 * time.Second`, with no luck so far.

It just hangs indefinitely. If I insert a sleep of about 5 seconds, it passes.

Let me know if you need more data.

github-actions[bot] commented 1 year ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

dnwe commented 1 year ago

@tuliogomesbarbosa @Siyu-silvia are you still seeing this behaviour with modern Sarama versions? There were a bunch of fixes and improvements in this area of code over time.

yaron2 commented 9 months ago

@dnwe it looks like this is still happening: https://github.com/dapr/components-contrib/issues/3263

olitomlinson commented 9 months ago

@dnwe this is causing significant problems for those of us who use Dapr for Kafka PubSub

olitomlinson commented 9 months ago

@dnwe is this something that you are able to triage again?

ParrySMS commented 5 months ago

> @tuliogomesbarbosa @Siyu-silvia are you still seeing this behaviour with modern Sarama versions? There were a bunch of fixes and improvements in this area of code over time.

Hi, could you please share links to those pull requests?