pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License
746 stars 144 forks source link

RawRabbit Subscriber does not work in ASP.NET Core #363

Closed shahabganji closed 6 years ago

shahabganji commented 6 years ago

I created a thread in Stackoverflow, no one answered though, I ask it here.

I am creating a RestApi in asp.net core, and in one of my services, I publish a message to RabbitMQ using RawRabbit. That being said, I see the message is published in the RabbitMQ control panel when I have commented the subscriber part, and the number of consumers is 0, when I add the subscriber part number of consumers become 1 and the messages are getting consumed so there is no message in the control panel, BUT what is weird is that none of the codes ( in this case it's just a log ) in the subscriber does not run.

Publisher part:

public async void RaiseAsync(string event_name, ShoppingCartItemAdded data) {
  const string EXCHANGE_NAME = "myRabbit";
  Action<IPublishContext> x = (ctx) => ctx
    .UsePublishConfiguration(xfg => xfg.OnExchange(EXCHANGE_NAME));
    //.WithRoutingKey("shoppingcartitemadded"));
  await this.Client.PublishAsync<ShoppingCartItemAdded>(data, x );
}

Subscriber part:

public class ShoppingCartItemAddedConsumer
{
  private readonly ILogger<ShoppingCartItemAddedConsumer> logger;
  private readonly IBusClient client;

  public ShoppingCartItemAddedConsumer(ILogger<ShoppingCartItemAddedConsumer> logger, IBusClient client)
  {
      this.logger = logger;
      this.client = client;
      this.logger.LogInformation("Subscriber created");
  }

  public async void Run()
  {
    const string QUEUE_NAME = "myWebApi";
    const string EXCHANGE_NAME = "myRabbit";

    this.logger.LogInformation("Registering subscriber");

    await client.SubscribeAsync<ShoppingCartItemAdded>(async msg =>
    {
      this.logger.LogInformation("Message received from rabbitmq : {message}", msg);
    }, ctx => ctx
      .UseSubscribeConfiguration(cfg => cfg
        .OnDeclaredExchange(dex => dex
          .WithName(EXCHANGE_NAME)
          .WithAutoDelete(false)
          .WithDurability(true)
          .WithType(ExchangeType.Topic))
      .FromDeclaredQueue(dq => dq
          .WithName(QUEUE_NAME)
          .WithExclusivity(false)
          .WithDurability(true)
          .WithAutoDelete(false)))
      );

      this.logger.LogInformation("Subscriber registered");
    }
}```

And I registered the subscriber as `singleton` so later I can do: 

```cs
var consumer = app.ApplicationServices.GetRequiredService<ShoppingCartItemAddedConsumer>();
consumer.Run();

and this is the log at Startup of the services:

    [09:02:25 INF] Subscriber created
    [09:02:26 INF] Registering subscriber
    [09:02:26 INF] Configuration action for shoppingcartitemadded found.
    [09:02:26 INF] Declaring queue myWebApi.
    [09:02:26 INF] Declaring exchange myRabbit.
    [09:02:26 INF] Binding queue myWebApi to exchange myRabbit with routing key shoppingcartitemadded
    [09:02:26 INF] Preparing to consume message from queue 'myWebApi'.
    [09:02:26 INF] Subscriber registered

and the following is the log when publishing a message:

    [14:13:35 INF] Setting 'Publish Acknowledge' for channel '3'
    [14:13:35 INF] Setting up publish acknowledgement for 1 with timeout 0:00:01
    [14:13:35 INF] Sequence 1 added to dictionary
    [14:13:35 WRN] No body found in the Pipe context.
    [14:13:35 INF] Performing basic publish with routing key shoppingcartitemadded on exchange myRabbitAPI.
    [14:13:35 INF] Setting up publish acknowledgement for 2 with timeout 0:00:01
    [14:13:35 INF] Sequence 2 added to dictionary
    [14:13:35 WRN] No body found in the Pipe context.
    [14:13:35 INF] Performing basic publish with routing key shoppingcartitemadded on exchange myRabbitAPI.
    [14:13:35 INF] Executed action method HelloMicroServices.Controllers.ShoppingCartController.Post (HelloMicroServices), returned result Microsoft.AspNetCore.Mvc.OkObjectResult in 304.6292ms.
    [14:13:35 INF] Executing ObjectResult, writing value of type 'HelloMicroServices.Datastores.Models.ShoppingCart'.
    [14:13:35 INF] Executed action     HelloMicroServices.Controllers.ShoppingCartController.Post (HelloMicroServices) in 414.4309ms
    [14:13:35 INF] Request finished in 475.1868ms 200 application/json; charset=utf-8
    [14:13:35 INF] Recieived ack for 1
    [14:13:35 INF] Recieived ack for 2

The RabbitMQ control panel when the subscriber part is commented:

rmq

when uncommenting, the value in front of Consumer is 1

pardahlman commented 6 years ago

Top of the morning, @shahabganji ☕️

Most of your configuration looks good (even though you should be able to simplify it, as you are specifying the default values for Durable and Exclusive).

Looking through the log file of the publisher I see

[14:13:35 WRN] No body found in the Pipe context.

Which makes me believe that the data you are publishing (that is, the ShoppingCartItemAdded event) is null. It is possible that the subscriber crashes or simply skips calling the subscribe method when the value isn't set. Can you check for null before posting the message on the bus and report back?

If this is indeed the case, I think that we should fix it so that the user defined subscribe method is invoked will null/default if no body is published.

shahabganji commented 6 years ago

@pardahlman,

I checked the data when publishing:

data

consumer

while published data on c# part is not null, the payload is 0 bytes!!! next image ( I have commented the subscriber part to be able to see messages on the panel )

payload

I can upload the source code if required.

shahabganji commented 6 years ago

(even though you should be able to simplify it, as you are specifying the default values for Durable and Exclusive).

Yes, I know that :pray: . Since I am new to the message brokers I was just playing around with the configs :smile: . and one more question: Is it possible to publish to a queue directly and not through an exchange?

RedOnePrime commented 6 years ago

If you wanted to upload your project I could probably take a look in short order. However on the surface the only thing I see is that in your publishing you are using OnExchange while we using OnDeclaredExchange. Not sure on the impact of using one or the other. Also we use a custom serializer but this was added later in order to allow the similar class structures defined in different projects to deserialize when being sent.

It does, however, look like your issue is configuration or serialization though as it is publishing 0 bytes. That explains the WRN you get on the subscription side.

return RawRabbitFactory.CreateSingleton(new RawRabbitOptions { ... DependencyInjection = di => di.AddSingleton<ISerializer, CustomSerializer>(); });

public class CustomSerializer : RawRabbit.Serialization.JsonSerializer { public CustomSerializer() : base(new JsonSerializer { TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple, Formatting = Formatting.None, CheckAdditionalContent = true, ContractResolver = new CamelCasePropertyNamesContractResolver(), ObjectCreationHandling = ObjectCreationHandling.Auto, DefaultValueHandling = DefaultValueHandling.Ignore, TypeNameHandling = TypeNameHandling.Auto, ReferenceLoopHandling = ReferenceLoopHandling.Serialize, MissingMemberHandling = MissingMemberHandling.Ignore, PreserveReferencesHandling = PreserveReferencesHandling.Objects, NullValueHandling = NullValueHandling.Ignore }) { } }

Hope that helps,

-Red

shahabganji commented 6 years ago

@RedOnePrime

I have used OnDeclaredExchange likethe following, I got 0 bytes though.

Action<IPublishContext> x = 
                (ctx) => ctx.UsePublishConfiguration(
                                xfg => xfg.OnDeclaredExchange( 
                                    exchange => exchange.WithName( Constants.EXCHANGE_NAME).WithAutoDelete(false).WithDurability(true) ) ); 

await this.Client.PublishAsync<ShoppingCartItemAdded>(data, x );

I''ll check the deserializer you mentioned above, and if failed I will create a repository.

BR, -Shahab

shahabganji commented 6 years ago

I have the same sample on console all the codes are the same except that in that sample my subscriber abd publisher are on the same IBusClient object, however, in ASP.Net I give it up to the DI engine. the console app works just as it should you can see a sample code here.

If anyone could help, I have created a repo here. The publisher part is here and the subscriber is within an IHostedService calling this method

@RedOnePrime, I used OnDeclaredExchange and a deserializer

:weary:

Thanks guys

pardahlman commented 6 years ago

I think @RedOnePrime makes a valid point, it could be the serializer (even though it looks like you only override the json configuration 🤷‍♂️ ). Could you try using the default serializer and see if you get a message body?

I'm still traveling, so I havn't got the possibility to dig deeper into your issue at this point. If I were you, I would start removing custom configuration down to a point where it works and then try to re-add it one step at the time.

Is it possible to publish to a queue directly and not through an exchange?

No that's not possible for RabbitMQ.

shahabganji commented 6 years ago

@pardahlman

I changed the code and pushed the new one to the repository:

  1. changed the serializer to default

    di.AddSingleton<ISerializer, RawRabbit.Serialization.JsonSerializer>()
  2. Changed the subscriber and publisher to use default config fro exchange and queue names

All to no avail.

RedOnePrime commented 6 years ago

@shahabganji

Here is the solution I found. I am not sure the cause, but how the IBusClient is created is making a difference. My guess is that the pipeline is different. We ran into this as well and switched to creating our IBusClient instance(s) via the factory instead of dependency injection.

         this.Client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions()
            {
                ClientConfiguration = new RawRabbitConfiguration()
                {
                    Hostnames = new System.Collections.Generic.List<string>() { "localhost"},
                    Password = "guest",
                    VirtualHost = "/",
                    Port = 5672,
                    Username = "guest"

                }
            });

