antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.
MIT License
111 stars 36 forks source link

Reworked pipelines and added middlewares (while removing filters). #83

Closed antonyvorontsov closed 3 years ago

antonyvorontsov commented 3 years ago

This PR is still in work in progress coz I want to add some additional functionality in here. But anyway there are some things that can be useful.

I figured out that old "pipeline filters" functionality actually pointless and those filters can't be used properly. Thus, they have been removed, but the middleware substitution has been added.

Now MessageHandlingPipelineExecutingService work with the context MessageHandlingContext model that contains consumed message (of BasicDeliverEventArgs). MessageHandlingPipelineExecutingService passes that context to the pipeline of middlewares. You can add one simply implementing IMessageHandlingMiddleware interface

public class CustomMiddleware : IMessageHandlingMiddleware
{
        private readonly IProducingService _producingService;
        public CustomMiddleware(IProducingService producingService)
        {
            _producingService = producingService;
        }

        // A part of common executing pipeline. You can add additional logic right here (e.g. logging, metrics and so on).
        public async Task Handle(MessageHandlingContext context, Func<Task> next)
        {
            await next();
        }

        // A part of the execution pipeline that takes part when you can't process consumed message.
        public async Task HandleError(MessageHandlingContext context, Exception exception, Func<Task> next)
        {
            var message = context.Message;
            // Do something with consumed message.
            await _producingService.SendAsync(...)

            await next();
        }
}

And register it as via DI

services.AddMessageHandlingMiddleware<CustomMiddleware>();

You can also add multiple middlewares and combine them. As for RequeueFailedMessages functionality - you can disable default implementation and use your own. I am going to add a feature that will allow you to change HandleFailedMessageProcessing method logic as well. In this way the library will provide two options of reprocessing failed messages.

antonyvorontsov commented 3 years ago

@sblasetti I am looking for your approval or disapproval and some comments about how it fits an initial request. Please, take a look at it and let me know. Thank you!

antonyvorontsov commented 3 years ago

I've just separated message processing service into two services with smaller responsibility. Now we have an interface

public interface IErrorProcessingService
    {
        Task HandleMessageProcessingFailure(MessageHandlingContext context, Exception exception);
    }

and its default implementation with re-queueing and other stuff

public class ErrorProcessingService : IErrorProcessingService
{
    private readonly IProducingService _producingService;
    private readonly IEnumerable<RabbitMqExchange> _exchanges;
    private readonly ILoggingService _loggingService;

    public ErrorProcessingService(
        IProducingService producingService,
        IEnumerable<RabbitMqExchange> exchanges,
        ILoggingService loggingService)
    {
        _producingService = producingService;
        _exchanges = exchanges;
        _loggingService = loggingService;
    }

    /// <inheritdoc />
    public async Task HandleMessageProcessingFailure(MessageHandlingContext context, Exception exception)
    {
        var eventArgs = context.Message;
        context.AckAction(eventArgs);
        _loggingService.LogError(exception, $"An error occurred while processing received message with the delivery tag {eventArgs.DeliveryTag}.");
        await HandleFailedMessageProcessing(eventArgs).ConfigureAwait(false);
    }

    private async Task HandleFailedMessageProcessing(BasicDeliverEventArgs eventArgs)
    {
        var exchange = _exchanges.FirstOrDefault(x => x.Name == eventArgs.Exchange);
        if (exchange is null)
        {
            _loggingService.LogWarning($"Could not detect an exchange \"{eventArgs.Exchange}\" to determine the necessity of resending the failed message. The message won't be re-queued");
            return;
        }

        if (!exchange.Options.RequeueFailedMessages)
        {
            _loggingService.LogWarning($"RequeueFailedMessages option for an exchange \"{eventArgs.Exchange}\" is disabled. The message won't be re-queued");
            return;
        }

        if (string.IsNullOrEmpty(exchange.Options.DeadLetterExchange))
        {
            _loggingService.LogWarning($"DeadLetterExchange has not been configured for an exchange \"{eventArgs.Exchange}\". The message won't be re-queued");
            return;
        }

        if (exchange.Options.RequeueTimeoutMilliseconds < 1)
        {
            _loggingService.LogWarning($"The value RequeueTimeoutMilliseconds for an exchange \"{eventArgs.Exchange}\" less than 1 millisecond. Configuration is invalid. The message won't be re-queued");
            return;
        }

        if (exchange.Options.RequeueAttempts < 1)
        {
            _loggingService.LogWarning($"The value RequeueAttempts for an exchange \"{eventArgs.Exchange}\" less than 1. Configuration is invalid. The message won't be re-queued");
            return;
        }

        if (eventArgs.BasicProperties.Headers is null)
        {
            eventArgs.BasicProperties.Headers = new Dictionary<string, object>();
        }

        if (!eventArgs.BasicProperties.Headers.ContainsKey("re-queue-attempts"))
        {
            eventArgs.BasicProperties.Headers.Add("re-queue-attempts", 1);
            await RequeueMessage(eventArgs, exchange.Options.RequeueTimeoutMilliseconds);
            return;
        }

        var currentAttempt = (int)eventArgs.BasicProperties.Headers["re-queue-attempts"];
        if (currentAttempt < exchange.Options.RequeueAttempts)
        {
            eventArgs.BasicProperties.Headers["re-queue-attempts"] = currentAttempt + 1;
            await RequeueMessage(eventArgs, exchange.Options.RequeueTimeoutMilliseconds);
        }
        else
        {
            _loggingService.LogInformation("The failed message would not be re-queued. Attempts limit exceeded");   
        }
    }

    private async Task RequeueMessage(BasicDeliverEventArgs eventArgs, int timeoutMilliseconds)
    {
        await _producingService.SendAsync(eventArgs.Body, eventArgs.BasicProperties, eventArgs.Exchange, eventArgs.RoutingKey, timeoutMilliseconds);
        _loggingService.LogInformation("The failed message has been re-queued");
    }
}

But for now if you ain't satisfied with this implementation you can write your own error processing service simply implementing given interface

public class CustomErrorProcessingService : IErrorProcessingService
{
    public Task HandleMessageProcessingFailure(MessageHandlingContext context, Exception exception)
    {
        // Do not forget to ack a message (!).
        context.AckAction(context.Message);

        // Do other things like resending messages or writing them into DB.
        return Task.CompletedTask;
    }
}

and then register it in DI

collection.AddCustomMessageHandlingErrorProcessingService< CustomErrorProcessingService >();

It will do the work for you for replacing default implementation with your own

public static IServiceCollection AddCustomMessageHandlingErrorProcessingService<T>(this IServiceCollection services)
    where T : class, IErrorProcessingService
{
    var descriptor = services.FirstOrDefault(x => x.ServiceType == typeof(IErrorProcessingService));
    if (descriptor is not null)
    {
        services.Remove(descriptor);
    }

    return services.AddSingleton<IErrorProcessingService, T>();
}

Basically this is another way of handling errors (a.k.a. exceptions).