rebus-org / Rebus.Autofac

:bus: Autofac container adapter for Rebus
https://mookid.dk/category/rebus

Rebus.Autofac can't resolve generic IFailed<T>, causing second-level retries to fail #11

Closed pelin closed 6 years ago

pelin commented 6 years ago

When GetHandlerInvokers runs in ActivateHandlersStep for a failed message (FailedMessageWrapper), no invokers are found, resulting in an empty list of handlers.

Register in Rebus.Autofac:

autofacBus.Subscribe<string>().Wait();
autofacBus.Subscribe<IFailed<string>>().Wait();

--> Does not work!

Register with BuiltinHandlerActivator:

rebusActivator.Bus.Subscribe<string>();
rebusActivator.Bus.Subscribe<IFailed<string>>();

--> Works OK!

mookid8000 commented 6 years ago

First off, the lines

autofacBus.Subscribe<IFailed<string>>()

and

rebusActivator.Bus.Subscribe<IFailed<string>>();

don't "register" anything, and they make no difference in this case, because no events are published to the topic for IFailed<string> (unless you explicitly went and did this, but I assume you didn't 😄).

When a message is in an endpoint's input queue, the message will be processed by that endpoint.

And then, if the message has failed too many times, and you have enabled 2nd level retries, then the message gets processed again – not as itself, but wrapped in an implementation of IFailed<>.

This means that your IoC container must be capable of resolving all IHandleMessages<IFailed<YourMessage>> implementations registered in your container.

My guess is that you have not registered your handler(s) in such a way that they're correctly resolved as implementations of IHandleMessages<IFailed<YourMessage>>.

Could you try and do this:

var handlers = container.Resolve<IEnumerable<IHandleMessages<IFailed<YourMessage>>>>();

and then see what your Autofac container returns?
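
To illustrate what that lookup is asking for, here is a minimal, dependency-free sketch. The interfaces and the HandlerLookup helper below are hypothetical stand-ins written for this example, not the Rebus or Autofac types; they only show that a handler must be discoverable under the closed generic type IHandleMessages<IFailed<string>>, separately from IHandleMessages<string>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for illustration only; these are NOT the Rebus types.
public interface IHandleMessages<T> { void Handle(T message); }
public interface IFailed<T> { T Message { get; } }

// Simple wrapper playing the role of the "failed message" envelope.
public sealed class Failed<T> : IFailed<T>
{
    public Failed(T message) { Message = message; }
    public T Message { get; }
}

// One class handling both the plain message and its failed wrapper.
public sealed class StringHandler : IHandleMessages<string>, IHandleMessages<IFailed<string>>
{
    public void Handle(string message) =>
        throw new InvalidOperationException("test, should retry");

    public void Handle(IFailed<string> failed) =>
        Console.WriteLine("Second-level handler got: " + failed.Message);
}

public static class HandlerLookup
{
    // Returns every registered object that implements IHandleMessages<TMessage>
    // for exactly the requested closed generic type.
    public static List<IHandleMessages<TMessage>> Find<TMessage>(IEnumerable<object> registered) =>
        registered.OfType<IHandleMessages<TMessage>>().ToList();
}

public static class Program
{
    public static void Main()
    {
        var registered = new object[] { new StringHandler() };

        var firstLevel = HandlerLookup.Find<string>(registered);
        var secondLevel = HandlerLookup.Find<IFailed<string>>(registered);

        Console.WriteLine("IHandleMessages<string>: " + firstLevel.Count);
        Console.WriteLine("IHandleMessages<IFailed<string>>: " + secondLevel.Count);

        // Dispatch the wrapped message to the second-level handlers.
        foreach (var handler in secondLevel)
        {
            handler.Handle(new Failed<string>("hi"));
        }
    }
}
```

If the second lookup came back empty in a real container, the wrapped message would have no handler to go to, which matches the symptom described in the report.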

pelin commented 5 years ago

Thanks for your answer, and apologies for the late response; it's been vacation time.

I still have to say that this does not work. I tested whether the registration resolves, and it does. See the test code below (uncomment one section at a time: first Autofac, then BuiltinHandlerActivator).

        //// Rebus With AUTOFAC
        var builder = new ContainerBuilder();
        builder.RegisterHandler<TestMessageRebusWithSltHandler>();
        builder.Register<ILogger>(c => new ConsoleLogger()).InstancePerDependency();
        var config = new RebusMessagingConfiguration("Endpoint=sb:...")
        {
            MaxRetries = 1,
            EnableSecondLevelRetries = true
        };
        builder.RegisterRebus((configurer, context) =>
            {
                return configurer
                    .Transport(t => t.UseAzureServiceBus(config.AzureServiceBusConnectionString, config.CurrentEndpointName))
                    .Options(o => o.SetBackoffTimes(TimeSpan.FromMilliseconds(500), TimeSpan.FromMilliseconds(2000), TimeSpan.FromMilliseconds(5000)))
                    .Options(r => r.SimpleRetryStrategy(maxDeliveryAttempts: config.MaxRetries, secondLevelRetriesEnabled: config.EnableSecondLevelRetries))
                    .Options(o => o.SetMaxParallelism(config.SetMaxParallelism))
                    .Options(o => o.SetNumberOfWorkers(config.SetNumberOfWorkers))
                    .Options(t => t.SetWorkerShutdownTimeout(TimeSpan.FromSeconds(120)));
            }
        );
        var container = builder.Build();
        var autofacBus = container.Resolve<IBus>();
        autofacBus.Subscribe<string>().Wait();
        autofacBus.Subscribe<IFailed<string>>().Wait();
        var handlers = container.Resolve<IEnumerable<IHandleMessages<IFailed<string>>>>();
        // --> YES, IT RESOLVES!
        autofacBus.Publish("hi from Autofac").Wait();

        //// Rebus With BuiltinHandlerActivator
        var rebusActivator = new BuiltinHandlerActivator();
        var bus = new ReBus();
        rebusActivator.Register(() => new TestMessageRebusWithSltHandler(logger));
        var rebusMessagingConfiguration = new RebusMessagingConfiguration("Endpoint=sb:...")
        {
            AutomaticallyRenewLock = true,
            MaxRetries = 1,
            EnableSecondLevelRetries = true
        };
        var rebusBus = bus.Start(rebusActivator, rebusMessagingConfiguration);
        // Configure subscriptions
        rebusActivator.Bus.Subscribe<string>();
        rebusActivator.Bus.Subscribe<IFailed<string>>();
        bus.Activate(rebusActivator);
        rebusBus.Publish("hi from BuiltInActivator").Wait();

The message handler class looks like this:

public class TestMessageRebusWithSltHandler : IHandleMessages<string>, IHandleMessages<IFailed<string>>
{
    private readonly ILogger _logger;

    public TestMessageRebusWithSltHandler(ILogger logger)
    {
        _logger = logger;
    }

    public async Task Handle(string message)
    {
        _logger.LogDebug("String Message received!");

        throw new ApplicationException("test, should retry x times");
    }

    public async Task Handle(IFailed<string> message)
    {
        _logger.LogDebug("SLT:String  FailedMessage received!");
    }
}

With BuiltinHandlerActivator this works correctly: the IFailed<string> message is dispatched to the Handle(IFailed<string>) method. With the Autofac version it is not.

mookid8000 commented 5 years ago

Hi @pelin, I had time to try your program now.

When I create a console application and run your program (slightly modified for simplicity and to make it compile 😁), which looks like this:

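// Namespaces assumed from the Rebus and Rebus.Autofac packages of that period
using System;
using System.Threading.Tasks;
using Autofac;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Handlers;
using Rebus.Persistence.InMem;
using Rebus.Retry.Simple;
using Rebus.Transport.InMem;

class Program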
{
    static void Main()
    {
        //// Rebus With AUTOFAC
        var builder = new ContainerBuilder();

        builder.RegisterHandler<TestMessageRebusWithSltHandler>();

        builder.RegisterRebus((configurer, context) => configurer
            .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "test-queue"))
            .Options(o =>
            {
                o.SimpleRetryStrategy(
                    maxDeliveryAttempts: 5,
                    secondLevelRetriesEnabled: true
                );
            })
            .Subscriptions(s => s.StoreInMemory(new InMemorySubscriberStore())));

        using (var container = builder.Build())
        {
            var autofacBus = container.Resolve<IBus>();

            autofacBus.Subscribe<string>().Wait();

            autofacBus.Publish("hi from Autofac").Wait();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
        }
    }

    public class TestMessageRebusWithSltHandler : IHandleMessages<string>, IHandleMessages<IFailed<string>>
    {
        public async Task Handle(string message)
        {
            Console.WriteLine("String Message received!");

            throw new ApplicationException("test, should retry x times");
        }

        public async Task Handle(IFailed<string> message)
        {
            Console.WriteLine("SLT:String FailedMessage received!");
        }
    }
}

I get this output in the console:

Rebus.Threading.TaskParallelLibrary.TplAsyncTask INFO (Thread #1): Starting periodic task "CleanupTrackedErrors" with interval 00:01:00
Rebus.Bus.RebusBus INFO (Thread #1): Starting bus 1
Rebus.Bus.RebusBus INFO (Thread #1): Setting number of workers to 1
Rebus.Bus.RebusBus DEBUG (Thread #1): Adding worker "Rebus 1 worker 1"
Rebus.Bus.RebusBus INFO (Thread #1): Started
Rebus.Workers.ThreadPoolBased.ThreadPoolWorker DEBUG (Rebus 1 worker 1): Starting (threadpool-based) worker "Rebus 1 worker 1"
Rebus.Pipeline.Send.SendOutgoingMessageStep DEBUG (Thread #1): Sending "hi from Autofac" -> "test-queue"
Press ENTER to quit
String Message received!
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 1): Unhandled exception 1 while handling message with ID "98084685-4900-4e65-9a02-712d207ca9fc"
String Message received!
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 1): Unhandled exception 2 while handling message with ID "98084685-4900-4e65-9a02-712d207ca9fc"
String Message received!
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 1): Unhandled exception 3 while handling message with ID "98084685-4900-4e65-9a02-712d207ca9fc"
String Message received!
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 1): Unhandled exception 4 while handling message with ID "98084685-4900-4e65-9a02-712d207ca9fc"
String Message received!
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 1): Unhandled exception 5 while handling message with ID "98084685-4900-4e65-9a02-712d207ca9fc"
SLT:String FailedMessage received!
Rebus.Pipeline.Receive.DispatchIncomingMessageStep DEBUG (Rebus 1 worker 1): Dispatching message "String/98084685-4900-4e65-9a02-712d207ca9fc" to 1 handlers took 1 ms

which indicates that everything worked as it should.

Something else must be tricking you – could you try and see if your original program has any differences compared to my version?

pelin commented 5 years ago

Yes, something was tricking me, and I'm pretty sure I found out what. The problem only happens in the beta version (6.0.0-b01) of the Rebus.Autofac package. Sorry, I missed the important detail that we were running the beta version. I appreciate the time you took to investigate this, and you now have a heads-up before releasing the beta version of the Rebus.Autofac package. :-) We can downgrade to the stable version, so there is no hurry for us now. Thanks. //Peter