When I changed your publisher in the event store to the above code it started publishing the object correctly to rabbit. I used the rabbit UI GetMessage feature to show it was publishing the json. I wouldn't recommend putting this in the ctor for production code obviously, since that couples object construction with I/O.

Anyway, I had to shut the project down to use the GetMessage feature in the UI because there was a deadlock with the subscriber in that the messages were left in an unacked state. This means that the subscriber was being sent them but your method was not being raised; it was holding onto the message.

From there I went ahead and also updated your subscriber code to construct the IBusClient the same way and viola, works just fine.

@pardahlman I am not sure if that helps you at all with what the difference could be between using DI to get the IBusClient versus using the factory, but there appears to be something different that causes the behavior. He doesn't seem to be configuring anything out of the ordinary for DI.

            RawRabbitConfiguration config = new RawRabbitConfiguration();
            this.Configuration.Bind("RabbitMQ", config);

            services.AddRawRabbit(new RawRabbitOptions() {
                ClientConfiguration = config,
                    DependencyInjection = di => di.AddSingleton<ISerializer, RawRabbit.Serialization.JsonSerializer>()
            });

-Red

shahabganji commented 6 years ago

@RedOnePrime, @pardahlman

I have the same sample on console all the codes are the same except that in that sample my subscriber and publisher are on the same IBusClient object, however, in ASP.Net I give it up to the DI engine.

