antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.
MIT License
111 stars 36 forks source link

How to purge or delete then recreate a queue #29

Closed jerpic closed 4 years ago

jerpic commented 4 years ago

Hello Antony,

Hope you are well.

I'm coming with you with the hope that you can help me with my problem.

In my process, I have to purge a queue, I'm able to do that with this command :

var nbMessagePurged = _queueService.Channel.QueuePurge("RequestQueue");

The command purges all the message with the status "Ready" but not the other ones like "Unacked". So as a workaround, I deleted the queue with the command :

var nbMessagePurged = _queueService.Channel.QueueDelete("RequestQueue", false, false);

This is a radical method, that works but unfortunately I have to recreate the queue....everything is possible but I have the impression that I go to far from the usage of your API.

Do you have a suggestion/idea for this issue that can help me?

Best Regards, Jérôme.

antonyvorontsov commented 4 years ago

Hi, Jérôme!

I believe that the question you asked goes far beyond that project. You can purge only "Ready" messages, but not the "Unacked". If a message is "Unacked" then it is being processed by a message handler. So either you have to wait until it will process a message and send an acknowledge.

The only way I can help you is to add something that can shut down message consumption in the next library version. I think this can be done with a method which is opposite to the StartConsuming method (let's name it StopConsuming 😄 ). But even tho after calling StopConsuming you will have to wait until all unacked messages are processed. I do not know what your message handlers do and how long do they take to process a message, so the code

_queueService. StopConsuming();
await Task.Delay(2000); // Some amount of time.
var nbMessagePurged = _queueService.Channel.QueuePurge("RequestQueue");
_queueService.StartConsuming();

looks dubiously. So you won't have a guarantee of complete purging messages from queue.

So my advise for you is to think twice why do you even need to purge messages and what other ways you can use in your services from architectural (conceptual) point of view.

But still, I can add StopConsuming, it is easy-to-implement thing. The choice is yours.

Best regards, Antony

jerpic commented 4 years ago

Hi Antony,

Many thanks for answering me! and many thanks for your advice! You are right I have to think twice ;-)

I reviewed my process but I don't realy know how to implement it... Here is the new process :

I have 1 producer and many consumers (the number of consumers can change in time, it depends of the CPU/Memory consumption of each consumers, everything will run with Kubernetes).

To stop the process, the producer will send a message through a specific exchange (CancellationExchange), the consumers will intercept the message and stop their process.

I know that I have to use for the CancellationExchange the type "Fanout", but I don't know how to configure the consumers .... each consumer have to listen to a dedicated queue? But as I explained, I don't know in advance the number of consumers.

Do you have an idea/example about the usage of a "fanout" exchange with your API?

In advance, many thanks for your precious help.

Best Regards, Jérôme.

jerpic commented 4 years ago

Hi Antony,

This message just to tell you that with a "Manual configuration" I'm able to create queues on the fly.

Best Regards, Jérôme.

antonyvorontsov commented 4 years ago

Hi, Jérôme.

I am sorry for the late reply, I was away from my computer for a while. As for your question, sure you can make a topic exchange. If I understand your case right code will look like this.

Let's say that you have an application, which sends messages in the topic queue and does no more.

public class Startup
{
    public static IConfiguration Configuration;

    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public void ConfigureServices(IServiceCollection services)
    {
        var clientConfiguration = Configuration.GetSection("RabbitMq");
        var exchangeConfiguration = Configuration.GetSection("TopicExchange");

       // We are adding only topic exchange for the message production.
        services.AddRabbitMqClient(clientConfiguration)
            .AddProductionExchange("TopicExchange", exchangeConfiguration);
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
          // Other code.
    }
}

Let's imagine, that you have for "listeners" (independent applications). The configuration file for the topic exchange is:

{
 "TopicExchange": {
    "Type": "topic",
    "Durable": true,
    "AutoDelete": false,
    "Queues": [
     { "Name": "first.queue" },
     { "Name": "second.queue" },
     { "Name": "third.queue" },
     { "Name": "fourth.queue" }
    ]
  }
}

The key part is to set "type" as "topic". Also be aware that Queues in that example configured in a certain way. Routing keys are ignored and queue names will be used to bind a queue to the topic exchange. Four "listeners" equal to four queues. The idea is to make each consumer to "listen" different queues suited for each of them.

So your producer publishes a message to the topic exchange. A routing key does not really matter.

queueService.Send(message, exchangeName: "TopicExchange", routingKey: "does.not.matter");

And that message gets into each queue bound to that exchange.

Your first consumer will "listen" for messages from the "first" queue (and so on). We will have to use a queue name instead of a routing key (remember the way they have been bound) or a pattern (e.g. # to handle any message from the specified exchange)

public class Startup
{
    public static IConfiguration Configuration;

    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public void ConfigureServices(IServiceCollection services)
    {
        var clientConfiguration = Configuration.GetSection("RabbitMq");
        var exchangeConfiguration = Configuration.GetSection("TopicExchange");

       // We are adding only topic exchange for the message production.
        services.AddRabbitMqClient(clientConfiguration)
            .AddConsumptionExchange("TopicExchange", exchangeConfiguration)
            .AddMessageHandlerSingleton<MessageHandler>("first.queue", "TopicExchange");
            // Or use a pattern
            //.AddMessageHandlerSingleton<MessageHandler>("#", "TopicExchange");
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
          // Other code.
    }
}

The appsettings.json file will be the same. And the MessageHandler will do its dirty job of shutting down itself (the consumer).

Best regards, Antony

antonyvorontsov commented 4 years ago

But there is no difference between appsettings.json and manual configuration since you managed to get everything done and working.

If your question is settled, then feel free to close this issue.