Azure / azure-functions-kafka-extension

Kafka extension for Azure Functions

autocommit flag #183

Open borys86 opened 3 years ago

borys86 commented 3 years ago

Dear developers, AutoCommitIntervalMs allows setting the interval for auto-commit. What about the case where handling fails and we want to retry? Would it be possible to set auto-commit to false in some way?
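
For reference, a minimal host.json sketch of where this interval is set today (the exact property name and the 5000 ms value are assumptions, not verified against the extension's options):

    {
      "version": "2.0",
      "extensions": {
        "kafka": {
          "autoCommitIntervalMs": 5000
        }
      }
    }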

TsuyoshiUshio commented 3 years ago

Thank you for the suggestion. Let me investigate it.

rkbalu commented 3 years ago

@TsuyoshiUshio - The current default value is 5 sec per the librdkafka configuration. Does this mean any consumer must be able to process its messages (say the batch size is 64) within 5 sec to avoid data loss?

tech7857 commented 3 years ago

Hello @TsuyoshiUshio

Any timeline on this enhancement? We also need a way to read the same message again if it fails in any way during processing.

Thanks

TsuyoshiUshio commented 3 years ago

In most cases we probably don't need to set auto-commit to false. We have a retry policy: https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-error-pages?tabs=csharp#retry-policies-preview. Combined with this retry policy, what happens is:

  1. The Kafka extension reads the messages from a broker.
  2. It tries to execute your function; if the function throws an exception, the execution is replayed according to the retry policy (step 2 repeats). Once it succeeds or the retry threshold is reached, it goes on to step 3.
  3. It executes Store Offset. https://docs.confluent.io/clients-confluent-kafka-dotnet/current/overview.html#store-offsets
  4. librdkafka auto-commits in the background; only offsets that have been stored are committed.

So in most cases, message loss doesn't happen. If the Functions host dies between steps 3 and 4, it could happen. If we set auto-commit to false and use a synchronous commit strategy, we can reduce that risk, but it will impact throughput.

So IMO we don't need auto-commit set to false. However, if you really need it, our extension has an extensibility point for the commit strategy, and the implementation of a synchronous commit is not very difficult.
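
To make that flow concrete, here is a minimal sketch of an in-process C# function combining the Kafka trigger with a retry policy (the "BrokerList" app setting, topic, and consumer group names are assumptions):

    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Kafka;
    using Microsoft.Extensions.Logging;

    public static class KafkaConsumer
    {
        [FunctionName("KafkaConsumer")]
        // maxRetryCount, minimum interval, maximum interval
        [ExponentialBackoffRetry(5, "00:00:04", "00:15:00")]
        public static void Run(
            [KafkaTrigger("BrokerList", "mytopic", ConsumerGroup = "mygroup")] KafkaEventData<string> kafkaEvent,
            ILogger log)
        {
            // An exception here replays the execution per the retry policy (step 2);
            // the extension stores the offset (step 3) only once the function
            // succeeds or the retry threshold is reached.
            log.LogInformation(kafkaEvent.Value);
        }
    }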

tech7857 commented 3 years ago

@TsuyoshiUshio

Thanks for replying..

If in step 2 we are unable to process the message for any reason (maybe the message is bad, or there is an application error), we don't want that message to be committed once the retries are also exhausted. So what should be done to stop the commit of that message and halt further processing?

Thanks

TsuyoshiUshio commented 3 years ago

Through the discussion, what you want is a feature that does not commit if an exception is thrown, right?

It is possible; however, we are currently designing the exception handling to be the same as the Event Hubs extension: https://docs.microsoft.com/en-us/azure/azure-functions/functions-reliable-event-processing#how-azure-functions-consumes-event-hubs-events

I need to discuss this with @apawast. What do you think?

tech7857 commented 3 years ago

@TsuyoshiUshio

Yes, this is correct. If there is an exception, we don't want to commit, and we want to halt the process.

thanks

tech7857 commented 3 years ago

@TsuyoshiUshio - any update on this?

MichalLechowski commented 2 years ago

I am having a similar problem. Nothing good comes out of the retry policy; it just counts on successful processing on the 2nd or 3rd try. If that doesn't happen, we still have no way to suppress committing the offset, and we lose the message.

I've investigated a bit, and it seems the only way for now to get such behavior is to use the Confluent.Kafka package directly and set up the consumer manually.

We'd certainly appreciate access to a Commit() method and an EnableAutoCommit flag within the trigger options.
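
Until then, a minimal sketch of that manual setup with Confluent.Kafka (broker, group, and topic names are assumptions; Process is a hypothetical handler):

    using System;
    using Confluent.Kafka;

    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // assumption
        GroupId = "my-group",                // assumption
        EnableAutoCommit = false             // no automatic commits at all
    };

    using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    consumer.Subscribe("my-topic");          // assumption

    while (true)
    {
        var result = consumer.Consume();
        try
        {
            Process(result.Message.Value);   // hypothetical handler
            consumer.Commit(result);         // commit only after success
        }
        catch (Exception)
        {
            // No commit: after a restart the consumer resumes from the last
            // committed offset, so this message is delivered again.
        }
    }

    static void Process(string value) { /* your processing goes here */ }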

the-rule commented 1 year ago

We'd also benefit greatly from having a Commit() method, especially since we committed to the latest and greatest for new projects, like .NET 7 and isolated-process functions, and retry policies are not available there.

superkartoffel commented 1 year ago

We use ExponentialBackoffRetry with maxRetryCount = -1. That way we do not lose any messages, since it retries indefinitely if an exception occurs.

Nevertheless, it would be nice if we could prevent the extension from calling StoreOffset for use cases where we need to delay the processing of messages further and only want to commit them after processing has finished. We would then call StoreOffset manually. What we need is the equivalent of the kafka-dotnet behavior when

    EnableAutoCommit = true // (the default)
    EnableAutoOffsetStore = false

as described here: https://docs.confluent.io/kafka-clients/dotnet/current/overview.html#store-offsets
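
In plain Confluent.Kafka terms, that combination behaves roughly like this sketch (broker, group, and topic names are assumptions; Process is a hypothetical handler):

    using Confluent.Kafka;

    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // assumption
        GroupId = "my-group",                // assumption
        EnableAutoCommit = true,             // background commit stays on
        EnableAutoOffsetStore = false        // only explicitly stored offsets commit
    };

    using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    consumer.Subscribe("my-topic");          // assumption

    while (true)
    {
        var result = consumer.Consume();
        Process(result.Message.Value);       // hypothetical handler
        consumer.StoreOffset(result);        // mark as done; auto-commit picks it up
    }

    static void Process(string value) { /* your processing goes here */ }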

@TsuyoshiUshio, you mention that it is possible to extend the behavior of the extension in that regard? Can you point me to where to start?