Farfetch / kafkaflow

Apache Kafka .NET Framework to create applications simple to use and extend.
https://farfetch.github.io/kafkaflow/
MIT License

[Bug Report]: BatchConsumer with ManualMessageCompletion does not commit all messages #578

Open Gunth opened 3 months ago

Gunth commented 3 months ago

Description

Hi there,

I'm trying to consume my Kafka messages in batches and complete them myself. To do this, I added a consumer with a batching configuration and the WithManualMessageCompletion feature.

When consuming them in my handler, I complete each message in the batch and also the message containing the batch (not sure if that is needed). I see logs saying "KafkaFlow: Offsets committed ...", but when I restart my service it reprocesses many messages that were already processed in the previous execution.

If I don't use WithManualMessageCompletion(), all messages are correctly committed and none come back on the next execution.

Did I miss something, or should ManualMessageCompletion not be used with batching?

G.

Steps to reproduce

Add the consumer:

var config = new ConsumerConfig()
{
    BootstrapServers = configuration.GetValue<string>(Config.Properties.KafkaBrokers),
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.OAuthBearer,
    ClientId = "xyz",
};

.AddConsumer(consumer => consumer
    .WithConsumerConfig(new ConsumerConfig(config))
    .Topics([finalTopic, importTopic, globalTopic])
    .WithGroupId(topicConsumerGroup)
    .WithManualMessageCompletion()
    .AddMiddlewares(middlewares => middlewares
        .AddBatching(
            configuration.GetValue<int>(
                Config.Properties.KafkaMessageTrackerWorkersBatchSize,
                Config.Defaults.KafkaMessageTrackerWorkersBatchSize),
            TimeSpan.FromSeconds(5))
        .Add(resolver =>
        {
            return new TrackingEventProcessor();
        })
    )
)

Add a handler to consume the messages:

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
    var messagesContext = context.GetMessagesBatch();

    // DoSomeWorks...

    foreach (var messageContext in messagesContext)
    {
        messageContext.ConsumerContext.Complete();
    }

    context.ConsumerContext.Complete();

    await next(context);
}

Expected behavior

All messages should be committed when they are completed.

Actual behavior

Some of them are committed, but not all. When sending 110 messages to Kafka, 23 come back on each new execution of the service.
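
For context on why "Offsets committed" can be logged while messages still come back: a Kafka consumer group stores a single committed position per partition, and a restarted consumer resumes from that position. The sketch below is a minimal model of that rule only. It is not KafkaFlow's implementation, and `PartitionCommitModel` is a hypothetical name introduced for illustration. It assumes the committed position can only advance past offsets that have all been completed, so one uncompleted offset holds back everything after it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of KafkaFlow): models the commit position of one partition.
public sealed class PartitionCommitModel
{
    // Offsets completed out of order, waiting for the gap before them to close.
    private readonly SortedSet<long> completed = new SortedSet<long>();

    // The offset a consumer would resume from after a restart.
    private long nextToCommit;

    public PartitionCommitModel(long firstOffset) => nextToCommit = firstOffset;

    // Marks an offset as completed and returns the resulting resume position.
    public long Complete(long offset)
    {
        if (offset >= nextToCommit)
        {
            completed.Add(offset);
        }

        // Advance only while the next expected offset has been completed.
        while (completed.Remove(nextToCommit))
        {
            nextToCommit++;
        }

        return nextToCommit;
    }
}

public static class Demo
{
    public static void Main()
    {
        var partition = new PartitionCommitModel(0);
        partition.Complete(0);
        partition.Complete(1);
        partition.Complete(3); // offset 2 is not completed yet

        Console.WriteLine(partition.Complete(4)); // prints 2: offsets 3 and 4 would be re-delivered
        Console.WriteLine(partition.Complete(2)); // prints 5: the gap is closed
    }
}
```

Under this model, messages that come back after a restart sit at or after the first offset that was never completed on their partition. Logging the topic, partition, and offset of each re-delivered message would show whether the 23 messages fit that pattern.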

KafkaFlow version

3.8.0