rebus-org / Rebus.RabbitMq

:bus: RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Why is Rebus.Timeouts.ITimeoutManager not implemented? #56

Closed xyfy closed 4 years ago

xyfy commented 4 years ago

Why is Rebus.Timeouts.ITimeoutManager not implemented?

And when do you plan to do this?

I want to use deferred messages with RabbitMQ.

Or is there anything else I can do?

mookid8000 commented 4 years ago

What makes you think it's not implemented?

xyfy commented 4 years ago

I did not find it in the source code, but I found an implementation in Rebus.SqlServer.

I tried to use UseExternalTimeoutManager to defer messages,

but it had no effect.

I started another bus to subscribe to the defer queue, as in the code below:

rebusConfig.Timeouts(t => t.UseExternalTimeoutManager(Consts.MqDeferQueue));
_bus = rebusConfig.Start();
using (var active = new BuiltinHandlerActivator())
{
    var deferConfig = Configure.With(active);
    deferConfig.Logging(moduleConfig.LoggingConfigurer);
    deferConfig.Transport(t => t.UseRabbitMq(Consts.MqConnnectionString, Consts.MqDeferQueue));
    deferConfig.Routing(c =>
    {
        var typebaseRouting = c.TypeBased();
        foreach (var mqMessageType in mqMessageTypes)
        {
            typebaseRouting.Map(mqMessageType, Consts.MqDeferQueue);
        }
    });

    // deferConfig.Timeouts(t => t.StoreInMemory());
    deferConfig.Timeouts(t => t.StoreInSqlServer(Configuration.DefaultNameOrConnectionString, "Rebus_Mq_Timeouts", true));
    deferConfig.Start();
}

It worked. However, I found that I was not actually using this delay queue at all. It just uses Rebus.SqlServer to poll the table to achieve the delay; nothing is going on in [MqDeferQueue].

xyfy commented 4 years ago

Then I went to the source, and I have not found a class that implements Rebus.Timeouts.ITimeoutManager in Rebus.RabbitMq. Am I missing something?

mookid8000 commented 4 years ago

RabbitMQ is not capable of sending messages with individual delays – that's the reason why Rebus' RabbitMQ transport does not provide an implementation.

Instead, with Rebus, you need to configure some kind of persistent store for your delayed messages. E.g. as you've figured out, you can use SQL Server (via Rebus.SqlServer) to store delayed messages like this:

Configure.With(...)
    .Transport(...)
    .Timeouts(t => t.StoreInSqlServer(...))
    .Start();

As you've also correctly figured out, it's fairly easy to create a single timeout manager, e.g. like this:

Configure.With(...)
    .Transport(t => t.UseRabbitMq(..., "timeouts"))
    .Timeouts(t => t.StoreInSqlServer(...))
    .Start();

which you can then use from all other endpoints like this:

Configure.With(...)
    .Transport(t => t.UseRabbitMq(...))
    .Timeouts(t => t.UseExternalTimeoutManager("timeouts"))
    .Start();

When you're storing delayed messages in SQL Server, having a single endpoint be the timeout manager is the preferred approach, because it's easy to manage, and it doesn't put too much of a burden on your database.
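
To illustrate the endpoint side of that setup, here is a minimal sketch of deferring a message from an endpoint that delegates to the external timeout manager. It assumes a timeout manager endpoint is already running on the "timeouts" queue as shown above; the `SomeMessage` type, the `DeferExample` class, and the "my-endpoint" queue name are hypothetical placeholders.

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Timeouts;

// Hypothetical message type, used for illustration only.
public class SomeMessage { }

public static class DeferExample
{
    public static async Task Run(string rabbitMqConnectionString)
    {
        using (var activator = new BuiltinHandlerActivator())
        {
            // Handler that runs once the deferred message comes back.
            activator.Handle<SomeMessage>(message =>
            {
                Console.WriteLine("Deferred message received");
                return Task.CompletedTask;
            });

            var bus = Configure.With(activator)
                // "my-endpoint" is an assumed input queue name for this endpoint.
                .Transport(t => t.UseRabbitMq(rabbitMqConnectionString, "my-endpoint"))
                // Deferred messages are handed to the endpoint listening on "timeouts".
                .Timeouts(t => t.UseExternalTimeoutManager("timeouts"))
                .Start();

            // Ask for the message to be delivered back to this endpoint's own queue
            // in about 5 seconds.
            await bus.DeferLocal(TimeSpan.FromSeconds(5), new SomeMessage());

            // Keep the endpoint alive long enough to receive the message.
            await Task.Delay(TimeSpan.FromSeconds(10));
        }
    }
}
```

With this arrangement the delay itself is still handled by the timeout manager's store (SQL Server in the example above), not by RabbitMQ; the "timeouts" queue only carries the request to the timeout manager.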

xyfy commented 4 years ago

I don't know why,

but today I tried the RabbitMQ plugin rabbitmq-delayed-message-exchange, which is offered by RabbitMQ, and it worked fine.

I am now looking for a way to use it together with Rebus.RabbitMq.

mookid8000 commented 4 years ago

> but today I tried the RabbitMQ plugin rabbitmq-delayed-message-exchange, which is offered by RabbitMQ, and it worked fine.
>
> I am now looking for a way to use it together with Rebus.RabbitMq.

I didn't know that plugin existed. You might be able to put it to use with Rebus by providing an x-delay header on your messages, e.g. like this:

var headers = new Dictionary<string, string> {{"x-delay", "5000"}};

await bus.Send(new SomeMessage(), headers);

although I'm not completely sure that the RabbitMQ plugin will correctly pick up the value.
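
For comparison, this is roughly what the plugin expects when it is used directly through the RabbitMQ .NET client, without Rebus. It is only a sketch, assuming the rabbitmq-delayed-message-exchange plugin is enabled on the broker and a RabbitMQ.Client 5.x/6.x style synchronous API; the exchange name "delayed", the queue name "work", the broker URI, and the `DelayedPublishExample` class are made up for illustration. As far as I know, the plugin only delays messages published to an exchange declared with the type "x-delayed-message", and it reads x-delay as a number of milliseconds, so a string-valued header sent to an ordinary exchange may not have any effect.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

public static class DelayedPublishExample
{
    public static void Run()
    {
        // Assumed broker address; adjust to your environment.
        var factory = new ConnectionFactory { Uri = new Uri("amqp://localhost") };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // The plugin adds the "x-delayed-message" exchange type; "x-delayed-type"
            // tells it how to route once the delay has elapsed.
            var exchangeArgs = new Dictionary<string, object> { { "x-delayed-type", "direct" } };
            channel.ExchangeDeclare("delayed", "x-delayed-message",
                durable: true, autoDelete: false, arguments: exchangeArgs);

            channel.QueueDeclare("work", durable: true, exclusive: false,
                autoDelete: false, arguments: null);
            channel.QueueBind("work", "delayed", "work");

            // The delay is given in milliseconds as a numeric header value.
            var properties = channel.CreateBasicProperties();
            properties.Headers = new Dictionary<string, object> { { "x-delay", 5000 } };

            var body = Encoding.UTF8.GetBytes("hello");
            channel.BasicPublish("delayed", "work", properties, body);
        }
    }
}
```

Whether Rebus.RabbitMq can be pointed at such an exchange, and whether it forwards the header as a number, is not something this thread confirms.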

mookid8000 commented 4 years ago

I hope this answered your question. I'm closing the issue for now.