Closed videege closed 7 years ago
Hello @videege 👋 ,
I tried to reproduce this in the RawRabbit.Todo project, without any success.
Looking at the log entries, everything seems to go well. Early on the sequence completes (Sequence '8998c39c-c797-4552-869e-4f9c52e7a0dd' completed), and then it looks like the HTTP request also completes (Request finished in 163.8095ms). It is a bit strange that the exception is thrown after the execution is done.
I wonder if:
If possible, you could share your code and I'll try to run it locally!
@videege is this still an issue for you?
I can confirm I'm still seeing this but it doesn't seem to negatively impact execution at all. I can't share code but let me try to get you some logs.
Hi @pardahlman, sorry for the long delay on this. This seems to be causing some actual problems in our application now. Let me see if I can describe the sequence of events:
client.PublishAsync(/* Class that Service A is expecting */). No error is thrown until execution drops out of the method that is executing due to the subscription on Service A's message. When I hook up to the default_error_exchange, I can see that the error being logged is the AlreadyClosedException above. Here's the relevant data from the error message that gets published:
exception_type: AlreadyClosedException
exception_stacktrace: at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Impl.RecoveryAwareModel.BasicAck(UInt64 deliveryTag, Boolean multiple)
at RawRabbit.Pipe.Middleware.ExplicitAckMiddleware.AcknowledgeMessage(IPipeContext context)
at RawRabbit.Pipe.Middleware.ExplicitAckMiddleware.<InvokeAsync>d__7.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at RawRabbit.Pipe.Middleware.CancellationMiddleware.<InvokeAsync>d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at RawRabbit.Pipe.Middleware.ExceptionHandlingMiddleware.<InvokeAsync>d__4.MoveNext()
Here are the logs of Service B:
2017-07-18 12:43:58.387 -06:00 [Debug] Existing connection is open and will be used.
2017-07-18 12:43:58.398 -06:00 [Debug] Stage 'Initialized' has no additional middlewares registered.
2017-07-18 12:43:58.398 -06:00 [Debug] Found message type 'CollectionDataResponse`1' in context. Creating consume config based on it.
2017-07-18 12:43:58.399 -06:00 [Information] Configuration action for 'collectiondataresponse[dto]' found.
2017-07-18 12:43:58.404 -06:00 [Debug] Stage 'ConsumeConfigured' has no additional middlewares registered.
2017-07-18 12:43:58.404 -06:00 [Debug] Updating routing key with GlobalExecutionId ''
2017-07-18 12:43:58.405 -06:00 [Information] Routing key updated with GlobalExecutionId: collectiondataresponse[dto].9338b802-69bf-4f31-8b2c-a1a90eda7874.#.
2017-07-18 12:43:58.406 -06:00 [Debug] Declaring queue 'collectiondataresponse[dto]_9338b802-69bf-4f31-8b2c-a1a90eda7874'.
2017-07-18 12:43:58.407 -06:00 [Information] Declaring queue 'collectiondataresponse[dto]_9338b802-69bf-4f31-8b2c-a1a90eda7874'.
2017-07-18 12:43:58.414 -06:00 [Debug] Done processing topology work.
2017-07-18 12:43:58.414 -06:00 [Debug] Stage 'QueueDeclared' has no additional middlewares registered.
2017-07-18 12:43:58.415 -06:00 [Debug] Exchange configuration found. Declaring 'cobra.messages.responses'.
2017-07-18 12:43:58.416 -06:00 [Debug] Stage 'ExchangeDeclared' has no additional middlewares registered.
2017-07-18 12:43:58.416 -06:00 [Information] Binding queue 'collectiondataresponse[dto]_9338b802-69bf-4f31-8b2c-a1a90eda7874' to exchange 'cobra.messages.responses' with routing key 'collectiondataresponse[dto].9338b802-69bf-4f31-8b2c-a1a90eda7874.#'
2017-07-18 12:43:58.423 -06:00 [Debug] Done processing topology work.
2017-07-18 12:43:58.423 -06:00 [Debug] Stage 'QueueBound' has no additional middlewares registered.
2017-07-18 12:43:58.424 -06:00 [Debug] Stage 'ConsumerChannelCreated' has no additional middlewares registered.
2017-07-18 12:43:58.425 -06:00 [Information] Preparing to consume message from queue 'collectiondataresponse[dto]_9338b802-69bf-4f31-8b2c-a1a90eda7874'.
2017-07-18 12:43:58.428 -06:00 [Debug] Stage 'ConsumerCreated' has no additional middlewares registered.
2017-07-18 12:43:58.492 -06:00 [Debug] Stage 'Initialized' has no additional middlewares registered.
2017-07-18 12:43:58.492 -06:00 [Debug] Stage 'ProducerInitialized' has no additional middlewares registered.
2017-07-18 12:43:58.493 -06:00 [Information] GlobalExecutionId '9338b802-69bf-4f31-8b2c-a1a90eda7874' was allready found in PipeContext.
2017-07-18 12:43:58.495 -06:00 [Debug] Stage 'PublishConfigured' has no additional middlewares registered.
2017-07-18 12:43:58.496 -06:00 [Debug] Updating routing key with GlobalExecutionId '9338b802-69bf-4f31-8b2c-a1a90eda7874'
2017-07-18 12:43:58.496 -06:00 [Information] Routing key updated with GlobalExecutionId: dtoquerybyid.9338b802-69bf-4f31-8b2c-a1a90eda7874.
2017-07-18 12:43:58.498 -06:00 [Debug] Exchange configuration found. Declaring 'cobra.messages.queries'.
2017-07-18 12:43:58.498 -06:00 [Information] Declaring exchange 'cobra.messages.queries'.
2017-07-18 12:43:58.500 -06:00 [Debug] Done processing topology work.
2017-07-18 12:43:58.501 -06:00 [Debug] Stage 'ExchangeDeclared' has no additional middlewares registered.
2017-07-18 12:43:58.504 -06:00 [Debug] Stage 'MessageSerialized' has no additional middlewares registered.
2017-07-18 12:43:58.505 -06:00 [Debug] Stage 'BasicPropertiesCreated' has no additional middlewares registered.
2017-07-18 12:43:58.506 -06:00 [Debug] Begining to process 'GetChannel' requests.
2017-07-18 12:43:58.507 -06:00 [Debug] 'GetChannel' has been processed.
2017-07-18 12:43:58.508 -06:00 [Debug] Adding channel 16 to Execution Context.
2017-07-18 12:43:58.509 -06:00 [Debug] Stage 'ChannelCreated' has no additional middlewares registered.
2017-07-18 12:43:58.510 -06:00 [Debug] No Mandatory Callback registered.
2017-07-18 12:43:58.511 -06:00 [Debug] Publish Acknowledgement is disabled.
2017-07-18 12:43:58.512 -06:00 [Debug] Stage 'PreMessagePublish' has no additional middlewares registered.
2017-07-18 12:43:58.514 -06:00 [Information] Performing basic publish with routing key dtoquerybyid.9338b802-69bf-4f31-8b2c-a1a90eda7874 on exchange cobra.messages.queries.
2017-07-18 12:43:58.518 -06:00 [Debug] Stage 'MessagePublished' has no additional middlewares registered.
2017-07-18 12:43:58.630 -06:00 [Debug] Invoking consumer pipe for message 'e38ead2f-b487-43e5-af79-cc2a48933282'.
2017-07-18 12:43:58.631 -06:00 [Debug] Stage 'MessageRecieved' has no additional middlewares registered.
2017-07-18 12:43:58.632 -06:00 [Debug] Trying to extract global_execution_id from header
2017-07-18 12:43:58.633 -06:00 [Debug] Header type extracted: 'String'
2017-07-18 12:43:58.659 -06:00 [Debug] Stage 'MessageDeserialized' has no additional middlewares registered.
2017-07-18 12:43:58.688 -06:00 [Debug] Disposing subscriptions for Message Sequence '9338b802-69bf-4f31-8b2c-a1a90eda7874'.
2017-07-18 12:43:58.744 -06:00 [Information] Sequence '9338b802-69bf-4f31-8b2c-a1a90eda7874' completed with message 'CollectionDataResponse`1'.
2017-07-18 12:43:58.748 -06:00 [Debug] Stage 'Initialized' has no additional middlewares registered.
2017-07-18 12:43:58.748 -06:00 [Debug] Stage 'ProducerInitialized' has no additional middlewares registered.
2017-07-18 12:43:58.751 -06:00 [Information] Using GlobalExecutionId '1b6d2901-6e05-4408-af95-461cec243fa6' that was found in the execution process.
2017-07-18 12:43:58.753 -06:00 [Debug] Stage 'PublishConfigured' has no additional middlewares registered.
2017-07-18 12:43:58.755 -06:00 [Debug] Updating routing key with GlobalExecutionId '1b6d2901-6e05-4408-af95-461cec243fa6'
2017-07-18 12:43:58.758 -06:00 [Information] Routing key updated with GlobalExecutionId: dataresponse[dtovaliditydata].1b6d2901-6e05-4408-af95-461cec243fa6.
2017-07-18 12:43:58.760 -06:00 [Debug] Exchange configuration found. Declaring 'cobra.messages.responses'.
2017-07-18 12:43:58.761 -06:00 [Debug] Stage 'ExchangeDeclared' has no additional middlewares registered.
2017-07-18 12:43:58.783 -06:00 [Debug] Stage 'MessageSerialized' has no additional middlewares registered.
2017-07-18 12:43:58.783 -06:00 [Debug] Stage 'BasicPropertiesCreated' has no additional middlewares registered.
2017-07-18 12:43:58.786 -06:00 [Debug] Begining to process 'GetChannel' requests.
2017-07-18 12:43:58.788 -06:00 [Debug] 'GetChannel' has been processed.
2017-07-18 12:43:58.789 -06:00 [Debug] Adding channel 16 to Execution Context.
2017-07-18 12:43:58.790 -06:00 [Debug] Stage 'ChannelCreated' has no additional middlewares registered.
2017-07-18 12:43:58.791 -06:00 [Debug] No Mandatory Callback registered.
2017-07-18 12:43:58.792 -06:00 [Debug] Publish Acknowledgement is disabled.
2017-07-18 12:43:58.793 -06:00 [Debug] Stage 'PreMessagePublish' has no additional middlewares registered.
2017-07-18 12:43:58.794 -06:00 [Information] Performing basic publish with routing key dataresponse[dtovaliditydata].1b6d2901-6e05-4408-af95-461cec243fa6 on exchange cobra.messages.responses.
2017-07-18 12:43:58.796 -06:00 [Debug] Stage 'MessagePublished' has no additional middlewares registered.
2017-07-18 12:43:58.803 -06:00 [Debug] Stage 'HandlerInvoked' has no additional middlewares registered.
2017-07-18 12:43:58.907 -06:00 [Error] Exception thrown. Will be handled by Exception Handler
2017-07-18 12:43:58.912 -06:00 [Error] Unhandled exception thrown when consuming message
2017-07-18 12:43:58.914 -06:00 [Information] Declaring exchange 'default_error_exchange'.
2017-07-18 12:43:58.916 -06:00 [Debug] Done processing topology work.
2017-07-18 12:43:58.917 -06:00 [Debug] Existing connection is open and will be used.
2017-07-18 12:43:59.636 -06:00 [Error] An unhandled exception was thrown when consuming message with routing key collectiondataresponse[dto].9338b802-69bf-4f31-8b2c-a1a90eda7874
The thing that concerns me is that the sequence ID 9338b802-69bf-4f31-8b2c-a1a90eda7874 seems to get picked up successfully at the beginning, and then it gets re-used by Service B to do the query and response to Service C. Service B then disposes subscriptions for this sequence:
2017-07-18 12:43:58.688 -06:00 [Debug] Disposing subscriptions for Message Sequence '9338b802-69bf-4f31-8b2c-a1a90eda7874'.
Does this hose up the original sequence, which has not yet completed, with this same GUID (Service A -> Service B)? I might just be misinterpreting this, but it seems like it is impossible to nest message sequences, i.e., A -> B -> C.
Hello @videege, thanks for moving this forward 👍
I'm trying to recreate what you describe in a unit test. Based on my understanding, this is what you are trying to do (message name/number indicates "flow order"):
await serviceB.SubscribeAsync<FirstMessage>(async message =>
{
    var nestedSequence = serviceB.ExecuteSequence(s => s
        .PublishAsync(new SecondMessage())
        .Complete<ThirdMessage>());
    await nestedSequence.Task;
    await serviceB.PublishAsync(new ForthMessage());
});

await serviceC.SubscribeAsync<SecondMessage>(message =>
{
    return serviceC.PublishAsync(new ThirdMessage());
});

var mainSequence = serviceA.ExecuteSequence(s => s
    .PublishAsync(new FirstMessage())
    .Complete<ForthMessage>()
);
await mainSequence.Task;
Is this close to what you're doing?
@pardahlman I'm also seeing these 'already closed' exceptions. They seem fairly benign - our requests complete without issue.
Our scenario differs very slightly - rather than the typical request/response pattern for our request command handlers we're currently using the pub/sub model instead - so we're not really doing fully nested sequences. If I flip the code back to the Request/Response pattern obviously the issue doesn't occur.
The consistent issue appears to be that publishing messages within the context of an existing handler causes this exception to be thrown.
I think I've narrowed down where the problem is. Stateless fires off an event that appears to run on a different thread than the rest of the execution. That part of the code disposes the subscription(s) and channel for the sequence at the same time as RawRabbit tries to use the same channel to ack the message that completes the sequence. That is, I don't think it has to do with the nested sequences. I'll need to look into this a bit further, now that I've found a way to repro it.
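For readers following along, the ordering problem described here can be sketched roughly as below. This is only an illustration under stated assumptions: FakeChannel, Ack and TryAck are hypothetical names invented for this sketch, not RabbitMQ.Client or RawRabbit types, and it does not show how the library actually addresses the problem. It only shows that an ack issued after the channel has been disposed fails, and that one possible guard is to check the channel state under a lock before acking.

```csharp
using System;

// Hypothetical stand-in for a channel; NOT a RabbitMQ.Client or RawRabbit type.
sealed class FakeChannel : IDisposable
{
    private readonly object _gate = new object();
    private bool _closed;

    // Simulates the sequence-completion path disposing the channel.
    public void Dispose()
    {
        lock (_gate) { _closed = true; }
    }

    // Mirrors the reported failure mode: acking on a closed channel throws.
    public void Ack(ulong deliveryTag)
    {
        lock (_gate)
        {
            if (_closed) throw new InvalidOperationException("Already closed");
            // A real channel would send the ack for deliveryTag here.
        }
    }

    // Guarded variant (an assumption, not the library's approach):
    // reports false instead of throwing when the channel is already closed.
    public bool TryAck(ulong deliveryTag)
    {
        lock (_gate)
        {
            return !_closed;
        }
    }
}

static class Program
{
    static void Main()
    {
        var channel = new FakeChannel();

        // The disposal wins the race and runs before the ack.
        channel.Dispose();

        try
        {
            channel.Ack(1);
        }
        catch (InvalidOperationException e)
        {
            Console.WriteLine("Ack failed: " + e.Message);
        }

        Console.WriteLine("TryAck succeeded: " + channel.TryAck(1));
    }
}
```

The sketch runs the two steps sequentially so the outcome is deterministic; in the scenario described above they would run on different threads, which is why the failure only shows up intermittently.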
This issue has been resolved in the just-released beta8. Closing this ticket now, but feel free to re-open if you experience any related problems.
I was experimenting with 2.0 (beta 6) and I noticed that I encounter this error when handling the completion of a message sequence.
My setup is
Everything works fine but in the debug output for the ASP.NET app I see logs like this:
Is this output to be expected, or am I doing something wrong? I basically copied from the sample app in the 2.0 branch.