Farfetch / kafkaflow-retry-extensions

Kafka Flow Retry Patterns Extensions
https://farfetch.github.io/kafkaflow-retry-extensions/
MIT License
56 stars 7 forks source link

[Feature Request]: Have Durable Retry support Kafka retry topics #102

Closed douglasg14b closed 1 year ago

douglasg14b commented 1 year ago

Is your request related to a problem you have?

I don't use SQL Server or MongoDB. I use PostgreSQL. Everyone using this library is using Kafka, it could support Kafka retry topics out of the box for complete support for all potential users.

Describe the solution you'd like

Instead of sending these off to a specific, supported, flavor of DB. Send these off to a retry topic that can then be consumed from with a Forever Retry policy.

Are you able to help bring it to life and contribute with a Pull Request?

Maybe

Additional context

I'm in the processes of switching to Kafka Flow, so I am unwilling to make a PR contribution till I find that I am stable in using it and start investing on retries which could involve this lib assuming such support either exists by that point, or I try and add such support myself.

martinhonovais commented 1 year ago

Hi @douglasg14b First of all, thanks for your request. We are very glad to hear from you.

About your request, we already have considered developing all these features using Kafka as a repository. Once we do it, there won't be more dependencies beyond Kafka. You can keep track of it here #53.

However, while we don’t have it, our interpretation of your request is that you can accomplish is now by using a middleware that publishes to a retry topic and then you can have a second consumer consuming from there with a retry it forever. Something like this:

Kafka Config:

.AddConsumer(consumer => consumer
    .Topic("main-topic")
    .WithGroupId("main-topic")
    .WithName("MainTopic")
    .WithBufferSize(1)
    .WithWorkersCount(1)
    .WithAutoOffsetReset(AutoOffsetReset.Latest)
    .AddMiddlewares(
        middlewares => middlewares
            .Add<ExceptionCatcherAndProduceToAnotherTopicMiddleware>()
            .AddTypedHandlers(
                handlers => handlers
                    .WithHandlerLifetime(InstanceLifetime.Transient)
                    .AddHandler<ExceptionCatcherAndProduceToAnotherTopicHandler>()))
    )
.AddProducer(
    "producer-dlq-topic",
    producer => producer
        .DefaultTopic("retry-dlq-topic")
        .WithAcks(Acks.All)
)
.AddConsumer(consumer => consumer
    .Topic("retry-dlq-topic")
    .WithGroupId("retry-dlq-topic")
    .WithName("RetryDlqTopic")
    .WithBufferSize(1)
    .WithWorkersCount(1)
    .WithAutoOffsetReset(AutoOffsetReset.Latest)
    .AddMiddlewares(
        middlewares => middlewares
            .RetryForever(
                (configure) => configure
                    .Handle<NotImplementedException>()
                    .WithTimeBetweenTriesPlan(
                        TimeSpan.FromMilliseconds(500),
                        TimeSpan.FromMilliseconds(1000))
            )
            .AddTypedHandlers(
                handlers => handlers
                    .WithHandlerLifetime(InstanceLifetime.Transient)
                    .AddHandler<RetryDlqTopicHandler>()))
)

Middleware:

internal class ExceptionCatcherAndProduceToAnotherTopicMiddleware : IMessageMiddleware
{
    private readonly IMessageProducer messageProducer;

    public ExceptionCatcherAndProduceToAnotherTopicMiddleware(
        IProducerAccessor producerAccessor)
    {
        this.messageProducer = producerAccessor.GetProducer("producer-dlq-topic");
    }

    public async Task Invoke(
        IMessageContext context,
        MiddlewareDelegate next)
    {
        try
        {
            await next(context).ConfigureAwait(false);
        }
        catch (NotImplementedException)
        {
            await this.messageProducer.ProduceAsync(context.Message.Key, context.Message.Value).ConfigureAwait(false);
        }
    }
}

Let us know if we are thinking on the same page, please.

douglasg14b commented 1 year ago

Hi, that's actually great!

Thanks for this. It will be a bit before I swing back around to this.

martinhonovais commented 1 year ago

Thank you. Closing this issue. The feature can be tracked on #53.

gsferreira commented 1 year ago

@douglasg14b by the way, there's an open PR to add PostgreSQL as a Storage provider. You can find it here https://github.com/Farfetch/kafkaflow-retry-extensions/pull/108.