Farfetch / kafkaflow-retry-extensions

Kafka Flow Retry Patterns Extensions
https://farfetch.github.io/kafkaflow-retry-extensions/
MIT License
55 stars 7 forks source link

[Feature Request]: Honor typed handler lifetime #142

Open seruminar opened 8 months ago

seruminar commented 8 months ago

Is your request related to a problem you have?

Please let me know if this discussed somewhere πŸ™‚

It appears that a consumer configuration similar to the following is not supported:

middlewareConfiguration
    .RetryForever((retryDefinition) =>
        retryDefinition
            .HandleAnyException()
            .WithTimeBetweenTriesPlan(
                TimeSpan.FromSeconds(10)
                )
        );

middlewareConfiguration
    .AddTypedHandlers(handlerConfiguration => handlerConfiguration
        .WithHandlerLifetime(InstanceLifetime.Transient)  // <----------------------------------------
        .AddHandlersFromAssemblyOf(assembly.DefinedTypes.First())
        );

Specifically, non-singleton values of InstanceLifetime do not imply that a new handler is created when an exception is thrown (deep) inside a handler and it bubbles up all the way to Polly. I can observe this by altering a handler dependency (i.e. setting a property value), watching the retry occur, and then inspecting that altered property.

This tends to cause a problem with disposed DB connections πŸ˜…

Describe the solution you'd like

Does this make sense? Is there a workaround? Should I try to minimally reproduce in case something else is going on?

Ideally the retry middleware would be placed in a way that creates the handler again as if it was a new incoming message.

Are you able to help bring it to life and contribute with a Pull Request?

No

Additional context

It may not be possible to do this with middlewares based on the implementation of https://github.com/Farfetch/kafkaflow/blob/master/src/KafkaFlow/MiddlewareExecutor.cs

luispfgarces commented 8 months ago

Hi @seruminar,

From your description, it seems to be a bug, but we'll check this out and get back to you with further details. Can you provide us with a minimal repro for us to look into it?

seruminar commented 7 months ago

Hi @luispfgarces thank you for the response!

Attached is a csproj and Program.cs for .NET 6 and KafkaFlow 2.5.0 and KafkaFlow.Retry 2.1.3.

KafkaflowRetryScopeIssue1.zip

Assuming there is Kafka running at localhost:9092, for example from https://hub.docker.com/r/confluentinc/cp-kafka, my console output is as follows:

Hello, World!
Starting host

KafkaFlow: An error occurred creating topic {Topic}: {Reason} | Data: {"Topic":"topic1","Reason":"Topic \u0027topic1\u0027 already exists."}
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
      Content root path: C:\S\s\KafkaflowRetryScopeIssue1\KafkaflowRetryScopeIssue1\bin\Debug\net6.0
Starting test
Producing message

KafkaFlow: Partitions assigned | Data: {"GroupId":"KafkaflowRetryScopeIssue1_topic1","ConsumerName":"f8e87759-9de4-4f86-94da-11d33325acce","Topics":[{"Topic":"topic1","PartitionsCount":1,"Partitions":[0]}]}
Message with a new scope every time: test1

KafkaFlow: Consumer paused by retry process | Data: {"ConsumerGroup":"KafkaflowRetryScopeIssue1_topic1","ConsumerName":"f8e87759-9de4-4f86-94da-11d33325acce","Worker":0}

KafkaFlow: Exception captured by RetryForeverMiddleware. Retry in process. | Data: {"AttemptNumber":1,"WaitMilliseconds":2000,"PartitionNumber":0,"Worker":0,"ExceptionType":"System.NotSupportedException"} | Exception: {"Type":"System.NotSupportedException","Message":"Specified method is not supported.","StackTrace":"   at TestMessageHandler.Handle(IMessageContext context, TestMessage message) in C:\\S\\s\\KafkaflowRetryScopeIssue1\\KafkaflowRetryScopeIssue1\\Program.cs:line 186\r\n   at KafkaFlow.TypedHandler.TypedHandlerMiddleware.\u003C\u003Ec__DisplayClass3_0.\u003CInvoke\u003Eb__0(Type handler)\r\n   at System.Linq.Enumerable.SelectListIterator\u00602.MoveNext()\r\n   at System.Threading.Tasks.Task.WhenAll(IEnumerable\u00601 tasks)\r\n   at KafkaFlow.TypedHandler.TypedHandlerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\r\n   at Polly.AsyncPolicy.\u003C\u003Ec__DisplayClass40_0.\u003C\u003CImplementationAsync\u003Eb__0\u003Ed.MoveNext()\r\n--- End of stack trace from previous location ---\r\n   at Polly.Retry.AsyncRetryEngine.ImplementationAsync[TResult](Func\u00603 action, Context context, CancellationToken cancellationToken, ExceptionPredicates shouldRetryExceptionPredicates, ResultPredicates\u00601 shouldRetryResultPredicates, Func\u00605 onRetryAsync, Int32 permittedRetryCount, IEnumerable\u00601 sleepDurationsEnumerable, Func\u00604 sleepDurationProvider, Boolean continueOnCapturedContext)"}
Message with a reused scope: test1

KafkaFlow: Consumer resumed by retry process | Data: {"ConsumerGroup":"KafkaflowRetryScopeIssue1_topic1","ConsumerName":"f8e87759-9de4-4f86-94da-11d33325acce","Worker":0}

KafkaFlow: Offsets committed | Data: {"Offsets":[{"Topic":"topic1","Partitions":[{"Partition":0,"Offset":9}]}]}

I expect to see just the line Message with a new scope every time: test1 and an exception and retry loop over and over. Instead, it shows that and then completes with Message with a reused scope: test1.