zarusz / SlimMessageBus

Lightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.
Apache License 2.0
466 stars · 78 forks

[Host.Kafka] Leverage autocommit to achieve faster consumption #131

Open zarusz opened 1 year ago

zarusz commented 1 year ago

Currently, the Kafka provider uses a manual commit approach (every N messages / every M seconds). We should also offer the option to let Kafka perform auto-commits. This should significantly increase consumption speed.
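For context, the underlying Confluent.Kafka client exposes auto-commit via its `ConsumerConfig`; the sketch below shows the client-level properties the proposed option would toggle (property names are from Confluent.Kafka; the broker address and group id are placeholder assumptions, and how SlimMessageBus would surface these is exactly the open question of this ticket):

```cs
using Confluent.Kafka;

// Sketch of the Confluent.Kafka settings behind the proposed auto-commit mode.
// With auto-commit the client commits offsets in the background on a timer,
// avoiding the synchronous per-checkpoint Commit() calls of the manual approach.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumption: local broker for illustration
    GroupId = "example-group",           // assumption: placeholder group id
    EnableAutoCommit = true,             // let the Kafka client commit periodically...
    AutoCommitIntervalMs = 5000,         // ...every 5 seconds (client default interval)
    // EnableAutoOffsetStore = false would additionally let the application decide
    // which offsets become eligible for the background commit.
};
```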

themiken commented 1 week ago

Hi @zarusz!!!

Currently, messages are being auto-committed by the consumer. However, I need to handle the commit manually because my consumer is encountering an exception during message processing. Despite the error, the library is still committing the offset, which is not the desired behavior.

Is there a way to disable the auto-commit feature and manually commit the messages only after successful processing?

Correct me if anything I described above is wrong.

zarusz commented 1 week ago

So yes, the Kafka provider advances the offset even on error, which is not desired in your case.

This is a standardized way to tap into error handling (in case you haven't seen it): https://github.com/zarusz/SlimMessageBus/blob/master/docs/intro.md#error-handling

However, what is the desired error-handling behavior? If not auto-committing and moving forward, then what else?

I could expose something similar to what exists for RabbitMQ, if that would work for you, to give you a bit more control: https://github.com/zarusz/SlimMessageBus/blob/master/docs/provider_rabbitmq.md#custom-consumer-error-handler

See option 1 (new feature). Options 2 and 3 should be achievable now.

public class CustomKafkaConsumerErrorHandler<T>(IMessageBus bus) : IKafkaConsumerErrorHandler<T>
{
    // Inject any needed dependencies via the primary constructor

    public async Task<bool> OnHandleError(T message, Func<Task> retry, IConsumerContext consumerContext, Exception exception)
    {
        // Option 1: New feature to stop the consumer
        // consumerContext.StopConsumer();

        // Option 2: Retry once more
        // await retry();

        // Option 3: Forward the message to another topic:
        // await bus.Publish(message, path: "errors");

        return true; // signal the error has been handled; the offset will advance to the next message
    }
}

Let me know.

jarb46 commented 1 week ago

How can I perform a manual commit with the "SlimMessageBus.Host.Kafka" library, given that the "EnableAutoCommit" property is set to "false" in the "CreateConsumer" method of the "KafkaGroupConsumer" class? Additionally, what should be done so that the "OnStart" method of the "KafkaGroupConsumer" class does not fire? We experimented with changing "EnableAutoCommit" to "true", but the "OnStart" method is still triggered, which processes the message and commits its offset.

zarusz commented 6 days ago

> How can I generate manual commit on the "SlimMessageBus.Host.Kafka" library? Since the "EnableAutoCommit" property is set to "false" in the "CreateConsumer" method of the "KafkaGroupConsumer" class.

The current version of the provider only supports manual commits managed by the library, which you can configure to happen every N messages or after every T time interval. Looking at the docs right now, this isn't documented, so I will add it there soon.

The consumer implementation does override EnableAutoCommit here, to take over the control of when to commit.
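To illustrate the cadence described above, here is a sketch of tuning the library-managed checkpoint on the consumer builder. The `CheckpointEvery` / `CheckpointAfter` method names follow the Kafka provider source; treat them, and all topic/group names, as assumptions to verify against your SlimMessageBus version:

```cs
// Sketch: configuring the library-managed commit cadence on a Kafka consumer.
// Method names per the SlimMessageBus.Host.Kafka provider; verify against your version.
mbb.Consume<SomeMessage>(x => x
    .Topic("some-topic")                          // assumption: example topic
    .WithConsumer<SomeMessageConsumer>()
    .KafkaGroup("some-group")                     // assumption: example consumer group
    .CheckpointEvery(100)                         // commit after every 100 processed messages...
    .CheckpointAfter(TimeSpan.FromSeconds(60)));  // ...or at most 60s after the last commit
```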

> Additionally, what should be done so that the "OnStart" method of the "KafkaGroupConsumer" class does not fire? We experimented with changing "EnableAutoCommit" to "true", but the "OnStart" method is still triggered, which processes the message and commits its offset.

By default, when the bus starts it will auto-start all consumers. Whether the consumers should start, and how to stop them during runtime, can be controlled; see here.
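As a sketch of that runtime control, assuming the `IConsumerControl` interface described in the SlimMessageBus docs (the cast and member names are my recollection of the docs, not verified here):

```cs
// Sketch: pausing/resuming consumption at runtime via IConsumerControl.
// Interface per the SlimMessageBus docs; hedged, verify against your version.
IMessageBus bus = /* resolved from DI */ null!;

if (bus is IConsumerControl consumerControl)
{
    await consumerControl.Stop();  // pause consumption; no new messages are delivered
    // ... do maintenance work, then resume:
    await consumerControl.Start();
}
```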

What I am getting from this conversation, including the feedback from @themiken earlier, is that we need these features:

This ticket was originally about enabling auto commit controlled purely by Kafka (to increase throughput)

Please help me understand, given what I've explained above, what is missing feature-wise, and I can look at adding it. We'd have to agree on the proposed API in this ticket, iterate a few times to get it right, and then I could follow up with an enhancement.

jarb46 commented 6 days ago

Thank you very much in advance for answering our questions.

To better understand our problem: it all relates to messages being lost during consumption, either because an error occurs in our application layer or because an error occurs in the "SlimMessageBus" library.

In our tests, referencing the library as a project, we identified that in the "ConsumerLoop" method of the "KafkaGroupConsumer" class, the instruction "OnPartitionEndReached(consumeResult.TopicPartitionOffset);" is executed. This commits the offsets of partitions that have not been processed yet, and therefore causes those messages to be lost, because the same loop then executes "await OnMessage(consumeResult).ConfigureAwait(false);", which processes the message at the identified offset and commits it.

To avoid this, we changed the value of the "EnablePartitionEof" property in the "CreateConsumer" method of the "KafkaGroupConsumer" class to "false", so that "OnPartitionEndReached(consumeResult.TopicPartitionOffset);" is not executed. This could be handled properly if the "EnablePartitionEof" property were configurable by the user.

Additionally, we found that with the setting above applied, and with around 5 events/messages in the queue, commits are generated only for some of the messages, not for all 5, even though all of them are processed without incident in our application layer. Following up on this, we found that after processing a message and attempting the commit, the commit was not issued because the "Increment" method of the "CheckpointTrigger" class returned "false". This caused an already-processed offset to be consumed again, but without a commit, duplicating events/messages in the application.
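For reference, `EnablePartitionEof` is a Confluent.Kafka `ConsumerConfig` flag: when enabled, the client surfaces a synthetic end-of-partition result, which is what the provider reacts to in `OnPartitionEndReached`. A minimal sketch of how a raw Confluent.Kafka consumer sees it, outside SlimMessageBus (broker, group, and topic names are placeholder assumptions):

```cs
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumption: local broker for illustration
    GroupId = "diagnostics",             // assumption: placeholder group id
    EnablePartitionEof = true,           // surface IsPartitionEOF results at end of partition
    EnableAutoCommit = false,
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("some-topic");        // assumption: example topic

var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result is { IsPartitionEOF: true })
{
    // This result carries no message. Committing result.TopicPartitionOffset here
    // can advance past messages never handed to the application, which is the
    // loss scenario described above.
}
```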

We would appreciate your advice to achieve the correct generation of commits, without generating loss of events/messages, whether the control is held by the library or by the user.

zarusz commented 6 days ago

@jarb46 thanks for reporting and the detail provided.

I have a few additional questions:

Let me analyze the problem here on my side.

Also as part of #300, I clarified the documentation.

As a side note, there is Source Link support, which should allow you to step into the library source.

zarusz commented 5 days ago

@jarb46 I've identified the issue around the PartitionEOF and fixed it in #301. There is a preview version if you want to try it: https://www.nuget.org/packages/SlimMessageBus.Host.Kafka/2.5.3-rc1

I still need to address the second part of the problem, and will report back here once I have an update.