That :point_up: , and what you have done, manually creating the IBusClient rather than using DI, I become suspicious to my third-party IOC, in this case Grace. So I commented the grace parts and used the default IOC of asp.net core, and everything works just fine. Hence I close the issue. Thanks guys.

RedOnePrime commented 6 years ago

@shahabganji Glad it is working for you now! I know this issue is closed now, but I also wanted to add to those that might find this later. You referenced using Grace as IoC and suspect that might be the issue as it works fine with default asp.net core IoC. Well, we use Autofac ourselves and I have never tested ours using the default IoC. So perhaps that was our issue all along as well; something inside custom IoC frameworks.

-Red

shahabganji commented 6 years ago

Well, Thanks for your help @RedOnePrime, I opened an issue on Grace repository to see what's going wrong, we would be grateful if you could also participate in answering the questions relating to the RabbitMQ as the author of the Grace is not familiar with RawRabbit.

here is the issue.

shahabganji commented 6 years ago

A solution to work with RawRabbit using Grace IOC.

shahabganji commented 6 years ago

@RedOnePrime

Well, we use Autofac ourselves and I have never tested ours using the default IoC. So perhaps that was our issue all along as well; something inside custom IoC frameworks.

Just one thing pops in mind as a question, by that :point_up: you mean internally in RawRabbit or when using RawRabbit, just like my situation?

RedOnePrime commented 5 years ago

@RedOnePrime

Well, we use Autofac ourselves and I have never tested ours using the default IoC. So perhaps that was our issue all along as well; something inside custom IoC frameworks.

Just one thing pops in mind as a question, by that ☝️ you mean internally in RawRabbit or when using RawRabbit, just like my situation?

Internally in RawRabbit, something is going awry when using custom IoC frameworks.

kgrosvenor commented 5 years ago

Working really well here, i used vNext in a NET Core console application just fine... my Program.cs looks like this


class Program
    {
        private static ServiceProvider serviceProvider;
        private static IConfiguration _config;

        static void Main(string[] args)
        {

            var environmentName = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");
            if(string.IsNullOrEmpty(environmentName)) {
                //default local environment to development
                environmentName="development";
            }

            Console.WriteLine("ASPNETCORE_ENVIRONMENT: " + environmentName);

            var builder = new ConfigurationBuilder()
            .AddJsonFile($"appsettings.{environmentName}.json", true, true)
            .AddEnvironmentVariables();
            _config = builder.Build();

            var serviceCollection = new ServiceCollection();
            serviceProvider = ConfigureServices(serviceCollection, _config).BuildServiceProvider();

            //do the actual work here
            var glucoseCreatedListener = serviceProvider.GetService<IGlucoseCreatedListener>();
            glucoseCreatedListener.Work();

            var glucoseUpdatedListener = serviceProvider.GetService<IGlucoseUpdatedListener>();
            glucoseUpdatedListener.Work();

        }

        private static IServiceCollection ConfigureServices(IServiceCollection services, IConfiguration config)
        {
            services.AddRawRabbit(
             cfg => cfg.AddJsonFile("rawrabbit.json"),
             ioc => ioc.AddSingleton<ILoggerFactory, LoggerFactory>()
            );
            services.AddSingleton<IConfiguration>(config);

            services.AddSingleton<IGlucoseCreatedListener, GlucoseCreatedListener>();
            services.AddSingleton<IGlucoseUpdatedListener, GlucoseUpdatedListener>();

            return services;
        }
    }