SonicGD opened this issue 7 months ago
Hello @SonicGD, I've pinpointed the problem in KafkaFlow.
The issue lies in the partition-revoke handling code: the service responsible for pausing/resuming the consumer is stopped before the retry loop has a chance to resume the consumer, so the final resume call effectively does nothing.
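Conceptually, the race looks like the sketch below. This is only an illustration of the mechanism (the class and method names are made up, not KafkaFlow's internals): a pause/resume helper tracks which partitions were paused, the revoke handling clears that list, and the later resume call then has nothing to act on.

using System.Collections.Generic;

// Illustrative sketch only, not KafkaFlow's code: models why the resume
// issued by the retry loop becomes a no-op after a rebalance.
class PauseResumeTracker
{
    private readonly List<int> pausedPartitions = new();

    public void Pause(IEnumerable<int> partitions)
    {
        // The retry loop pauses the partitions it is retrying and records them here.
        this.pausedPartitions.AddRange(partitions);
    }

    public void OnPartitionsRevoked()
    {
        // The revoke handling clears the tracked partitions (and the service
        // owning this state is stopped) before the retry loop finishes waiting.
        this.pausedPartitions.Clear();
    }

    public void Resume()
    {
        // By the time the retry loop calls Resume(), the list is empty,
        // so nothing is resumed and the partition stays paused.
        foreach (var partition in this.pausedPartitions)
        {
            // resume `partition` on the underlying consumer
        }
    }
}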
I've opened an issue there where you can read about it in a bit more detail, and I've also opened a PR with a fix.
Also, if you'd like to see the problem manifesting in your repo, change your handler to the following code:
using KafkaFlow;
using KafkaFlow.Consumers;
using Microsoft.Extensions.Logging;

namespace KafkaRetryStuck;

class MessagesHandler : IMessageHandler<TestMessage>
{
    private readonly IConsumerAccessor consumerAccessor;
    private readonly ILogger<MessageHeaders> logger;

    public MessagesHandler(IConsumerAccessor consumerAccessor, ILogger<MessageHeaders> logger)
    {
        this.consumerAccessor = consumerAccessor;
        this.logger = logger;
    }

    public async Task Handle(IMessageContext context, TestMessage message)
    {
        // Log how many partitions are currently paused on this consumer.
        var pausedPartitions = this.consumerAccessor.GetConsumer(context.ConsumerContext.ConsumerName)
            .PausedPartitions;
        this.logger.LogInformation($"Paused partitions count {pausedPartitions.Count}");

        await Task.Delay(TimeSpan.FromSeconds(1));

        if (message.Id == TestMessage.BadId)
        {
            throw new InvalidOperationException("BAD MESSAGE");
        }
    }
}
It prints the number of paused partitions. You will see it print 0 right before the last retry of the loop; that is the moment the internal list of topic+partitions was cleared, and it is why the resume doesn't work.
22:33:46 info: KafkaFlow.MessageHeaders[0] Paused partitions count 8
22:33:58 warn: KafkaFlow[0] Partitions revoked | Data: {"GroupId":"consumers","ConsumerName":"consumer-2","Topics":[{"Topic":"test-topic","PartitionsCount":8,"Partitions":[1,4,7,10,13,16,19,22]}]}
22:34:12 info: KafkaFlow.MessageHeaders[0] Paused partitions count 0
22:34:13 fail: KafkaFlow[0] Exception captured by RetryForeverMiddleware. Retry in process. | Data: {"AttemptNumber":3,"WaitMilliseconds":120000,"PartitionNumber":16,"Worker":5,"ExceptionType":"System.InvalidOperationException"}
      System.InvalidOperationException: BAD MESSAGE
         at KafkaRetryStuck.MessagesHandler.Handle(IMessageContext context, TestMessage message) in C:\source\KafkaRetryStuck\KafkaRetryStuck\MessagesHandler.cs:line 30
         at KafkaFlow.Middlewares.TypedHandler.TypedHandlerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)
         at KafkaFlow.Middlewares.Serializer.DeserializerConsumerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)
         at Polly.AsyncPolicy.<>c__DisplayClass40_0.<<ImplementationAsync>b__0>d.MoveNext()
      --- End of stack trace from previous location ---
         at Polly.Retry.AsyncRetryEngine.ImplementationAsync[TResult](Func`3 action, Context context, CancellationToken cancellationToken, ExceptionPredicates shouldRetryExceptionPredicates, ResultPredicates`1 shouldRetryResultPredicates, Func`5 onRetryAsync, Int32 permittedRetryCount, IEnumerable`1 sleepDurationsEnumerable, Func`4 sleepDurationProvider, Boolean continueOnCapturedContext)
22:34:28 info: KafkaFlow[0] Consumer resumed by retry process | Data: {"ConsumerGroup":"consumers","ConsumerName":"consumer-2","Worker":5}
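For reference, the TestMessage type from the reproduction repo isn't shown in this thread. A hypothetical minimal version that matches how the handler uses it could look like this (the real type may use a different Id type or value):

using System;

// Hypothetical stand-in for the TestMessage type from the reproduction repo.
// Only the members the handler touches (Id and BadId) are modeled here.
public record TestMessage(Guid Id)
{
    // Any well-known value works; messages carrying it make the handler throw.
    public static readonly Guid BadId = Guid.Parse("00000000-0000-0000-0000-000000000bad");
}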
Thank you so much! This issue caused us many problems. Our current fix is a custom retry middleware that stores the list of paused partitions, plus an assignment handler that restarts the consumer if those paused partitions are assigned to it again. Will wait for your fix to be merged :)
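Roughly, the idea looks like the sketch below (simplified, not our exact production code): a tracker that the custom retry middleware updates and that the partitions-assigned handler consults. The actual restart/resume call is left out here since it depends on your setup.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Simplified sketch of the workaround described above: remember which
// topic/partitions the retry loop paused, and when a rebalance assigns any of
// them back, signal that the consumer needs to be restarted instead of
// trusting the retry loop's own resume.
public class PausedPartitionsTracker
{
    private readonly ConcurrentDictionary<(string Topic, int Partition), byte> paused = new();

    // Called by the custom retry middleware right after it pauses a partition.
    public void MarkPaused(string topic, int partition) =>
        this.paused[(topic, partition)] = 0;

    // Called by the custom retry middleware once the message finally succeeds
    // and the partition has been resumed normally.
    public void MarkResumed(string topic, int partition) =>
        this.paused.TryRemove((topic, partition), out _);

    // Called from the partitions-assigned handler: returns true if any of the
    // re-assigned partitions is one we paused earlier, meaning the consumer
    // must be restarted because the original resume was lost in the rebalance.
    public bool NeedsRestart(IEnumerable<(string Topic, int Partition)> assigned) =>
        assigned.Any(tp => this.paused.ContainsKey(tp));
}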
Description
Hello. We are using RetryForeverMiddleware in our project, and we ran into this strange behavior: if a partition rebalance happens (another worker joins or leaves the group) while a message is in the retry loop, the loop stops and processing of the partition does not continue.
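For context, our setup looks roughly like this. It is a sketch from memory of the KafkaFlow / Retry Extensions builder APIs, with the topic and group names taken from the logs in this issue; the broker address, buffer size, worker count, and delay are placeholders, so treat the exact method names and values as assumptions to verify against your versions.

using System;
using KafkaFlow;
using KafkaFlow.Retry;
using KafkaFlow.Serializer;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddConsumer(consumer => consumer
            .Topic("test-topic")
            .WithGroupId("consumers")
            .WithBufferSize(100)
            .WithWorkersCount(10)
            .AddMiddlewares(middlewares => middlewares
                .AddDeserializer<JsonCoreDeserializer>()
                // Retry forever on the handler's exception, waiting between attempts.
                .RetryForever(retry => retry
                    .Handle<InvalidOperationException>()
                    .WithTimeBetweenTriesPlan(TimeSpan.FromMinutes(2)))
                .AddTypedHandlers(handlers => handlers.AddHandler<MessagesHandler>())))));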
I'm not sure this is a retry middleware problem; it may be caused by KafkaFlow/Confluent.Kafka/librdkafka. But let's start here =)
Steps to reproduce
Use the docker-compose.yml from the sample repo to start Kafka and ZooKeeper, then run the reproduction project from its bin/Debug/net7.0 directory.
After the rebalance, nothing happens: we can see that there is lag for the partition, but there are no attempts to continue processing again.
If we restart the stuck consumer, processing begins again.
Expected behavior
After the rebalance is complete, the consumer should start processing the "bad" message again.
Actual behavior
The consumer is stuck and processing has stopped.
KafkaFlow Retry Extensions version
3.0.1