rebus-org / Rebus.MySql

:bus: MySQL integration for Rebus
https://mookid.dk/category/rebus

Rebus Timeout Configuration #18

Closed bogdan-romaniv closed 3 years ago

bogdan-romaniv commented 3 years ago

Hello, I am running some tests to postpone messages in the queue using the Defer method.

I have the following configuration, which takes the RequestReply project from the samples as a starting point and modifies it to use the MySQL transport:

namespace Consumer
{
    class Program
    {
        const string ConnectionString = "...";

        static void Main()
        {
            using var adapter = new BuiltinHandlerActivator();

            adapter.Handle<Job>(async (bus, job) =>
            {
                var delay = TimeSpan.FromSeconds(10);
                await bus.Defer(delay, job);
            });

            Configure.With(adapter)
                .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn))
                .Transport(t => t.UseMySql(ConnectionString, "platform_emails", "consumerinput"))
                .Start();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
        }
    }
}

When I run it this way, the following error occurs:

[WRN] Rebus.Retry.ErrorTracking.InMemErrorTracker (Rebus 1 worker 1): Unhandled exception 1 while handling message with ID "d04ee57c-10b5-416c-b35e-494cdae48656"
System.ArgumentException: Cannot get destination for message of type Consumer.Messages.Job because it has not been mapped!

You need to ensure that all message types that you intend to bus.Send or bus.Subscribe to are mapped to an endpoint - it can be done by calling .Map(someEndpoint) or .MapAssemblyOf(someEndpoint) in the routing configuration.
   at Rebus.Routing.TypeBased.TypeBasedRouter.GetDestinationAddressForMessageType(Type messageType)
   at Rebus.Routing.TypeBased.TypeBasedRouter.GetDestinationAddress(Message message)
   at Rebus.Bus.RebusBus.Defer(TimeSpan delay, Object message, IDictionary`2 optionalHeaders)
   at Consumer.Program.<>c.<<Main>b__1_0>d.MoveNext() in C:\Users\Bogdan\Downloads\RebusSamples-master\RequestReply\Consumer\Program.cs:line 21
--- End of stack trace from previous location ---
   at Rebus.Activation.BuiltinHandlerActivator.Handler`1.Handle(TMessage message)
   at Rebus.Pipeline.Receive.HandlerInvoker`1.Invoke()
   at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Retry.FailFast.FailFastStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next, String identifierToTrackMessageBy, ITransactionContext transactionContext, String messageId, String secondLevelMessageId)
[WRN] Rebus.Retry.ErrorTracking.InMemErrorTracker (Rebus 1 worker 1): Unhandled exception 2 while handling message with ID "d04ee57c-10b5-416c-b35e-494cdae48656"
System.ArgumentException: Cannot get destination for message of type Consumer.Messages.Job because it has not been mapped!

If I instead try to configure timeouts in the following way:

namespace Consumer
{
    class Program
    {
        const string ConnectionString = "...";

        static void Main()
        {
            using var adapter = new BuiltinHandlerActivator();

            adapter.Handle<Job>(async (bus, job) =>
            {
                var delay = TimeSpan.FromSeconds(10);
                await bus.Defer(delay, job);
            });

            Configure.With(adapter)
                .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn))
                .Transport(t => t.UseMySql(ConnectionString, "platform_emails", "consumerinput"))
                .Timeouts(t => t.StoreInMySql(ConnectionString, "timeouts"))
                .Start();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
        }
    }
}

it throws this error:

System.InvalidOperationException: 'Attempted to register primary -> Rebus.Timeouts.ITimeoutManager, but a primary registration already exists: primary -> Rebus.Timeouts.ITimeoutManager'

While looking for a solution, I came across the following issue, but it concerns the SQL Server transport:

https://github.com/rebus-org/Rebus/issues/555

Does that also apply to the MySQL transport? I don't know what to do if I want to defer messages.

Thanks in advance

mookid8000 commented 3 years ago

The reason you get the exception is that await bus.Defer(..) uses Rebus' endpoint mappings to determine where to send the message. If you intend for the sender to be the recipient, then there's await bus.DeferLocal(...) for that:

adapter.Handle<Job>(async (bus, job) =>
{
    var delay = TimeSpan.FromSeconds(10);
    await bus.DeferLocal(delay, job);
});

Could you try that?
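
For completeness, if the deferred message should instead go through the endpoint mappings (for example to another endpoint's queue), the alternative is to keep bus.Defer and map the message type in the routing configuration, as the exception text suggests. The following is only a minimal sketch under assumptions: the Job class is a stand-in for Consumer.Messages.Job, the destination "consumerinput" is reused purely for illustration, and UseMySql is assumed to be available exactly as in the original snippet.

```csharp
using System;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Routing.TypeBased;

namespace Consumer
{
    // Stand-in for Consumer.Messages.Job from the question (assumption).
    public class Job { }

    class Program
    {
        const string ConnectionString = "...";

        static void Main()
        {
            using var adapter = new BuiltinHandlerActivator();

            adapter.Handle<Job>(async (bus, job) =>
            {
                // Defer looks up the destination for Job in the routing configuration below.
                await bus.Defer(TimeSpan.FromSeconds(10), job);
            });

            Configure.With(adapter)
                // UseMySql comes from the Rebus.MySql package, called as in the original snippet.
                .Transport(t => t.UseMySql(ConnectionString, "platform_emails", "consumerinput"))
                // Map Job to a queue so Defer (and Send) can resolve a destination.
                // "consumerinput" is only an example destination here.
                .Routing(r => r.TypeBased().Map<Job>("consumerinput"))
                .Start();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
        }
    }
}
```

When the destination is the endpoint's own input queue, as in this sketch, DeferLocal achieves the same thing without any mapping.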

bogdan-romaniv commented 3 years ago

Hello, sorry for the late reply. I have been busy with work and was not able to review your answer until now.

With DeferLocal it works fine. But I have a question.

With DeferLocal, is the message re-queued to persist between reboots?

Thanks in advance

mookid8000 commented 3 years ago

With DeferLocal, is the message re-queued to persist between reboots?

Yes!

Send/SendLocal and Defer/DeferLocal work the same way: they put a message into a queue.

The *Local variants just happen to select the sender's own input queue as the destination. 🙂
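
To make the comparison concrete, here is a small sketch of the four calls side by side. The Job class and the QueueingExamples helper are hypothetical names used only for illustration, and the two routed calls assume that Job has been mapped to an endpoint in the routing configuration.

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Bus;

// Hypothetical message type, standing in for the Job class from the question.
public class Job { }

public static class QueueingExamples
{
    public static async Task Demo(IBus bus, Job job)
    {
        var delay = TimeSpan.FromSeconds(10);

        // Destination is resolved from the endpoint mappings (Job must be mapped).
        await bus.Send(job);
        await bus.Defer(delay, job);

        // Destination is the sender's own input queue, so no mapping is needed.
        await bus.SendLocal(job);
        await bus.DeferLocal(delay, job);
    }
}
```

In all four cases the message ends up in a queue; the only differences are how the destination is chosen and whether delivery is delayed.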

bogdan-romaniv commented 3 years ago

Many thanks. Awesome Work!