billygout opened this issue 2 years ago
Hi @dnwe, could you perhaps help me find someone to answer this question? thanks!
Hi @dnwe, this remains an issue for me. Can you please advise? thanks!
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
@billygout hmm, yes, I'm surprised the example code has a Fatal here. Whilst the client does attempt to resolve recoverable errors with a few retries, the Consume call can return an err here if (e.g.) metadata failed to refresh within the configured max attempts, so it would be more sensible to log or emit a metric based on the error type
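For illustration, a minimal sketch of handling the Consume error by type rather than exiting; the helper name and the exact set of errors checked are assumptions, but sarama.ErrClosedConsumerGroup and sarama.ErrOutOfBrokers are sentinel errors the library exposes:

```go
package consumerexample

import (
	"errors"
	"log"

	"github.com/IBM/sarama"
)

// shouldStopConsuming is a hypothetical helper: it inspects the error returned
// by ConsumerGroup.Consume and reports whether the consume loop should exit.
func shouldStopConsuming(err error) bool {
	switch {
	case errors.Is(err, sarama.ErrClosedConsumerGroup):
		// The group was closed deliberately; exiting the loop is correct.
		return true
	case errors.Is(err, sarama.ErrOutOfBrokers):
		// Metadata could not be refreshed within the configured attempts;
		// typically transient, so log (or emit a metric) and keep retrying.
		log.Printf("consume error (will retry): %v", err)
		return false
	default:
		// Other errors such as i/o timeouts: treat as retryable rather than fatal.
		log.Printf("consume error (will retry): %v", err)
		return false
	}
}
```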
hi @dnwe, thank you for your response. This issue is still a problem for me, and I'm not sure what the error types may be here. Can you please advise further on that or let me know if there are plans to update the example? Thanks!
hi @billygout, just out of interest, are you connecting to a self-managed cluster or a Confluent Cloud cluster? I am connected to a Confluent cluster, and it appears that the connection periodically dies. I experience consistent errors, at least once a week or so, where I get either the i/o timeout or write tcp ... write: broken pipe, and then my worker hangs indefinitely. I am running one version behind the latest, 1.40.0.
@david-bergman I'd recommend raising your own dedicated issue describing the problem and whether you're seeing the lockup in the producer or the consumer, etc. I/O timeouts and disconnections are fairly typical when a cloud service does rolling restarts or reboots, but your client should always be able to recover
thanks @dnwe, I will do that. My issue is with the consumer, on the consumer group's Consume() method. I have now implemented a code change with a set of staggered retries in the loop: on an error it continues instead of breaking out of the loop, basically calling Consume() again even after the initial error. I will raise a separate issue if this does not resolve it on my end.
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
@david-bergman Can you please let us know whether your changes are known to be working? If so, perhaps you can share them so that they may be worked into the example (https://github.com/IBM/sarama/blob/main/examples/consumergroup/main.go)? Thanks!
@dnwe Could we please keep this open until the consumergroup example is updated to follow best practices?
hi @billygout, I think the example code is better now that it has been updated, removing the log.Panicf that was the default response to an error condition and broke out of the infinite for loop.
I found I had to put a retry mechanism in place to stop the consumer group from breaking out of the loop. The code below is what worked for me in the end. I would assume the custom retry should be unnecessary, as the retry strategy in the configuration should be good enough? (the relevant config fields are sketched after the code)
```go
go func() {
	defer c.group.Close()
	maxRetries := 10
	retryCount := 0
	for {
		err := c.group.Consume(ctx, topics, &c.handler)
		if err != nil {
			retryCount++
			if retryCount >= maxRetries {
				// Give up after repeated failures and signal shutdown.
				close(c.handler.msgc)
				close(done)
				return
			}
			// Staggered backoff before calling Consume again.
			time.Sleep(time.Duration(1+retryCount) * time.Second)
			continue
		}
		// Successful session: reset the retry counter.
		retryCount = 0
		if ctx.Err() != nil {
			fmt.Println("context closed")
			close(c.handler.msgc)
			close(done)
			return
		}
	}
}()
```
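For reference, a sketch of the configuration-level retry settings that question refers to; the field names are real sarama Config fields, but the values here are purely illustrative, not recommendations:

```go
package consumerexample

import (
	"time"

	"github.com/IBM/sarama"
)

// newConsumerConfig shows the retry-related knobs discussed above.
func newConsumerConfig() *sarama.Config {
	config := sarama.NewConfig()

	// Metadata refresh retries: once these attempts are exhausted, Consume can
	// surface an error (e.g. "out of brokers") instead of recovering internally.
	config.Metadata.Retry.Max = 5
	config.Metadata.Retry.Backoff = 500 * time.Millisecond

	// Backoff between partition-consumer retries after a read error.
	config.Consumer.Retry.Backoff = 2 * time.Second

	return config
}
```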
@david-bergman thanks for sharing. I don't know the answer to the question. Also, I note that the log.Panicf is still present in the latest code at the time of this posting: https://github.com/IBM/sarama/blob/d4dc7bcff59ae890d90746b8b5fe257e6a7caa57/examples/exactly_once/main.go#L136
Versions
Configuration
What configuration values are you using for Sarama and Kafka?
Logs
read tcp [<REDACTED>]:58236->[<REDACTED>]:9093: i/o timeout
Problem Description
During network maintenance, when connectivity between the consumers and Kafka was temporarily unavailable, my application crashed with a log.Fatalf() call, displaying the error shown above (i/o timeout). The log.Fatalf() call is placed in a for loop following the pattern found in sarama's provided consumer group example, shown here: https://github.com/Shopify/sarama/blob/610514edec1825240d59b62e4d7f1aba4b1fa000/examples/consumergroup/main.go#L104-L116

Question: Is it advisable for a production app to place the log.Fatalf() call in the same way as in the example above? Assuming a temporary network outage occurs, would it be better to simply log the Consume() error and continue to the next loop iteration in order to try calling Consume() again, hopefully successfully? Or does Consume() only ever return a non-nil error when in an unrecoverable state, so that exiting the program is the best recourse?
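A minimal sketch of the "log and continue" alternative asked about here, assuming a sarama.ConsumerGroup named group and a handler implementing sarama.ConsumerGroupHandler; it mirrors the shape of the example loop, with the Fatalf swapped for logging and an explicit check for a deliberately closed group:

```go
package consumerexample

import (
	"context"
	"errors"
	"log"

	"github.com/IBM/sarama"
)

func consumeLoop(ctx context.Context, group sarama.ConsumerGroup, topics []string, handler sarama.ConsumerGroupHandler) {
	for {
		// Consume blocks for the lifetime of a session and must be called in a
		// loop so that a new session is created after a server-side rebalance.
		if err := group.Consume(ctx, topics, handler); err != nil {
			if errors.Is(err, sarama.ErrClosedConsumerGroup) {
				return // the group was closed on purpose; nothing left to do
			}
			// Transient failures (i/o timeout, metadata refresh, ...): log and
			// let the next iteration call Consume again instead of exiting.
			log.Printf("error from consumer group: %v", err)
		}
		// Stop once shutdown has been requested via the context.
		if ctx.Err() != nil {
			return
		}
	}
}
```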