antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.
MIT License
111 stars 36 forks source link
dependencyinjection rabbitmq

# RabbitMQ.Client.Core.DependencyInjection


Codacy Badge

This repository contains the library that provides functionality for wrapping RabbitMQ.Client code and adding it in your application via the dependency injection mechanism. That wrapper provides easy, managed message queue consume and publish operations. The library targets netstandard2.1.

(!)Current version of documentation is out of date for 5.0.0. Stay updated.(!)

Usage

This section contains only an example of a basic usage of the library. You can find the detailed documentation in the docs directory where all functionality fully covered.

Producer

To produce messages in the RabbitMQ queue you have to go through the routine of configuring a RabbitMQ connection and exchanges. In your Startup file you can do it simply calling couple methods in a fluent-Api way.

public static IConfiguration Configuration { get; set; }

public void ConfigureServices(IServiceCollection services)
{
    var rabbitMqSection = Configuration.GetSection("RabbitMq");
    var exchangeSection = Configuration.GetSection("RabbitMqExchange");

    services.AddRabbitMqClient(rabbitMqSection)
        .AddProductionExchange("exchange.name", exchangeSection);
}

By calling AddRabbitMqClient you add IQueueService that provides functionality of sending messages to queues. AddProductionExchange configures exchange to queues bindings (presented in json configuration) that allow messages to route properly. Example of appsettings.json is two sections below. You can also configure everything manually. For more information, see rabbit-configuration and exchange-configuration documentation files.

Now you can inject an instance of IQueueService inside anything you want.

[Route("api/[controller]")]
public class HomeController : Controller
{
    readonly IQueueService _queueService;
    public HomeController(IQueueService queueService)
    {
        _queueService = queueService;
    }
}

Now you can send messages using Send or SendAsync methods.

var messageObject = new
{
    Id = 1,
    Name = "RandomName"
};

await _queueService.SendAsync(
    @object: messageObject,
    exchangeName: "exchange.name",
    routingKey: "routing.key");

You can also send delayed messages.

await _queueService.SendAsync(
    @object: messageObject,
    exchangeName: "exchange.name",
    routingKey: "routing.key",
    millisecondsDelay: 10);

The mechanism of sending delayed messages described in the documentation. Dive into it for more detailed information.

Consumer

After making the message production possible let's make the consumption possible too! Imagine that a consumer is a simple console application.

class Program
{
    const string ExchangeName = "exchange.name";
    public static IConfiguration Configuration { get; set; }

    static void Main()
    {
        var builder = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
        Configuration = builder.Build();

        var serviceCollection = new ServiceCollection();
        ConfigureServices(serviceCollection);

        var serviceProvider = serviceCollection.BuildServiceProvider();
        var queueService = serviceProvider.GetRequiredService<IQueueService>();
        queueService.StartConsuming();
    }

    static void ConfigureServices(IServiceCollection services)
    {
        var rabbitMqSection = Configuration.GetSection("RabbitMq");
        var exchangeSection = Configuration.GetSection("RabbitMqExchange");

        services.AddRabbitMqClient(rabbitMqSection)
            .AddConsumptionExchange("exchange.name", exchangeSection)
            .AddMessageHandlerSingleton<CustomMessageHandler>("routing.key");
    }
}

You have to configure everything almost the same way as you have already done previously with the producer. The main differences are that you need to declare (configure) a consumption exchange calling AddConsumptionExchange instead of a production exchange. For detailed information about difference in exchange declarations you may want to see the documentation. The other important part is adding custom message handlers by implementing the IMessageHandler interface and calling AddMessageHandlerSingleton<T> or AddMessageHandlerTransient<T> methods. IMessageHandler is a simple subscriber, which receives messages from a queue by selected routing key. You are allowed to set multiple message handlers for one routing key (e.g. one is writing it in a database, and the other does the business logic).

You can also use pattern matching while adding message handlers where * (star) can substitute for exactly one word and # (hash) can substitute for zero or more words. You are also allowed to specify the exact exchange which will be "listened" by the selected message handler with the given routing key (or a pattern).

services.AddRabbitMqClient(rabbitMqSection)
    .AddConsumptionExchange("exchange.name", exchangeSection)
    .AddMessageHandlerSingleton<CustomMessageHandler>("*.*.key")
    .AddMessageHandlerSingleton<AnotherCustomMessageHandler>("#", "exchange.name");

The very last step is to start "listening" (consuming) by simply calling StartConsuming method of IQueueService. After that you will start getting messages, and you can handle them in any way you want.

A message handler example.

public class CustomMessageHandler : IMessageHandler
{
    readonly ILogger<CustomMessageHandler> _logger;
    public CustomMessageHandler(ILogger<CustomMessageHandler> logger)
    {
        _logger = logger;
    }

    public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
    {
        // Do whatever you want with the message.
        _logger.LogInformation("Hello world");
    }
}

If you want to use an async magic then implement your custom IAsyncMessageHandler.

public class CustomAsyncMessageHandler : IAsyncMessageHandler
{
    readonly ILogger<CustomAsyncMessageHandler> _logger;
    public CustomAsyncMessageHandler(ILogger<CustomAsyncMessageHandler> logger)
    {
        _logger = logger;
    }

