rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus

Deferred messages disappear #838

Closed pwelzien closed 4 years ago

pwelzien commented 4 years ago

Hi! I have noticed a situation where deferred messages disappear. It might be due to a lousy SQL Server connection or high load, but it is quite reproducible in my test case and, most importantly, the deferred messages are gone without a trace!

I’m using Rebus 5.4.0 with all the latest and greatest components, running under .NET Core 2.2. The test case uses RabbitMQ for message handling, with SQL Server for sagas and timeouts. I send 1000 messages to a saga handler, which in turn sends a deferred message and a normal message to the same saga for each incoming message. After the queues are empty, I count the number of log entries containing “testmessage1”, “testmessage2”, and “testmessage3”. The result is not consistent: “testmessage1” and “testmessage2” are always reported 1000 times each, but “testmessage3”, which is deferred, is reported either 999 or 1000 times.

Can you see anything obviously wrong in my code?

This is my configuration in the startup:

services.AddRebus(configure => configure
    .Options(o => o.SetBusName("Test"))
    .Options(o => o.LogPipeline(verbose: true))
    .Logging(l => l.Serilog())
    .Transport(t => t.UseRabbitMq(Configuration["rebus:rabbit"], QueueNames.Queue)
        .ExchangeNames(ExchangeNames.Direct, ExchangeNames.Topics)
        .Prefetch(50))
    .Timeouts(t => t.StoreInSqlServer(rebusConnection, "RebusTimeout", true))
    .Sagas(s => s.StoreInSqlServer(rebusConnection, "RebusSaga", "RebusSagaIndex", true))
    .Options(o => o.SetMaxParallelism(rebusThreads))
    .Options(o => o.SimpleRetryStrategy(
        errorQueueAddress: QueueNames.ErrorQueue,
        maxDeliveryAttempts: 1,
        secondLevelRetriesEnabled: false))
    .Routing(r => r.TypeBased()
        .MapAssemblyOf(QueueNames.Queue, type => type.Namespace.StartsWith("Test.Models"))
        .MapFallback(QueueNames.ErrorQueue))
);

The actual test, send in 1000 messages:

for (int i = 1; i <= 1000; i++)
{
    var id = Guid.NewGuid();
    await this.bus.Send(new TestMessage { Id = id });
}

The saga handler:

public class TestData : SagaData
{
    public Guid TestId { get; set; }
    public bool Defered { get; set; }
}

public class TestSaga : Saga<TestData>
    , IAmInitiatedBy<TestMessage>
    , IHandleMessages<TestMessage2>
    , IHandleMessages<TestMessage3>
{
    private readonly IBus _bus;
    private readonly ILogger<TestSaga> _logger;

    public TestSaga(IBus bus, ILogger<TestSaga> logger)
    {
        _bus = bus;
        _logger = logger;
    }

    public async Task Handle(TestMessage message)
    {
        await _bus.Defer(TimeSpan.FromSeconds(1), new TestMessage2 { Id = message.Id });
        await _bus.Send(new TestMessage3 { Id = message.Id });
        Data.TestId = message.Id;
        _logger.LogInformation("testmessage1 handled");
    }

    public async Task Handle(TestMessage2 message)
    {
        if (Data.Defered)
            MarkAsComplete();
        else
            Data.Defered = true;
        await Task.CompletedTask;
        _logger.LogInformation("testmessage2 handled");
    }

    public async Task Handle(TestMessage3 message)
    {
        if (Data.Defered)
            MarkAsComplete();
        else
            Data.Defered = true;
        await Task.CompletedTask;
        _logger.LogInformation("testmessage3 handled");
    }

    protected override void CorrelateMessages(ICorrelationConfig<TestData> config)
    {
        config.Correlate<TestMessage>(m => m.Id, s => s.TestId);
        config.Correlate<TestMessage2>(m => m.Id, s => s.TestId);
        config.Correlate<TestMessage3>(m => m.Id, s => s.TestId);
    }
}

public class TestMessage
{
    public Guid Id { get; set; }
}

public class TestMessage2
{
    public Guid Id { get; set; }
}

public class TestMessage3
{
    public Guid Id { get; set; }
}

mookid8000 commented 4 years ago

Sorry, but the code you posted is a little too much for me to digest right now.

Can you make a shorter reproduction of the issue?

pwelzien commented 4 years ago

Sure can:

This is my configuration in the startup as before:

    services.AddRebus(configure => configure
        .Options(o => o.SetBusName("Test"))
        .Options(o => o.LogPipeline(verbose: true))
        .Logging(l => l.Serilog())
        .Transport(t => t.UseRabbitMq(Configuration["rebus:rabbit"], QueueNames.Queue)
            .ExchangeNames(ExchangeNames.Direct, ExchangeNames.Topics)
            .Prefetch(50))
        .Timeouts(t => t.StoreInSqlServer(rebusConnection, "RebusTimeout", true))
        .Sagas(s => s.StoreInSqlServer(rebusConnection, "RebusSaga", "RebusSagaIndex", true))
        .Options(o => o.SetMaxParallelism(rebusThreads))
        .Options(o => o.SimpleRetryStrategy(
            errorQueueAddress: QueueNames.ErrorQueue,
            maxDeliveryAttempts: 1,
            secondLevelRetriesEnabled: false))
        .Routing(r => r.TypeBased()
            .MapAssemblyOf(QueueNames.Queue,
                type => type.Namespace.StartsWith("Test.Models"))
            .MapFallback(QueueNames.ErrorQueue))
    );

The actual test, send in 1000 messages:

    for (int i = 1; i <= 1000; i++)
    {
        var id = Guid.NewGuid();
        await this.bus.Send(new TestMessage { Id = id });
    }

The saga handler:

    public class TestSaga : Saga<TestData>
        , IAmInitiatedBy<TestMessage>
        , IHandleMessages<TestMessage2>
    {
        private readonly IBus _bus;
        private readonly ILogger<TestSaga> _logger;

        public TestSaga(IBus bus, ILogger<TestSaga> logger)
        {
            _bus = bus;
            _logger = logger;
        }

        public async Task Handle(TestMessage message)
        {
            await _bus.Defer(TimeSpan.FromSeconds(1), new TestMessage2 { Id = message.Id });
            Data.TestId = message.Id;
            _logger.LogInformation("testmessage1 handled");
        }

        public async Task Handle(TestMessage2 message)
        {
            MarkAsComplete();
            await Task.CompletedTask;
            _logger.LogInformation("testmessage2 handled");
        }

        protected override void CorrelateMessages(ICorrelationConfig<TestData> config)
        {
            config.Correlate<TestMessage>(m => m.Id, s => s.TestId);
            config.Correlate<TestMessage2>(m => m.Id, s => s.TestId);
        }
    }

mookid8000 commented 4 years ago

I've tried to reproduce the issue, but I was unsuccessful – everything seems to work like it should.

Could you take a look at the code here 👉 https://bitbucket.org/mookid8000/rabbitrepro/src/master/ and see if it differs from yours?

Btw. when you say that "the deferred messages disappear", a thought occurred to me: Did you, by any chance, run more than one instance of the program?

Because if so, then they would compete for the deferred messages from the timeouts database, and they would also compete for the messages if the queue names were the same.

Just a thought. Please look at my code and see if it's different from yours. 🙂

pwelzien commented 4 years ago

Yes, I have multiple consumers of the deferred queue stored in a SQL Server database. Luckily I can remove all deferred queues as they are only used to validate states. I've been trying to move the deferred queue to a rabbit queue as it seems to work with multiple consumers. Thanks for the info.

mookid8000 commented 4 years ago

Ok. I'm not quite sure I understand what you said there 🙂 but for the record I just want to say this:

The SQL timeout storage works perfectly well with multiple endpoints concurrently – the handling of due messages will be distributed among all Rebus instances that share the same timeout storage. And, since messages due to be delivered are sent to the queue for which they were originally destined, due messages can be handled by any Rebus instance.

RabbitMQ also works perfectly well with competing consumers. But, as always when using this pattern, it is important that the consumers that share a queue are simply multiple instances of the same service.

Therefore, my suspicion is that your setup is a tad too complex to be able to simply count logging statements in one of your instances, to determine whether things are working as they should.
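
To make the point about counting log statements concrete, here is a minimal sketch that uses neither Rebus nor RabbitMQ: several in-process consumers drain one shared queue, standing in for multiple instances of the same service. `CompetingConsumersDemo` and `Run` are hypothetical names made up for this illustration, and the in-memory queue is only an assumption-laden stand-in for a real transport. Each message is handled exactly once, yet the count seen by any single consumer can be lower than the number of messages sent; only the sum over all consumers is guaranteed to match.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    // Hypothetical helper (not part of Rebus): drains one shared queue with
    // several competing consumers and returns how many messages each handled.
    public static int[] Run(int messageCount, int consumerCount)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, messageCount));
        var handled = new int[consumerCount];

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(index => Task.Run(() =>
            {
                // Each message is dequeued by exactly one consumer.
                while (queue.TryDequeue(out _))
                {
                    // Only this task writes to handled[index].
                    handled[index]++;
                }
            }))
            .ToArray();

        Task.WaitAll(consumers);
        return handled;
    }

    public static void Main()
    {
        var handled = Run(messageCount: 1000, consumerCount: 2);

        // The per-instance split varies from run to run.
        for (var i = 0; i < handled.Length; i++)
        {
            Console.WriteLine($"instance {i} logged {handled[i]} messages");
        }

        // The total across all instances is what should equal the number sent.
        Console.WriteLine($"total across instances: {handled.Sum()}");
    }
}
```

In a setup like the one described in this issue, the equivalent check would be to aggregate the log entries from every instance that shares the queue before comparing against the number of messages sent.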

Do you think this could explain what you're seeing?