pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License

Publishing inside the context of a subscriber #138

Closed: jayrulez closed 8 years ago

jayrulez commented 8 years ago

Can I publish another message while in the context of a subscriber?

I'm trying this and I notice the process hanging and dying at this point.

jayrulez commented 8 years ago

For reference - this commit introduces the issue I'm having in my playground project:

https://github.com/jayrulez/Hutch/commit/2149a9436020a126f3d11d996930c28a98794ca0

pardahlman commented 8 years ago

Hi @jayrulez, there is nothing stopping you from publishing a message in the context of a subscribe message handler; in fact, it is done in a lot of the integration tests for RawRabbit. Looking at the commit you mentioned, I see that you are publishing a dynamic object, which can sometimes be problematic, especially if the consumer is in another assembly than the publisher (see this issue over at Json.Net). It can be other things as well, but it is difficult to figure out without seeing the full stack trace.

jayrulez commented 8 years ago

I used the dynamic object for brevity.

The same thing happens if I use a POCO.

See: https://github.com/jayrulez/Hutch/commit/c19ba817e9e2ff780d40f9d490803f52c2bba124

I am not getting a stack trace either.

I get this: Exception thrown: 'System.InvalidOperationException' in System.Private.CoreLib.ni.dll

Then the process dies.

Any ideas @pardahlman ?

I'm thinking this may not be RawRabbit-specific but rather down to how I set up the subscribers and publishers.

pardahlman commented 8 years ago

Just a few quick follow-up questions:

To troubleshoot further, I think a stack trace or memory dump would be preferable. You could try to:

jayrulez commented 8 years ago

So to confirm, no stack traces.

Not sure when InvalidOperationException came into play. I was playing around with the calls. May have happened then without me paying attention.

StackOverflowException is what I'm getting though.

The issue occurs even without the configuration.

It also occurs if I await the task:

See the output from the console:

    info: BaseBusClient`1[0] Subscribing to message 'EmailMessage' on exchange 'hutch.controllers' with routing key emailmessage.
    dbug: TopologyProvider[0] Start processing topology work.
    info: TopologyProvider[0] Declaring queue 'emailmessage_hutch'.
    dbug: ThreadBasedChannelFactory[0] Existing connection is open and will be used.
    info: TopologyProvider[0] Declaring exchange 'hutch.controllers'.
    info: TopologyProvider[0] Binding queue 'emailmessage_hutch' to exchange 'hutch.controllers' with routing key 'emailmessage.#'
    dbug: TopologyProvider[0] Done processing topology work.
    dbug: ThreadBasedChannelFactory[0] Existing connection is open and will be used.
    dbug: EventingBasicConsumerFactory[0] Setting QoS Prefetch Size: 0 Prefetch Count: 50 global: false
    dbug: Subscriber`1[0] Setting up a consumer on channel '2' for queue emailmessage with NoAck set to False.
    info: BaseBusClient`1[0] Subscribing to message 'EmailMessage' on exchange 'hutch.controllers' with routing key emailmessage.
    dbug: TopologyProvider[0] Start processing topology work.
    info: TopologyProvider[0] Declaring queue 'emailmessage_hutch_1'.
    info: TopologyProvider[0] Binding queue 'emailmessage_hutch_1' to exchange 'hutch.controllers' with routing key 'emailmessage.#'
    dbug: TopologyProvider[0] Done processing topology work.
    dbug: ThreadBasedChannelFactory[0] Existing connection is open and will be used.
    dbug: EventingBasicConsumerFactory[0] Setting QoS Prefetch Size: 0 Prefetch Count: 50 global: false
    dbug: Subscriber`1[0] Setting up a consumer on channel '3' for queue emailmessage with NoAck set to False.
    Hosting environment: Development
    Content root path: C:\Users\Robert\Desktop\Hutch
    Now listening on: http://localhost:5000
    Application started. Press Ctrl+C to shut down.
    info: TopologyProvider[0] Disposing topology channel (if exists).
    info: Microsoft.AspNetCore.Hosting.Internal.WebHost[1] Request starting HTTP/1.1 GET http://localhost:5000/
    info: Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker[1] Executing action method Hutch.Controllers.TestController.GetAsync (Hutch) with arguments ((null)) - ModelState is Valid
    dbug: BaseBusClient`1[0] Publishing message 'EmailMessage' on exchange 'hutch.controllers' with routing key emailmessage.
    dbug: ThreadBasedChannelFactory[0] Begining to process 'GetChannel' requests.
    dbug: ThreadBasedChannelFactory[0] Existing connection is open and will be used.
    info: ThreadBasedChannelFactory[0] Channel '1' has been created.
    dbug: ThreadBasedChannelFactory[0] 'GetChannel' has been processed.
    info: PublishAcknowledger[0] Setting 'Publish Acknowledge' for channel '1'
    info: EventingBasicConsumerFactory[0] Message recived: MessageId: 170b45ba-0dec-4594-84ef-37c5eeba4d9a
    info: EventingBasicConsumerFactory[0] Message recived: MessageId: 170b45ba-0dec-4594-84ef-37c5eeba4d9a
    info: PublishAcknowledger[0] Recieved ack for 1/1 with multiple set to 'False'
    dbug: PublishAcknowledger[0] Disposed ack timer for 1/1
    dbug: Hutch.Controllers.TestController[0] Published email message.
    info: Microsoft.AspNetCore.Mvc.StatusCodeResult[1] Executing HttpStatusCodeResult, setting HTTP status code 200
    info: Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker[2] Executed action Hutch.Controllers.TestController.GetAsync (Hutch) in 577.4101ms
    info: Hutch.Services.EmailLogger[0] Logging 'Message body.' for 'test@gmail.com'.
    info: Hutch.Services.EmailSender[0] Sending 'Message body.' to 'test@gmail.com'.
    dbug: BaseBusClient`1[0] Publishing message 'EmailMessage' on exchange 'hutch.controllers' with routing key emailmessage.
    dbug: EventingBasicConsumerFactory[0] Ack:ing message with id 1.
    info: Microsoft.AspNetCore.Hosting.Internal.WebHost[2] Request finished in 1243.0988ms 200
    dbug: PublishAcknowledger[0] Successfully confirmed publish 1/1

    Process is terminated due to StackOverflowException.

https://github.com/jayrulez/Hutch/commit/3e148309cc2bc10cf999c95d9640e08c69e36448

jayrulez commented 8 years ago

So I realized that this is essentially what I am doing:

    public async Task<IActionResult> GetAsync()
    {
        // Registers a new subscriber on every request to this action.
        _busClient.SubscribeAsync<EmailMessage>(async (message, context) => {

            _logger.LogInformation(JsonConvert.SerializeObject(message));

            // Publishes from inside the handler, using the same bus client instance.
            await _busClient.PublishAsync<P>(new Controllers.P() { X = "test2@gmail.com", Y = "Message body2." });
        });

        await _busClient.PublishAsync(new EmailMessage() { To = "test@gmail.com", Body = "Message body." });

        _logger.LogDebug("Published email message.");

        return Ok();
    }

which will recurse indefinitely since the same instance is publishing and subscribing.

Is this correct?

reisenberger commented 8 years ago

Presumably watching the message rates in the RabbitMQ management API could confirm whether it was recursing indefinitely?

pardahlman commented 8 years ago

I haven't had the time to look through your entire solution, but based on your latest comment, it sounds like you've found the problem :)

I would advise against setting up a subscriber within a Controller's request handler, as this will result in a new consumer for each call.
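To illustrate the point about a new consumer per call, here is a minimal sketch using an in-memory stand-in rather than RawRabbit itself. `ToyBus`, `Demo` and `HandleRequestAsync` are made-up names for this example, and the toy bus simply delivers each message to every registered handler, which is an assumption and not a model of how a broker routes messages.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a bus client; this is NOT RawRabbit's API.
public class ToyBus
{
    private readonly List<Func<string, Task>> _handlers = new List<Func<string, Task>>();

    // Number of consumers registered so far.
    public int ConsumerCount => _handlers.Count;

    // Every call registers one more consumer; nothing ever removes them.
    public void Subscribe(Func<string, Task> handler) => _handlers.Add(handler);

    // Delivers the message to every registered consumer, one after another.
    public async Task PublishAsync(string message)
    {
        // Iterate over a copy so a handler could subscribe without breaking the loop.
        foreach (var handler in _handlers.ToArray())
        {
            await handler(message);
        }
    }
}

public static class Demo
{
    // Mimics a request handler that subscribes on every call.
    public static async Task HandleRequestAsync(ToyBus bus)
    {
        bus.Subscribe(message => Task.CompletedTask);
        await bus.PublishAsync("hello");
    }

    public static async Task Main()
    {
        var bus = new ToyBus();
        for (var request = 1; request <= 3; request++)
        {
            await HandleRequestAsync(bus);
            Console.WriteLine($"After request {request}: {bus.ConsumerCount} consumer(s)");
        }
    }
}
```

In this sketch the consumer count grows by one with every simulated request. Calling `Subscribe` once at application startup instead would keep it at one no matter how many requests arrive.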

jayrulez commented 8 years ago

Watching the message rates, I do not observe recursion, so I guess it breaks for another reason.

Looking at the memory dump in VS, I can see that a circular reference is detected.

Anyway, it works when the instance of the bus client that subscribes is different from the one that publishes (which is how it is done in the test cases you linked earlier), so I am going with this approach.

I set up the subscribers in the Startup class. I am now creating a bus client instance per consumer. Do you see any problem with this? I don't think using one client instance for all consumers would cause any problems either as long as the instance of the publishing client is different.

jayrulez commented 8 years ago

@pardahlman

pardahlman commented 8 years ago

It is generally recommended to have one broker connection per application. Depending on how you instantiate the bus client (Service Collection extensions or BusClientFactory), you might create one connection per client.

Take a look in the mgmt tool and verify that your application does not create multiple connections. If it doesn't - you're fine :)