pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License

Beta8: Failed message handling #256

Closed Sarmaad closed 7 years ago

Sarmaad commented 7 years ago

Hi,

I have started using RawRabbit 2.0 Beta 8 and am taking it for a spin through a message life cycle.

The use-case I have is a subscriber to a message:

client.SubscribeAsync<TMessage, MessageContext>(
    (message, context) =>
    {
        throw new Exception("Forced Error!");
        return Task.FromResult(0);
    });

The exception is captured and a default error exchange is created, then a message is published with the error details in the headers, which is great.


Logs:


[14:25:55 ERR] Exception thrown. Will be handled by Exception Handler
[14:25:55 ERR] Unhandled exception thrown when consuming message
System.Exception: Forced Error! **(This is expected)**
<truncated>

[14:25:55 INF] Declaring exchange default_error_exchange.
[14:25:55 ERR] An unhandled exception was thrown when consuming message with routing key participantinvited.bbd4377d-516b-4754-9efe-f70ddd2ce81b
System.NotSupportedException: Invokation Result of Message Handler not found.
   at RawRabbit.Pipe.Middleware.ExplicitAckMiddleware.<AcknowledgeMessageAsync>d__10.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at RawRabbit.Enrichers.Polly.Middleware.ExplicitAckMiddleware.<AcknowledgeMessageAsync>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---

Thank you.

pardahlman commented 7 years ago

Hello @Sarmaad - thanks for reporting this 👍

There is currently no dedicated way to consume messages from the error exchange. The easiest option is to use the fluent configuration builder to create a queue that is bound to the error exchange (with the correct routing key). See the code below:

await subscriber.SubscribeAsync<BasicMessage, IBasicProperties>(async (msg, props) =>
{
    var host = Encoding.UTF8.GetString((byte[]) props.Headers[PropertyHeaders.Host]);
    var exceptionType = Encoding.UTF8.GetString((byte[]) props.Headers[PropertyHeaders.ExceptionType]);
    var stacktrace = Encoding.UTF8.GetString((byte[]) props.Headers[PropertyHeaders.ExceptionStackTrace]);
}, ctx => ctx
    .UseMessageContext(c => c.GetBasicProperties())
    .UseConsumerConfiguration(cfg => cfg
        .FromDeclaredQueue(q => q.WithName("custom_error_queue"))
        .OnDeclaredExchange(e => e.WithName("default_error_exchange"))
));
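The handler above casts each header straight to byte[], which throws if a header is missing or has a different type. A minimal sketch of a more defensive read follows. It assumes the headers are exposed as an IDictionary<string, object>, as in the RabbitMQ .NET client. HeaderReader and ReadString are hypothetical names for illustration, not part of RawRabbit.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of RawRabbit or RabbitMQ.Client.
public static class HeaderReader
{
    // Returns the header value decoded as UTF-8, or null when the
    // dictionary is null, the key is absent, or the value is not a byte[].
    public static string ReadString(IDictionary<string, object> headers, string key)
    {
        if (headers == null || !headers.TryGetValue(key, out var raw))
        {
            return null;
        }

        return raw is byte[] bytes ? Encoding.UTF8.GetString(bytes) : null;
    }
}
```

Inside the subscriber, a call such as HeaderReader.ReadString(props.Headers, PropertyHeaders.Host) would then yield null instead of throwing when the header is not present.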

A few things:

As for the exception: let me investigate and get back to you!

pardahlman commented 7 years ago

Hello again, I was able to reproduce the behavior with the un-acked messages and have also created a fix for it. This will be part of beta9 of the client.