    public async Task Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
    {
       // await something.
    }
}

If you want to send messages from the Handle method inside your message handler then use INonCyclicMessageHandler. You can't inject IQueueService inside any message handlers, otherwise you will get a cyclic dependency exception. But INonCyclicMessageHandler allows you to avoid this as it accepts an instance of IQueueService as a parameter of the method Handle.

public class MyNonCyclicMessageHandler : INonCyclicMessageHandler
{
    // Inject anything you want except IQueueService.
    readonly ILogger<MyNonCyclicMessageHandler> _logger;
    public MyNonCyclicMessageHandler(ILogger<MyNonCyclicMessageHandler> logger)
    {
        _logger = logger;
    }

    public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute, IQueueService queueService)
    {
        // Send anything you want using IQueueService instance.
        var anotherMessage = new MyMessage { Foo = "Bar" };
        queueService.Send(anotherMessage, "exchange.name", "routing.key");
    }
}

INonCyclicMessageHandler has its asynchronous analogue IAsyncNonCyclicMessageHandler.

public class MyAsyncNonCyclicMessageHandler : IAsyncNonCyclicMessageHandler
{
    // Inject anything you want except IQueueService.
    readonly ILogger<MyAsyncNonCyclicMessageHandler> _logger;
    public MyAsyncNonCyclicMessageHandler(ILogger<MyAsyncNonCyclicMessageHandler> logger)
    {
        _logger = logger;
    }

    public async Task Handle(BasicDeliverEventArgs eventArgs, string matchingRoute, IQueueService queueService)
    {
        // Do async stuff.
        var anotherMessage = new MyMessage { Foo = "Bar" };
        await queueService.SendAsync(anotherMessage, "exchange.name", "routing.key");
    }
}

You can register any of those message handlers the same way you registered IMessageHandler before. There are similar extension methods for each type of message handlers. For more information, see the message-consuming documentation file.

You can also find example projects in the repository inside the examples directory.

Configuration

In both cases for producing and consuming messages the configuration file is the same. appsettings.json consists of those sections: (1) settings to connect to the RabbitMQ server and (2) sections that configure exchanges and queue bindings. You can have multiple exchanges and one configuration section per each exchange. Exchange sections define how to bind queues and exchanges with each other using specified routing keys (or patterns). You allowed to bind a queue to an exchange with more than one routing key, but if there are no routing keys in the queue section, then that queue will be bound to the exchange with its name.

{
  "RabbitMq": {
    "HostName": "127.0.0.1",
    "Port": "5672",
    "UserName": "guest",
    "Password": "guest"
  },
  "RabbitMqExchange": {
    "Type": "direct",
    "Durable": true,
    "AutoDelete": false,
    "DeadLetterExchange": "default.dlx.exchange",
    "RequeueFailedMessages": true,
    "Queues": [
      {
        "Name": "myqueue",
        "RoutingKeys": [ "routing.key" ]
      }
    ]
  }
}

For more information about appsettings.json and manual configuration features, see rabbit-configuration and exchange-configuration documentation files.

Batch message handlers

There are also a feature that you can use in case of necessity of handling messages in batches. First of all you have to create a class that inherits a BaseBatchMessageHandler class. You have to set up values for QueueName and PrefetchCount properties. These values are responsible for the queue that will be read by the message handler, and the size of batches of messages.

public class CustomBatchMessageHandler : BaseBatchMessageHandler
{
    readonly ILogger<CustomBatchMessageHandler> _logger;

    public CustomBatchMessageHandler(
        IRabbitMqConnectionFactory rabbitMqConnectionFactory,
        IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
        ILogger<CustomBatchMessageHandler> logger)
        : base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
    {
        _logger = logger;
    }

    public override ushort PrefetchCount { get; set; } = 50;

    public override string QueueName { get; set; } = "queue.name";

    public override TimeSpan? MessageHandlingPeriod { get; set; } = TimeSpan.FromMilliseconds(500);

    public override Task HandleMessages(IEnumerable<BasicDeliverEventArgs> messages, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Handling a batch of messages.");
        foreach (var message in messages)
        {
            _logger.LogInformation(message.GetMessage());
        }
        return Task.CompletedTask;
    }
}

After all you have to register that batch message handler via DI.

services.AddBatchMessageHandler<CustomBatchMessageHandler>(Configuration.GetSection("RabbitMq"));

The message handler will create a separate connection and use it for reading messages. When the message collection is full to the size of PrefetchCount they are passed to the HandleMessage method. You can also set a MessageHandlingPeriod property value and the method HandleMessage will be executed repeatedly so messages in unfilled batches could be processed too. For more information, see the message-consuming documentation file.

Advanced usage and nuances

RabbitMQ client implemented in this library (class which implements IQueueService) opens two connections to the RabbitMQ server. One connection is used for message production, and the other one is for message consumption. This behavior covered in the advanced usage documentation file, dive into it deeply if you want to control the client behavior tighter.

There is also an example project that demonstrates an advanced usage of the RabbitMQ client.

Changelog

All notable changes covered in the changelog file.

License

This library licensed under the MIT license.

Feel free to contribute!