antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.
MIT License

consuming but unable to produce #15

Closed: TomKirby closed this issue 4 years ago

TomKirby commented 4 years ago

Hi,

Thanks for this library, it's proving very useful and yielding very clean code!

I am, however, struggling to get the implementation correct where I need to consume from one queue and publish messages to another exchange.

Consuming seems to work fine: I can see the consumed message from the FileCreationQueue (which is bound to the mbm-FileCreation exchange) in the console output. However, I cannot see the newly published message on the mbm-DuplicateFile exchange when I check RabbitMQ's admin page.

I have the following code:

startup.cs

        public void ConfigureServices(IServiceCollection services)
        {
            var clientConfiguration = Configuration.GetSection("RabbitMq");
            var FileCreationExchangeConfiguration = Configuration.GetSection("FileCreationExchange");
            var DuplicateFileExchangeConfiguration = Configuration.GetSection("DuplicateFileExchange");

            services.AddRabbitMqClient(clientConfiguration)
                    .AddConsumptionExchange("mbm-FileCreation", FileCreationExchangeConfiguration)
                    .AddProductionExchange("mbm-DuplicateFile", DuplicateFileExchangeConfiguration)
                    .AddNonCyclicMessageHandlerTransient<NewFileCreationHandler>(String.Empty);

            services.AddHostedService<FileCreationConsumerService>();
        }

FileCreationConsumerService.cs

    public class FileCreationConsumerService : BackgroundService
    {
        private readonly IQueueService _queueService;
        public FileCreationConsumerService(IQueueService queueService)
        {
            _queueService = queueService;

        }
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _queueService.StartConsuming();
            return Task.CompletedTask;
        }
    }

NewFileCreationHandler.cs

    public class NewFileCreationHandler : INonCyclicMessageHandler
    {
        public void Handle(string message, string routingKey, IQueueService queueService)
        {
            Console.WriteLine(message);

            queueService.Send("new Message", "mbm-DuplicateFile", String.Empty);
        }
    }

appsettings.json

"AllowedHosts": "*",
  "RabbitMq": {
    "HostName": "127.0.0.1",
    "Port": "5672",
    "UserName": "guest",
    "Password": "guest",
    "ClientProvidedName": "Custom connection name",
    "VirtualHost": "/",
    "AutomaticRecoveryEnabled": true,
    "TopologyRecoveryEnabled": true,
    "RequestedConnectionTimeout": 60000,
    "RequestedHeartbeat": 60
  },
  "FileCreationExchange": {
    "Type": "fanout",
    "Durable": false,
    "Queues": [
      {
        "Name": "FileCreationQueue"
      }
    ]
  },
  "DuplicateFileExchange": {
    "Type": "direct",
    "Queues": [
      {
        "Name": "DuplicateFileQueue"
      }
    ]
  }

TomKirby commented 4 years ago

I have extended my Handler with the following code:

        public void Handle(string message, string routingKey, IQueueService queueService)
        {
            Console.WriteLine(message);

            try
            {
                queueService.Send("new Message", "mbm-DuplicateFile", "mbm-DuplicateFile");
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
            Console.WriteLine("Published The MEssage!");
        }

The line Console.WriteLine("Published The MEssage!"); is NOT executing, which made me think an exception was occurring in the publish method, but the catch block doesn't catch any exception.

I also tried using the exchange name as the routing key, as previously I was not providing one. That did not resolve the problem either.

antonyvorontsov commented 4 years ago

Hello Tom!

I am sorry for the late reply. And also thank you for that question.

"It seems to be consuming fine, and i am seeing the output of the consumed message from the FileCreationQueue which is bound to the mbm-FileCreation exchange on the console writeline output."

First of all, I have to clarify this point. What you have been seeing for each message you published is not Console.WriteLine output but ILogger<T> output. When QueueService receives a message, it logs that event as well as the message itself:

_logger.LogInformation($"New message was received with deliveryTag {@event.DeliveryTag}.");
_logger.LogInformation(message);

You can take a closer look at the source code if you are interested.

As for your code, it is almost correct but needs a few adjustments.

Let's start with the configuration file. The key part is how queues are bound to exchanges.

"FileCreationExchange": {
    "Type": "fanout",
    "Durable": false,
    "Queues": [
      {
        "Name": "FileCreationQueue"
      }
    ]
  }

This section configures the exchange that is registered in code as "mbm-FileCreation" ("FileCreationExchange" is only the configuration section name, so "mbm-FileCreation" is the name to use in code) and binds the queue "FileCreationQueue" to it without an explicit routing key. In that case the queue's own name, "FileCreationQueue", is used as the routing key for the binding. This is important because message handlers also select messages by routing key.

 "DuplicateFileExchange": {
    "Type": "direct",
    "Queues": [
      {
        "Name": "DuplicateFileQueue"
      }
    ]
  }

This section works the same way. The "DuplicateFileQueue" queue is bound to the "mbm-DuplicateFile" exchange with its own name, "DuplicateFileQueue", as the routing key.

Great! Everything is set up. Let's consume now!

To consume messages from the consumption exchange (which is already set up), you have to register a message handler that receives messages with the specified routing key. This is the only place where your code was wrong:

.AddNonCyclicMessageHandlerTransient<NewFileCreationHandler>(String.Empty)

Messages arrive with the routing key by which the queue is bound to the exchange. In your case the registration has to look like this:

.AddNonCyclicMessageHandlerTransient<NewFileCreationHandler>("FileCreationQueue");

That way the handler receives all messages with that routing key.

You can also pass a collection of routing keys if you want one message handler to handle different messages:

.AddNonCyclicMessageHandlerTransient<NewFileCreationHandler>(new[] { "FileCreationQueue" /*Other routing keys go here*/});
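The effect of the routing key passed at registration can be illustrated with a small in-memory model. This is only a sketch of the behaviour described above, not the library's actual dispatch code: HandlerDispatchSketch, Register and Resolve are hypothetical names, and handlers are represented by their names rather than real instances.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of routing-key-based handler lookup.
// It is NOT the library's implementation, only an illustration.
public static class HandlerDispatchSketch
{
    // Maps a routing key to the names of the handlers registered for it.
    private static readonly Dictionary<string, List<string>> HandlersByRoutingKey =
        new Dictionary<string, List<string>>();

    // Registers a handler (by name) for one or more routing keys.
    public static void Register(string handlerName, params string[] routingKeys)
    {
        foreach (var key in routingKeys)
        {
            if (!HandlersByRoutingKey.TryGetValue(key, out var handlers))
            {
                handlers = new List<string>();
                HandlersByRoutingKey[key] = handlers;
            }
            handlers.Add(handlerName);
        }
    }

    // Returns the handlers whose registered key equals the message's routing key.
    public static IReadOnlyList<string> Resolve(string messageRoutingKey) =>
        HandlersByRoutingKey.TryGetValue(messageRoutingKey, out var handlers)
            ? handlers
            : (IReadOnlyList<string>)Array.Empty<string>();
}
```

In this model, a handler registered with "FileCreationQueue" is found for a message carrying that routing key, while a message published with an empty routing key resolves to no handler at all, so only the built-in logging would be visible.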

And what about publishing to the other exchange? That needs a small change too. Your code is:

    public class NewFileCreationHandler : INonCyclicMessageHandler
    {
        public void Handle(string message, string routingKey, IQueueService queueService)
        {
            Console.WriteLine(message);

            queueService.Send("new Message", "mbm-DuplicateFile", String.Empty);
        }
    }

When NewFileCreationHandler receives a message, it handles it and sends another message to the exchange "mbm-DuplicateFile" with an empty routing key. But as configured, no queue is bound to "mbm-DuplicateFile" with an empty routing key, so the message is not routed to any queue. You have to specify which key to use (and we know the exact key because we bound the queue ourselves).

    public class NewFileCreationHandler : INonCyclicMessageHandler
    {
        public void Handle(string message, string routingKey, IQueueService queueService)
        {
            Console.WriteLine($"NewFileCreationHandler got a message {message}");

            var anotherExchange = "mbm-DuplicateFile";
            var anotherRoutingKey = "DuplicateFileQueue";
            queueService.Send("new Message", anotherExchange, anotherRoutingKey);
        }
    }
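To see why the empty routing key fails while "DuplicateFileQueue" works, here is a minimal in-memory sketch of how a direct exchange picks target queues. DirectExchangeSketch and Route are hypothetical names used only for illustration, and the single binding mirrors the "DuplicateFileExchange" configuration section above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of a direct exchange; not part of the library
// and not a RabbitMQ client call.
public static class DirectExchangeSketch
{
    // Each binding is (queue name, routing key). Without an explicit routing
    // key, the queue name doubles as the binding key, as described above.
    private static readonly (string Queue, string RoutingKey)[] Bindings =
    {
        ("DuplicateFileQueue", "DuplicateFileQueue"),
    };

    // A direct exchange delivers to queues whose binding key equals the
    // message's routing key exactly.
    public static IReadOnlyList<string> Route(string routingKey) =>
        Bindings.Where(b => b.RoutingKey == routingKey)
                .Select(b => b.Queue)
                .ToList();
}
```

Here Route(String.Empty) matches no binding, which corresponds to the message never showing up in a queue, whereas Route("DuplicateFileQueue") yields the one bound queue.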

Try it out and please let me know whether it works!

Best regards, Antony

antonyvorontsov commented 4 years ago

I would also advise you to follow the "official" naming convention, under which each routing key is a list of words separated by dots. So the most appropriate routing key for DuplicateFileQueue would be duplicate.file or file.duplicate.

To use this with the library, set it in the configuration section:

"DuplicateFileExchange": {
    "Type": "direct",
    "Queues": [
      {
        "Name": "DuplicateFileQueue",
        "RoutingKeys": [ "file.duplicate" ]
      }
    ]
  }

This can also help you avoid situations like this one, where the names are a little confusing.

Best regards, Antony

TomKirby commented 4 years ago

Hi,

I still don't seem to be able to get it to work... I might be missing something very simple...

Here is a gist of my files: https://gist.github.com/TomKirby/ff62bbfbfd637a5c9c6b4b1643abaee8

TomKirby commented 4 years ago

I have been thinking, and I'm not sure the handler is even being called. I remember putting a breakpoint on it, and it was never hit.

I am away from my code at the moment, but I will experiment a bit more later today and see.

TomKirby commented 4 years ago

Right, I have finally solved the problem. It appears the way I was writing to the exchange wasn't providing a routing key, so the handler was never being called.

The console output was a red herring: I was seeing log output in the console, but it still appeared after I removed every reference to the handler, which shows the handler clearly wasn't being called.

The moment I started loading messages onto the queue with the correct routing key, everything fell into place!

Thanks for your help!

antonyvorontsov commented 4 years ago

Hi, Tom!

I am sorry that I did not answer you in time. I have just returned from a short vacation, so I was away from my computer for a while.

I am really happy that you managed to set everything up!

Feel free to open another issue if you run into other difficulties in the future.

Best regards, Antony