pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License
747 stars 144 forks source link

Polly Policies not Executing #268

Closed cocowalla closed 7 years ago

cocowalla commented 7 years ago

I've been giving 2.x's Polly enricher a try, but I can't seem to get the policies to be executed.

I've tried throwing from within RespondAsync, removing access permissions for the RabbitMQ user, as well as shutting down the broker - but the policies never seem to execute, the exception just bubbles up to my code.

Any idea what I'm missing here?

pardahlman commented 7 years ago

I assume that you have looked at the Polly integration tests to get a sense for how to register the plugin and policies?

The Polly support provided by the enricher package covers non-operation specific middlewares, such as publish message, create topology etc. There is a named policy for HandlerInvokation which is the middleware typically used for invoking the user defined message handler. However, the Respond operation has a custom message handler middleware (RespondInvokationMiddleware) which isn't recognized and replaced by the Polly plugin.

You should be able to use policies for the response handler by creating a class similar to this

public class PollyResponseHandlerMiddleware : RespondInvokationMiddleware
{
  protected override Task InvokeMessageHandler(IPipeContext context, CancellationToken token)
  {
    var policy = context.GetPolicy(PolicyKeys.HandlerInvokation);
    return policy.ExecuteAsync(
      action: ct => base.InvokeMessageHandler(context, ct),
      cancellationToken: token,
      contextData: new Dictionary<string, object>
      {
        [RetryKey.PipeContext] = context,
        [RetryKey.CancellationToken] = token
      });
  }
}

And then register it in the client.

In the future I might look at replacing the custom response handler middleware with the default one, so that polly works out of the box for the response operation.

cocowalla commented 7 years ago

I had looked at the integration tests, but I confess I didn't try running them (my broker is setup for client certificate authorisation, it's a pain to change it).

Your comment explains why the policy wasn't firing when my respond handler throws - I'll try the middleware you suggested, and also try a policy with Subsribe/Publish.

Shouldn't the policy still execute for broker-level exceptions during the Request though (e.g. broker has gone down)? Or is there some other way to hook into such events?

pardahlman commented 7 years ago

I looked through the code in RespondInvokationMiddleware and since it didn't override any of the underlying methods in HandlerInvokationMiddleware I've updated the code so that it uses the default middleware (that the Polly plugin enhances).

I'm running a few difference RabbitMQ instances on my local machine using this docker compose file. Not sure if it fits you, but I find it very convenient to have different clusters with different settings etc.

pardahlman commented 7 years ago

As for your question, yes: the Polly policy should trigger for core concepts such as queue bind, publish etc. These policy keys are perhaps helpful to get a sense on what parts of the system that is executed under Polly.

cocowalla commented 7 years ago

I'm still stuggling a bit with this :(

If I instantiate an IBusClient when the broker is down, a BrokerUnreachableException is thrown in ChannelFactory - but it bubbles up to the call to RequestAsync without touching the Polly policy.

The BrokenUnreachableException is thrown before the TransientChannelMiddleware executes, as IChannelFactory is a ctor parameter of TransientChannelMiddleware - is there a way to handle broker connection retries? Should ConnectToBroker() perhaps be called by consuming middleware, instead of in the ChannelFactory ctor as it is today?

pardahlman commented 7 years ago

A design decision was taken back in 1.x: if the broker is unreachable when creating an instance of the client, thrown an exception. The idea is that the client usually is created during startup and startup is part of deployment so the deployment fails if the broker is unreachable.

Now, this might not be the best approach, especially in a world of Docker etc.

The temporary work-around is to implement a custom IChannelFactory. Relevant methods in the default implementation is marked as virtual, making it possible to inject a Polly policy.

Sounds like you have an idea of a solution. If you want to you could create a PR that we could discuss this in?

cocowalla commented 7 years ago

Ah yes, I seem to recall this coming up in an issue I raised for 1.x a long time back - sub-classing ChannelFactory does indeed seem to be a simple solution to this problem.

Regarding an alternative solution, I hadn't given it much though, I'm just a bit adverse in general to doing anything that can fail in a ctor. But I guess it is rater moot if clients are always created during startup, which I will actually do. One alternative though would be to move the ConnectToBroker call to within ChannelFactory.GetChannelAsync, such that it will only be called if not already connected?

pardahlman commented 7 years ago

I agree with you. Also, virtual calls in ctor isn't best practice. On the other hand, the channel factory hasn't changed for quite some time, making it a stable component since 1.x. On the other hand (again 😄 ), I think RawRabbit should support the docker scenario, where the broker container might not be started before the client container.

pardahlman commented 7 years ago

Hello @cocowalla - so the ConnectToBroker is removed in rc1, but the instant connect is still triggered by connecting to the broker in DI registration.

A cool side effect for this is that if you use the Polly plugin, the call to connect won't happen, as the Polly-aware ChannelFactory will be registered.

That factory can be configured to do retries etc like this

var options = new RawRabbitOptions
{
  Plugins = p => p.UsePolly(new PolicyOptions
  {
    PolicyAction = c => c
      .UsePolicy(defaultPolicy)
      .UsePolicy(queueBindPolicy, PolicyKeys.QueueBind),
    ConnectionPolicies = new ConnectionPolicies
    {
      Connect = Policy
        .Handle<BrokerUnreachableException>()
        .WaitAndRetryAsync(new[] {
          TimeSpan.FromSeconds(1),
          TimeSpan.FromSeconds(2),
          TimeSpan.FromSeconds(4),
          TimeSpan.FromSeconds(8)
        })
    }
  })
};
arvigeus commented 6 years ago

I hate hijacking issues, especially with "my code does not work", but here it goes... I tried to go as basic as possible, following what I could found as an example, but it does not work.

Code here

What am I missing?