rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus

Saga Data not keeping up #958

Closed mkoziel2000 closed 3 years ago

mkoziel2000 commented 3 years ago

I noticed in Rebus 6.5.5 that when I define a straightforward saga with 4 messages fired in a chain, the messages can get processed before the SagaData has a chance to get updated and passed along. This basically makes SagaData not very useful as a holder of state between messages. I don't remember seeing this problem in earlier versions of Rebus, although the transport we used back then was SQL Server. I'm currently testing the Kafka transport on 6.5.5, which is where I see the issue (the saga storage is still in SQL Server).

Here is the basic Saga setup

What I expect to see is that every saga I pump into the system makes it to the MarkAsComplete() call. Instead, about 50% of the sagas get marked complete and 50% never satisfy the IF condition when processing "Msg 3". It seems to be timing-related: it depends on how long it takes to consume and process a message relative to Rebus getting the SagaData updated internally after the previous message is done processing. In the cases where I get stale SagaData, Rebus invokes conflict resolution as the result of a concurrency collision. It seems the only way to overcome this problem is to add an artificial delay to the publishing of the next message in the saga using DeferLocal().
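For illustration, that workaround amounts to deferring the next chain message instead of publishing it directly; a minimal sketch (the 200 ms delay is an arbitrary value picked for this example, not a recommendation):

    // Instead of: await _bus.Publish(new SagaMessageWind { SagaInstanceId = message.SagaInstanceId });
    // ...defer the message to this bus's own input queue, buying time for the
    // saga data to be saved before the next message in the chain is handled:
    await _bus.DeferLocal(TimeSpan.FromMilliseconds(200), new SagaMessageWind
    {
        SagaInstanceId = message.SagaInstanceId
    });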

It's an arbitrary test, but I was basically verifying that Kafka as a transport (and subscriber) would work and function properly together with SQL Server as the saga storage when I ran into this odd behavior. I don't remember seeing the issue when I used SQL Server for the transport. Maybe that mechanism is slow enough that the SagaData gets persisted in time, before the next message gets handled; or it might have still happened, just very infrequently.

Is there a way to guarantee that the SagaData is not stale when doing chain-based message sequencing? I suspect that a command-and-control pattern might be better suited to leveraging SagaData (a central handler that owns the sequencing logic and determines which message to publish next based on the last message published). But thinking about that pattern, it could suffer from the same stale-SagaData scenario, since the worker handler has to publish a message to return control to the controller handler before it exits its own handler method.

Please advise. I have to imagine people run into this issue when they are hooked up to fast transports like Kafka or RabbitMQ.

Here is an example Saga that I was using for the test:

public class TestSaga : IdempotentSaga<TestSagaData>,
                        IAmInitiatedBy<KickoffSagaMessages>,
                        IHandleMessages<SagaMessageEarth>,
                        IHandleMessages<SagaMessageWind>,
                        IHandleMessages<SagaMessageFire>,
                        IHandleMessages<KickoffSagaMessages>,
                        IHandleMessages<IFailed<ISagaMessage>>
{
    private IBus _bus;

    private static ILogger _logger = LogManager.GetCurrentClassLogger();

    public TestSaga(IBus bus)
    {
        _bus = bus ?? throw new ArgumentNullException();
    }

    protected override async Task ResolveConflict(TestSagaData otherSagaData)
    {
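        // Merge the competing saga data revision by OR-ing the per-step flags,
        // so a step counts as done if either copy recorded it.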
        this.Data.Task1Processed = this.Data.Task1Processed || otherSagaData.Task1Processed;
        this.Data.Task2Processed = this.Data.Task2Processed || otherSagaData.Task2Processed;
        this.Data.Task3Processed = this.Data.Task3Processed || otherSagaData.Task3Processed;
    }
    #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
    public async Task Handle(SagaMessageEarth message)
    #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
    {
        try
        {
            if (!this.Data.Task1Processed)
            {
                _logger.Info("Processing Earth - {id}", this.Data.SagaInstanceId);
                this.Data.stuffDone += "Earth;";
                this.Data.Task1Processed = true;
            }
            await _bus.Publish(new SagaMessageWind()
            {
                SagaInstanceId = message.SagaInstanceId
            }).ConfigureAwait(false);
            PossiblyDone();
            _logger.Info("Published Wind...Done Processing Earth - {id}", this.Data.SagaInstanceId);
        }
        catch (Exception e)
        {
            _logger.Error(e, "WHAT Earth? - {id}", this.Data.SagaInstanceId);
            throw;
        }
    }

    #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
    public async Task Handle(SagaMessageWind message)
    #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
    {
        try 
        {
            if (!this.Data.Task2Processed)
            {
                _logger.Info("Processing Wind - {id}", this.Data.SagaInstanceId);
                this.Data.stuffDone += "Wind;";
                this.Data.Task2Processed = true;
            }
            await _bus.Publish(new SagaMessageFire()
            {
                SagaInstanceId = message.SagaInstanceId
            }).ConfigureAwait(false);
            PossiblyDone();
            _logger.Info("Published Fire...Done Processing Wind - {id}", this.Data.SagaInstanceId);
        }
        catch (Exception e)
        {
            _logger.Error(e, "WHAT Wind? - {id}", this.Data.SagaInstanceId);
            throw;
        }
    }

    #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
    public async Task Handle(SagaMessageFire message)
    #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
    {
        try
        {
            if (!this.Data.Task3Processed)
            {
                //throw new Exception("Not going to finish this");
                _logger.Info("Processing Fire - {id}", this.Data.SagaInstanceId);
                this.Data.stuffDone += "Fire;";
                this.Data.Task3Processed = true;
            }
            PossiblyDone();
            _logger.Info("Done Processing Fire - {id}", this.Data.SagaInstanceId);
        }
        catch (Exception e)
        {
            _logger.Error(e, "WHAT Fire? - {id}", this.Data.SagaInstanceId);
            throw;
        }
    }

    private void PossiblyDone()
    {
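        // The saga is complete only once all three steps have been recorded;
        // MarkAsComplete() then removes the saga data from storage.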
        if (this.Data.Task1Processed && this.Data.Task2Processed && this.Data.Task3Processed)
        {
            _logger.Info("Completed everything for {id}: {msg}", this.Data.SagaInstanceId, this.Data.stuffDone);
            MarkAsComplete();
        }
        else
        {
            _logger.Info("NOT Completed everything for {id}: {task1},{task2},{task3}", this.Data.SagaInstanceId, this.Data.Task1Processed, this.Data.Task2Processed, this.Data.Task3Processed);
        }
    }

    public async Task Handle(KickoffSagaMessages message)
    {
        _logger.Info("Processing Kickoff - {id}", this.Data.SagaInstanceId);
        this.Data.SagaInstanceId = message.SagaInstanceId;
        this.Data.stuffDone += "Initiated;";
        _bus.Publish(new SagaMessageEarth()
        {
            SagaInstanceId = message.SagaInstanceId
        });
        _logger.Info("Published Earth....Done Processing Kickoff - {id}", this.Data.SagaInstanceId);
    }

    protected override void CorrelateMessages(ICorrelationConfig<TestSagaData> config)
    {
        //config.CorrelateHeader<ISagaMessage>("rbs2-corr-id", d => d.Id);
        config.Correlate<KickoffSagaMessages>(m => m.SagaInstanceId, d => d.SagaInstanceId);
        config.Correlate<SagaMessageFire>(m => m.SagaInstanceId, d => d.SagaInstanceId);
        config.Correlate<SagaMessageEarth>(m => m.SagaInstanceId, d => d.SagaInstanceId);
        config.Correlate<SagaMessageWind>(m => m.SagaInstanceId, d => d.SagaInstanceId);
        config.Correlate<IFailed<ISagaMessage>>(m => m.Message.SagaInstanceId, d => d.SagaInstanceId);
    }

    #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
    public async Task Handle(IFailed<ISagaMessage> message)
    #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
    {
        _logger.Error("Unable to handle the message of type {msgtype} with error message {errMsg}", message.Message.GetType().Name, message.ErrorDescription);
    }
}
mookid8000 commented 3 years ago

Hi, I've tried reproducing the issue here.

I think your experience might have been caused by a missing await in this block of code

    public async Task Handle(KickoffSagaMessages message)
    {
        _logger.Info("Processing Kickoff - {id}", this.Data.SagaInstanceId);
        this.Data.SagaInstanceId = message.SagaInstanceId;
        this.Data.stuffDone += "Initiated;";
        _bus.Publish(new SagaMessageEarth()
        {
            SagaInstanceId = message.SagaInstanceId
        });
        _logger.Info("Published Earth....Done Processing Kickoff - {id}", this.Data.SagaInstanceId);
    }

which should have read

    public async Task Handle(KickoffSagaMessages message)
    {
        _logger.Info("Processing Kickoff - {id}", this.Data.SagaInstanceId);
        this.Data.SagaInstanceId = message.SagaInstanceId;
        this.Data.stuffDone += "Initiated;";
        // 👇😅
        await _bus.Publish(new SagaMessageEarth()
        {
            SagaInstanceId = message.SagaInstanceId
        });
        _logger.Info("Published Earth....Done Processing Kickoff - {id}", this.Data.SagaInstanceId);
    }

Could you try again with that added to your code?

(closing for now – feel free to pick it up again, if it still doesn't behave as expected)

mkoziel2000 commented 3 years ago

I just checked the code. The await was indeed missing in the snippet I put in this issue, but when I checked my actual code, it was present. I think I was experimenting with the timing behavior and you got the remnants of that. I believe I've isolated it down to the behavior of the Publish() method. If you add a Thread.Sleep(100) right after the publish in one of the handlers, you will see the behavior unfold. It seems that Rebus doesn't know that the next message pulled from the queue is the result of a Publish() call made from within a handler that hasn't exited its method and saved its saga data yet. So when the new message is associated with the same saga, the updated saga data is not yet available and the old saga data is referenced. This, in turn, triggers the conflict resolution event. Unfortunately, that may be too late: the message that is supposed to complete the saga may have already finished its processing by that point. Unless the saga instance is made available to the conflict resolution event so we can mark it complete there, it seems we will end up with unfinished sagas.

I see how this can happen, given that everything is parallel and concurrent in nature. The question is whether we can get Rebus to persist the changed saga data before the next message gets consumed by the next process/thread. Passing the saga data as part of the Publish() call might be a good option, as it gives Rebus the chance to save the changed saga data before the message is pushed to the queue. Another option could be to make all changes to saga data go through an accessor that persists each change as it is made, but that would probably break a lot of implementations out there. A third option might be an API for registering publication calls with Rebus that are initiated as soon as the handler method is done; this would provide the hook necessary to persist any change in saga data first (this is close to the behavior of DeferLocal(), but that method is not predictable as to when it will fire relative to the current handler finishing). Just some ideas.
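To make the race easy to reproduce, the tweak described above looks roughly like this (a sketch of the repro, not production code; the sleep exists only to widen the window between publishing and saving the saga data):

    public async Task Handle(SagaMessageEarth message)
    {
        if (!Data.Task1Processed)
        {
            Data.stuffDone += "Earth;";
            Data.Task1Processed = true;
        }

        await _bus.Publish(new SagaMessageWind { SagaInstanceId = message.SagaInstanceId });

        // With a transport that sends immediately, the subscriber can consume
        // SagaMessageWind while this handler is still sleeping, i.e. before the
        // updated saga data has been saved, which triggers conflict resolution.
        Thread.Sleep(100);

        PossiblyDone();
    }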

mookid8000 commented 3 years ago

Oh, sorry – could you remove all that .ConfigureAwait(false) too? I think it causes the bus operations to happen immediately and not be enlisted in the current handler transaction....

mkoziel2000 commented 3 years ago

No change in behavior. I'm curious why you believe ConfigureAwait(false) would affect this type of problem, since all it's doing is changing the SynchronizationContext that the rest of the code in the method runs under. Could this be a problem at the transport connector level? I guess I don't know how saga handlers would infer synchronicity to make sure that a message (associated with the same saga) can get published by a handler and then wait until that handler exits before it is placed onto the physical message queue. I'm currently replacing Kafka with SQL Server to see if the problem "goes away".

mookid8000 commented 3 years ago

guess I don't know how saga handlers would infer synchronicity to make sure that a message (associated with the same saga) can get published by a handler and then wait until that handler exits before it is placed onto the physical message queue.

Most of Rebus doesn't really know about sagas – it just happens to provide a little bit of niceness around loading/saving state, but the overall handling of messages ALWAYS goes like this:

  1. Incoming message is "received" from queue (i.e. made unreceivable to other consumers)
  2. User code is executed
  3. Outgoing messages are sent
  4. Incoming message is ACKed (i.e. actually removed from queue)

During step 2, "user code" (i.e. message handlers, in your case more specifically a saga handler) that sends/publishes messages gets all of its send operations enlisted in Rebus' ambient transaction context, which is an AsyncLocal<ITransactionContext> (accessible via the AmbientTransactionContext class) that the bus automatically picks up if present.

If the bus does not find an ambient transaction context, it will surround every single send operation by its own little transaction context, meaning that it will happen immediately.
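To illustrate the ambient context (a sketch, assuming a running bus instance in a variable named bus and a correlation id in id, both placeholders; RebusTransactionScope is the public way to establish the ambient transaction context outside a handler):

    using Rebus.Transport;

    // Outside a handler there is normally no ambient transaction context, so each
    // Send/Publish goes out immediately. A RebusTransactionScope creates one
    // explicitly: outgoing operations are buffered until the scope completes.
    using (var scope = new RebusTransactionScope())
    {
        await bus.Publish(new SagaMessageEarth { SagaInstanceId = id });
        await bus.Publish(new SagaMessageWind { SagaInstanceId = id });

        // nothing has been handed to the transport yet...
        await scope.CompleteAsync();
        // ...now both messages are actually sent
    }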

Honestly, I've never fully understood what .ConfigureAwait(false) ACTUALLY did, and what the purpose was. I just know that the use of it has been involved in several hard-to-track-down cases of users reporting funny Rebus behavior to me, and then their code was littered with this funny thing that they didn't understand why they used, except that "it's best practice to do it".

If removing it from your code and properly awaiting your Tasks didn't make a change, then maybe you could post some more information about your configuration? Specifically, I would be interested in seeing how you register the bus in your IoC container...

mookid8000 commented 3 years ago

Hi @mkoziel2000 , did you figure out anything about this?

mkoziel2000 commented 3 years ago

Not really. This came up while investigating how the saga pattern could be leveraged with a Kafka bus using SQL Server as the persistence for it. In the end, we decided to go with a product called Zeebe to provide the type of orchestration we were looking to overlay on top of Kafka. It's taking a sledgehammer to a finishing nail, but it gives us the ability to coordinate messages in an event-driven environment in much the same way we were planning to do with sagas. I remember that when I reverted to using SQL Server for both queuing and persistence, the timing issue went away. I suspected that subscribers on Kafka are much more responsive than subscribers on SQL Server, and thus there might be timing issues you would see with one but not the other. But I stopped digging and needed to pivot back in March.

mookid8000 commented 3 years ago

Ok. My suspicion is that this is due to a misbehaving Kafka transport, because it sounds like it sends messages right away. The correct behavior for any Rebus transport is to delay all send/publish operations until after the user code has executed successfully (i.e. in your case: the saga data has been updated), and only THEN send/publish outgoing messages.

Please note that Rebus does not have an official Kafka transport, and there are good reasons why you would most likely NOT use a queue-oriented library like Rebus on top of a log-based broker.
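For reference, the gist of that correct behavior in a transport is to enlist outgoing messages in the transaction context passed to ITransport.Send instead of producing them right away. A rough sketch (the commit hook's name varies across Rebus versions, e.g. OnCommitted in Rebus 6; ProduceToBrokerAsync is a placeholder for the broker-specific send):

    public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
    {
        // Don't hand the message to the broker here. Defer the actual send until
        // the transaction context commits, which only happens after the handler
        // has run and (for sagas) the saga data has been saved.
        context.OnCommitted(async () => await ProduceToBrokerAsync(destinationAddress, message));

        return Task.CompletedTask;
    }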

glazkovalex commented 9 months ago

My suspicion is that this is due to a misbehaving Kafka transport, because it sounds like it sends messages right away.

Hello! Sorry about the necropost, but no, it's not related to that. That could make a difference when an exception is thrown while processing a message, but in this case there are no exceptions: all messages are processed correctly. So as long as the handler handles all exceptions, it does not matter at what point in time the offsets of received messages are committed.
By the way, starting from version 3.0.1 of Rebus.Kafka, the receive confirmation is postponed until message processing has completed, which provides support for the regular functionality of the Rebus bus: automatic retries and error handling. And transactions are now supported too.

@mkoziel2000 is absolutely right: it's about the delay in recording the saga in the database. The reply to a message can arrive before the saga has been saved! In other words, the problem is that saving the saga data is not coordinated with sending messages. A reliable and fault-tolerant saga implementation needs the outbox pattern: Rebus should first save the current saga data together with the messages that need to be sent at this step, and only then send the messages. On success, the sent messages are deleted from the outbox; on failure, sending is retried periodically once communication with the transport is restored. Only then can the saga work reliably!

Now, like @mkoziel2000, I am faced with the choice of using Rebus capabilities to implement reliable and fault-tolerant processes (sagas), or switching to Zeebe, Temporal, or DurableTask. I am sure this typical Rebus use case of processes (sagas) is in demand by many.

@mookid8000, is there an example anywhere of using Rebus to implement reliable and fault-tolerant sagas with a fast transport (RabbitMQ or similar) while saving state to a slower relational database (MSSQL, PostgreSQL, or another relational database)?

I see that in #819 something has been done toward implementing an outbox, and the Rebus.SqlServer.Outbox package has also done something in that direction. @mookid8000, have you ever come across an example of implementing reliable and fault-tolerant sagas on Rebus together with an outbox?
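(For reference, going by the documented configuration API of the SQL Server outbox support, it is wired up roughly like this; the transport, connection string, and table name below are placeholders:)

    services.AddRebus(
        configure => configure
            .Transport(t => t.UseRabbitMq("amqp://rabbitmq", "my-queue")) // any transport
            .Outbox(o => o.StoreInSqlServer(connectionString, "Outbox"))
    );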

mookid8000 commented 9 months ago

@glazkovalex You don't need an outbox with Rebus to make it buffer outgoing messages and thus execute stuff in the following order when processing messages in a saga:

  1. Receive incoming message
  2. Load saga data via correlation property
  3. Execute saga handler - buffer outgoing messages
  4. Update saga data
  5. Send/publish outgoing messages
  6. ACK incoming message

When actions execute in this order, requests sent from a saga should not result in replies suddenly arriving before the saga data has even been saved.

If this is not what you're seeing with the Kafka transport, then my suspicion is that it has not been implemented correctly, and I also don't believe it would pass the transport contract tests (which can be seen here, in case you're curious: https://github.com/rebus-org/Rebus/tree/master/Rebus.Tests.Contracts/Transports )

glazkovalex commented 8 months ago

Hello, @mookid8000! Thank you for the quick response and for recommending the transport contract tests! Yes, these tests do not pass due to the specifics of the transport implementation, but this does not mean that the Rebus.Kafka transport does not work in most Rebus bus modes. Perhaps someday I will develop the Rebus.Kafka transport to fully comply with these tests. For now, I plan to increase support for Kafka modes within the partially supported Rebus capabilities, such as: support for the Avro and Protobuf formats; support for schema registries; support for key partitioning; external setting of the initial offset for a queue; and other important Apache Kafka functions.

It's a pity that you didn't have an example of a Rebus configuration for a reliable, fault-tolerant saga at hand, but thanks for telling me that I don't need to add an outbox. On older versions of the libraries, I reproduced the problem described by @mkoziel2000. I was wrong: the implementation of sagas in Rebus is good, and the sagas effectively already implement the outbox pattern. Sagas did not work correctly on the Rebus.Kafka transport because they rely on Rebus transactions, which were not previously supported in Rebus.Kafka. 😕 When implementing the transport, I had ignored all transactions, since Apache Kafka does not support any transactions of that kind. It turned out that the "transactions" in Rebus are not real transactions, but a kind of non-atomic delayed sending (or not sending) of messages, which sagas actively rely on. By the way, Rebus.Kafka versions 3.0.1 and later support these "transactions" and confirm receipt of messages only after they have been successfully processed.
On the current versions, Rebus 8.1 with the Rebus.Kafka 3.2.1 transport, I settled on the following set of parameters:

builder.ConfigureServices((hostContext, services) =>
{
    var consoleLoggerFactory = new ConsoleLoggerFactory(true) { MinLevel = LogLevel.Debug };
    services.AddSingleton(consoleLoggerFactory);
    services.AutoRegisterHandlersFromAssemblyOf<TestSaga>();
    var consumerConfig = new ConsumerAndBehaviorConfig(kafkaEndpoint, "temp") { BehaviorConfig = new ConsumerBehaviorConfig { CommitPeriod = 1 } };
    services.AddRebus((configurer, serviceProvider) => configurer
        .Logging(l => l.Use(consoleLoggerFactory))
        .Transport(t => t.UseKafka(kafkaEndpoint, $"{nameof(IdempotentSaga)}.queue", new ProducerConfig(), consumerConfig))
        .Sagas(s => s.StoreInPostgres(connectionString, "SagasData", "SagasIndex", true, null, schemaName: "rebus"))
        .Timeouts(t => t.StoreInPostgres(new PostgresConnectionHelper(connectionString), "Timeouts", true, "rebus"))
        .Options(o =>
        {
            o.EnableIdempotentSagas();
            o.RetryStrategy($"{nameof(IdempotentSaga)}.queue.error", 3);
            o.UseAttributeOrTypeFullNameForTopicNames();
        })
    );
});

I hope this is right. Using this example, I reproduced @mkoziel2000's case. I have checked the sagas with various transport options many times, and everything works correctly!

Detailed execution log:

[INF] Rebus.Kafka.KafkaTransport (Thread #1): Initializing Kafka transport with queue "IdempotentSaga.queue"
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #1): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#consumer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, SSL ZLIB SNAPPY ZSTD CURL SASL_SCRAM SASL_OAUTHBEARER PLUGINS HDRHISTOGRAM, debug 0x40)'.
[INF] Rebus.Threading.TaskParallelLibrary.TplAsyncTask (Thread #1): Starting periodic task "CleanupTrackedErrors" with interval 00:00:10
[INF] Rebus.Threading.TaskParallelLibrary.TplAsyncTask (Thread #1): Starting periodic task "DueMessagesSender" with interval 00:00:01
[INF] Rebus.Bus.RebusBus (Thread #1): Bus "Rebus 1" started
info: Rebus.ServiceProvider.Internals.RebusBackgroundService[0] Successfully created bus instance RebusBus Rebus 1 (isDefaultBus: True)
[INF] Rebus.Bus.RebusBus (Thread #1): Bus "Rebus 1" setting number of workers to 1
[DBG] Rebus.Bus.RebusBus (Thread #1): Adding worker "Rebus 1 worker 1"
[DBG] Rebus.Workers.ThreadPoolBased.ThreadPoolWorker (Rebus 1 worker 1): Starting (threadpool-based) worker "Rebus 1 worker 1"
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Assigned partitions: Topic:"IdempotentSaga.queue" Partition:0]
[INF] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Subscribe on "IdempotentSaga.queue"
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Revoked partitions: Topic:"IdempotentSaga.queue" Partition:0]
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Assigned partitions: Topic:"IdempotentSaga.queue" Partition:0 Topic:"---Topic---.IdempotentSaga.KickoffSagaEvent" Partition:0]
[INF] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Subscribe on "IdempotentSaga.KickoffSagaEvent"
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Revoked partitions: Topic:"---Topic---.IdempotentSaga.KickoffSagaEvent" Partition:0 Topic:"IdempotentSaga.queue" Partition:0]
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Assigned partitions: Topic:"IdempotentSaga.queue" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageEarth" Partition:0 Topic:"---Topic---.IdempotentSaga.KickoffSagaEvent" Partition:0]
[INF] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Subscribe on "IdempotentSaga.Messages.SagaMessageEarth"
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Revoked partitions: Topic:"---Topic---.IdempotentSaga.KickoffSagaEvent" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageEarth" Partition:0 Topic:"IdempotentSaga.queue" Partition:0]
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Assigned partitions: Topic:"IdempotentSaga.queue" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageWind" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageEarth" Partition:0 Topic:"---Topic---.IdempotentSaga.KickoffSagaEvent" Partition:0]
[INF] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Subscribe on "IdempotentSaga.Messages.SagaMessageWind"
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Revoked partitions: Topic:"---Topic---.IdempotentSaga.KickoffSagaEvent" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageEarth" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageWind" Partition:0 Topic:"IdempotentSaga.queue" Partition:0]
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Assigned partitions: Topic:"---Topic---.IdempotentSaga.KickoffSagaEvent" Partition:0 Topic:"IdempotentSaga.queue" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageWind" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageEarth" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageFire" Partition:0]
[INF] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Subscribe on "IdempotentSaga.Messages.SagaMessageFire"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #1): Sending IdempotentSaga.Messages.KickoffSagaMessages -> "---Topic---.IdempotentSaga.KickoffSagaEvent"
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #24): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:confluent-kafka:9092/bootstrap]: confluent-kafka:9092/1: Topic ---Topic---.IdempotentSaga.KickoffSagaEvent [0] MessageSet size 510, error "Success", MaxOffset 30, LSO 30, Ver 22/22'.
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #24): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:confluent-kafka:9092/bootstrap]: confluent-kafka:9092/1: Enqueue 1 message(s) (57 bytes, 1 ops) on ---Topic---.IdempotentSaga.KickoffSagaEvent [0] fetch queue (qlen 0, v22, last_offset 29, 0 ctrl msgs, 0 aborted msgsets, uncompressed)'.
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (Rebus 1 worker 1): AppendMessage (Thread #21) message: 5b5e74fe-7f17-4d7c-8d65-5c2f192db812. Message infos: 5b5e74fe-7f17-4d7c-8d65-5c2f192db812; Processing; Topic:---Topic---.IdempotentSaga.KickoffSagaEvent, Partition:0, Offset:29
[DBG] Rebus.Kafka.KafkaTransport (.NET TP Worker): The following message was sent to the topic "---Topic---.IdempotentSaga.KickoffSagaEvent": { "Headers": { "rbs2-intent": "pub", "rbs2-msg-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-return-address": "IdempotentSaga.queue", "rbs2-senttime": "2024-01-28T02:49:39.7429634\u002B03:00", "rbs2-sender-address": "IdempotentSaga.queue", "rbs2-msg-type": "IdempotentSaga.Messages.KickoffSagaMessages, IdempotentSaga", "rbs2-corr-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-corr-seq": "0", "rbs2-content-type": "application/json;charset=utf-8" }, "Body": "eyJTYWdhSW5zdGFuY2VJZCI6ImM5YjljY2I0LTU2MTItNDcxMy04ZTU4LTE4Y2ZiNDZkYTUxOSJ9" }
Press 'r' to repeat or Ctrl+z to exit.
[DBG] Rebus.Sagas.LoadSagaDataStep (Rebus 1 worker 1): Created new saga data with ID b16855a6-3db2-4c22-bb61-1de3e2955826 for message "KickoffSagaMessages/5b5e74fe-7f17-4d7c-8d65-5c2f192db812"
[INF] IdempotentSaga.Handlers.TestSaga (Rebus 1 worker 1): Processing Kickoff - c9b9ccb4-5612-4713-8e58-18cfb46da519
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Rebus 1 worker 1): Sending IdempotentSaga.Messages.SagaMessageEarth -> "---Topic---.IdempotentSaga.Messages.SagaMessageEarth"
[INF] IdempotentSaga.Handlers.TestSaga (Rebus 1 worker 1): Published Earth....Done Processing Kickoff - c9b9ccb4-5612-4713-8e58-18cfb46da519
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "IdempotentSaga.Messages.KickoffSagaMessages, IdempotentSaga" "5b5e74fe-7f17-4d7c-8d65-5c2f192db812" to 1 handlers took 3 ms
[DBG] Rebus.Kafka.KafkaTransport (.NET TP Worker): The following message was sent to the topic "---Topic---.IdempotentSaga.Messages.SagaMessageEarth": { "Headers": { "rbs2-intent": "pub", "rbs2-msg-id": "4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9", "rbs2-return-address": "IdempotentSaga.queue", "rbs2-senttime": "2024-01-28T02:49:40.0181498\u002B03:00", "rbs2-sender-address": "IdempotentSaga.queue", "rbs2-msg-type": "IdempotentSaga.Messages.SagaMessageEarth, IdempotentSaga", "rbs2-corr-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-corr-seq": "1", "rbs2-content-type": "application/json;charset=utf-8" }, "Body": "eyJTYWdhSW5zdGFuY2VJZCI6ImM5YjljY2I0LTU2MTItNDcxMy04ZTU4LTE4Y2ZiNDZkYTUxOSJ9" }
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #24): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:confluent-kafka:9092/bootstrap]: confluent-kafka:9092/1: Topic ---Topic---.IdempotentSaga.Messages.SagaMessageEarth [0] MessageSet size 507, error "Success", MaxOffset 82, LSO 82, Ver 16/16'.
[DBG] Rebus.Kafka.KafkaTransport (.NET TP Worker): context.OnAck : { "Headers": { "rbs2-intent": "pub", "rbs2-msg-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-return-address": "IdempotentSaga.queue", "rbs2-senttime": "2024-01-28T02:49:39.7429634\u002B03:00", "rbs2-sender-address": "IdempotentSaga.queue", "rbs2-msg-type": "IdempotentSaga.Messages.KickoffSagaMessages, IdempotentSaga", "rbs2-corr-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-corr-seq": "0", "rbs2-content-type": "application/json;charset=utf-8" }, "Body": "eyJTYWdhSW5zdGFuY2VJZCI6ImM5YjljY2I0LTU2MTItNDcxMy04ZTU4LTE4Y2ZiNDZkYTUxOSJ9" }
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #24): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:confluent-kafka:9092/bootstrap]: confluent-kafka:9092/1: Enqueue 1 message(s) (57 bytes, 1 ops) on ---Topic---.IdempotentSaga.Messages.SagaMessageEarth [0] fetch queue (qlen 0, v16, last_offset 81, 0 ctrl msgs, 0 aborted msgsets, uncompressed)'.
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (.NET TP Worker): Completing message: 5b5e74fe-7f17-4d7c-8d65-5c2f192db812. Message infos: 5b5e74fe-7f17-4d7c-8d65-5c2f192db812; Completed; Topic:---Topic---.IdempotentSaga.KickoffSagaEvent, Partition:0, Offset:29
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (Rebus 1 worker 1): AppendMessage (Thread #21) message: 4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9. Message infos: 5b5e74fe-7f17-4d7c-8d65-5c2f192db812; Completed; Topic:---Topic---.IdempotentSaga.KickoffSagaEvent, Partition:0, Offset:29 4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9; Processing; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageEarth, Partition:0, Offset:81
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (.NET TP Worker): TryCommitLastBlock offsets: Topic:---Topic---.IdempotentSaga.KickoffSagaEvent, Partition:0, Offset:29. Message infos: 4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9; Processing; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageEarth, Partition:0, Offset:81
[DBG] Rebus.Sagas.LoadSagaDataStep (.NET TP Worker): Found existing saga data with ID b16855a6-3db2-4c22-bb61-1de3e2955826 for message "SagaMessageEarth/4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9"
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): Processing Earth - c9b9ccb4-5612-4713-8e58-18cfb46da519
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (.NET TP Worker): Sending IdempotentSaga.Messages.SagaMessageWind -> "---Topic---.IdempotentSaga.Messages.SagaMessageWind"
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): NOT Completed everything for c9b9ccb4-5612-4713-8e58-18cfb46da519: True,False,False
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): Published Wind...Done Processing Earth - c9b9ccb4-5612-4713-8e58-18cfb46da519
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (.NET TP Worker): Dispatching "IdempotentSaga.Messages.SagaMessageEarth, IdempotentSaga" "4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9" to 1 handlers took 4 ms
[DBG] Rebus.Kafka.KafkaTransport (.NET TP Worker): The following message was sent to the topic "---Topic---.IdempotentSaga.Messages.SagaMessageWind": { "Headers": { "rbs2-intent": "pub", "rbs2-msg-id": "0b7b3e48-6723-4593-84d1-54e2e6a7b1ae", "rbs2-return-address": "IdempotentSaga.queue", "rbs2-senttime": "2024-01-28T02:49:40.9934055\u002B03:00", "rbs2-sender-address": "IdempotentSaga.queue", "rbs2-msg-type": "IdempotentSaga.Messages.SagaMessageWind, IdempotentSaga", "rbs2-corr-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-corr-seq": "2", "rbs2-content-type": "application/json;charset=utf-8" }, "Body": "eyJTYWdhSW5zdGFuY2VJZCI6ImM5YjljY2I0LTU2MTItNDcxMy04ZTU4LTE4Y2ZiNDZkYTUxOSJ9" }
[DBG] Rebus.Kafka.KafkaTransport (.NET TP Worker): context.OnAck : { "Headers": { "rbs2-intent": "pub", "rbs2-msg-id": "4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9", "rbs2-return-address": "IdempotentSaga.queue", "rbs2-senttime": "2024-01-28T02:49:40.0181498\u002B03:00", "rbs2-sender-address": "IdempotentSaga.queue", "rbs2-msg-type": "IdempotentSaga.Messages.SagaMessageEarth, IdempotentSaga", "rbs2-corr-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-corr-seq": "1", "rbs2-content-type": "application/json;charset=utf-8" }, "Body": "eyJTYWdhSW5zdGFuY2VJZCI6ImM5YjljY2I0LTU2MTItNDcxMy04ZTU4LTE4Y2ZiNDZkYTUxOSJ9" }
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (.NET TP Worker): Completing message: 4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9. Message infos: 4a5df8ac-37c0-4bd1-b6e4-7d4df03bc4b9; Completed; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageEarth, Partition:0, Offset:81
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (.NET TP Worker): TryCommitLastBlock offsets: Topic:---Topic---.IdempotentSaga.Messages.SagaMessageEarth, Partition:0, Offset:81. Message infos: ----
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #24): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:confluent-kafka:9092/bootstrap]: confluent-kafka:9092/1: Topic ---Topic---.IdempotentSaga.Messages.SagaMessageWind [0] MessageSet size 506, error "Success", MaxOffset 102, LSO 102, Ver 10/10'.
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #24): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:confluent-kafka:9092/bootstrap]: confluent-kafka:9092/1: Enqueue 1 message(s) (57 bytes, 1 ops) on ---Topic---.IdempotentSaga.Messages.SagaMessageWind [0] fetch queue (qlen 0, v10, last_offset 101, 0 ctrl msgs, 0 aborted msgsets, uncompressed)'.
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (Rebus 1 worker 1): AppendMessage (Thread #21) message: 0b7b3e48-6723-4593-84d1-54e2e6a7b1ae. Message infos: 0b7b3e48-6723-4593-84d1-54e2e6a7b1ae; Processing; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageWind, Partition:0, Offset:101
[DBG] Rebus.Sagas.LoadSagaDataStep (.NET TP Worker): Found existing saga data with ID b16855a6-3db2-4c22-bb61-1de3e2955826 for message "SagaMessageWind/0b7b3e48-6723-4593-84d1-54e2e6a7b1ae"
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): Processing Wind - c9b9ccb4-5612-4713-8e58-18cfb46da519
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (.NET TP Worker): Sending IdempotentSaga.Messages.SagaMessageFire -> "---Topic---.IdempotentSaga.Messages.SagaMessageFire"
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): NOT Completed everything for c9b9ccb4-5612-4713-8e58-18cfb46da519: True,True,False
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): Published Fire...Done Processing Wind - c9b9ccb4-5612-4713-8e58-18cfb46da519
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (.NET TP Worker): Dispatching "IdempotentSaga.Messages.SagaMessageWind, IdempotentSaga" "0b7b3e48-6723-4593-84d1-54e2e6a7b1ae" to 1 handlers took 2 ms
[DBG] Rebus.Kafka.KafkaTransport (.NET TP Worker): The following message was sent to the topic "---Topic---.IdempotentSaga.Messages.SagaMessageFire": { "Headers": { "rbs2-intent": "pub", "rbs2-msg-id": "f584ad11-761a-4280-a1d1-0f6c2d6a1f5c", "rbs2-return-address": "IdempotentSaga.queue", "rbs2-senttime": "2024-01-28T02:49:42.0126297\u002B03:00", "rbs2-sender-address": "IdempotentSaga.queue", "rbs2-msg-type": "IdempotentSaga.Messages.SagaMessageFire, IdempotentSaga", "rbs2-corr-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-corr-seq": "3", "rbs2-content-type": "application/json;charset=utf-8" }, "Body": "eyJTYWdhSW5zdGFuY2VJZCI6ImM5YjljY2I0LTU2MTItNDcxMy04ZTU4LTE4Y2ZiNDZkYTUxOSJ9" }
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #24): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:confluent-kafka:9092/bootstrap]: confluent-kafka:9092/1: Topic ---Topic---.IdempotentSaga.Messages.SagaMessageFire [0] MessageSet size 506, error "Success", MaxOffset 106, LSO 106, Ver 4/4'.
[DBG] Rebus.Kafka.KafkaTransport (.NET TP Worker): context.OnAck : { "Headers": { "rbs2-intent": "pub", "rbs2-msg-id": "0b7b3e48-6723-4593-84d1-54e2e6a7b1ae", "rbs2-return-address": "IdempotentSaga.queue", "rbs2-senttime": "2024-01-28T02:49:40.9934055\u002B03:00", "rbs2-sender-address": "IdempotentSaga.queue", "rbs2-msg-type": "IdempotentSaga.Messages.SagaMessageWind, IdempotentSaga", "rbs2-corr-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-corr-seq": "2", "rbs2-content-type": "application/json;charset=utf-8" }, "Body": "eyJTYWdhSW5zdGFuY2VJZCI6ImM5YjljY2I0LTU2MTItNDcxMy04ZTU4LTE4Y2ZiNDZkYTUxOSJ9" }
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (Thread #24): Consuming from Kafka. Client: 'rdkafka#consumer-2', message: '[thrd:confluent-kafka:9092/bootstrap]: confluent-kafka:9092/1: Enqueue 1 message(s) (57 bytes, 1 ops) on ---Topic---.IdempotentSaga.Messages.SagaMessageFire [0] fetch queue (qlen 0, v4, last_offset 105, 0 ctrl msgs, 0 aborted msgsets, uncompressed)'.
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (.NET TP Worker): Completing message: 0b7b3e48-6723-4593-84d1-54e2e6a7b1ae. Message infos: 0b7b3e48-6723-4593-84d1-54e2e6a7b1ae; Completed; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageWind, Partition:0, Offset:101
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (Rebus 1 worker 1): AppendMessage (Thread #21) message: f584ad11-761a-4280-a1d1-0f6c2d6a1f5c. Message infos: f584ad11-761a-4280-a1d1-0f6c2d6a1f5c; Processing; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageFire, Partition:0, Offset:105 0b7b3e48-6723-4593-84d1-54e2e6a7b1ae; Completed; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageWind, Partition:0, Offset:101
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (.NET TP Worker): TryCommitLastBlock offsets: Topic:---Topic---.IdempotentSaga.Messages.SagaMessageWind, Partition:0, Offset:101. Message infos: f584ad11-761a-4280-a1d1-0f6c2d6a1f5c; Processing; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageFire, Partition:0, Offset:105
[DBG] Rebus.Sagas.LoadSagaDataStep (.NET TP Worker): Found existing saga data with ID b16855a6-3db2-4c22-bb61-1de3e2955826 for message "SagaMessageFire/f584ad11-761a-4280-a1d1-0f6c2d6a1f5c"
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): Processing Fire - c9b9ccb4-5612-4713-8e58-18cfb46da519
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): Completed everything for c9b9ccb4-5612-4713-8e58-18cfb46da519: "Initiated;Earth;Wind;Fire;"
[INF] IdempotentSaga.Handlers.TestSaga (.NET TP Worker): Done Processing Fire - c9b9ccb4-5612-4713-8e58-18cfb46da519
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (.NET TP Worker): Dispatching "IdempotentSaga.Messages.SagaMessageFire, IdempotentSaga" "f584ad11-761a-4280-a1d1-0f6c2d6a1f5c" to 1 handlers took 2 ms
[DBG] Rebus.Kafka.KafkaTransport (.NET TP Worker): context.OnAck : { "Headers": { "rbs2-intent": "pub", "rbs2-msg-id": "f584ad11-761a-4280-a1d1-0f6c2d6a1f5c", "rbs2-return-address": "IdempotentSaga.queue", "rbs2-senttime": "2024-01-28T02:49:42.0126297\u002B03:00", "rbs2-sender-address": "IdempotentSaga.queue", "rbs2-msg-type": "IdempotentSaga.Messages.SagaMessageFire, IdempotentSaga", "rbs2-corr-id": "5b5e74fe-7f17-4d7c-8d65-5c2f192db812", "rbs2-corr-seq": "3", "rbs2-content-type": "application/json;charset=utf-8" }, "Body": "eyJTYWdhSW5zdGFuY2VJZCI6ImM5YjljY2I0LTU2MTItNDcxMy04ZTU4LTE4Y2ZiNDZkYTUxOSJ9" }
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (.NET TP Worker): Completing message: f584ad11-761a-4280-a1d1-0f6c2d6a1f5c. Message infos: f584ad11-761a-4280-a1d1-0f6c2d6a1f5c; Completed; Topic:---Topic---.IdempotentSaga.Messages.SagaMessageFire, Partition:0, Offset:105
[DBG] Rebus.Kafka.Dispatcher.CommitDispatcher (.NET TP Worker): TryCommitLastBlock offsets: Topic:---Topic---.IdempotentSaga.Messages.SagaMessageFire, Partition:0, Offset:105. Message infos: ----
[INF] Rebus.Bus.RebusBus (.NET TP Worker): Bus "Rebus 1" setting number of workers to 0
[DBG] Rebus.Bus.RebusBus (.NET TP Worker): Removing worker "Rebus 1 worker 1"
[WRN] Rebus.Kafka.Core.KafkaSubscriptionStorage (Rebus 1 worker 1): Consume warning: The operation was canceled.
[DBG] Rebus.Workers.ThreadPoolBased.ThreadPoolWorker (Rebus 1 worker 1): Worker "Rebus 1 worker 1" stopped
[INF] Rebus.Threading.TaskParallelLibrary.TplAsyncTask (.NET TP Worker): Stopping periodic task "DueMessagesSender"
[INF] Rebus.Threading.TaskParallelLibrary.TplAsyncTask (.NET TP Worker): Stopping periodic task "CleanupTrackedErrors"
[DBG] Rebus.Kafka.Core.KafkaSubscriptionStorage (.NET TP Worker): Revoked partitions: Topic:"---Topic---.IdempotentSaga.KickoffSagaEvent" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageEarth" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageFire" Partition:0 Topic:"---Topic---.IdempotentSaga.Messages.SagaMessageWind" Partition:0 Topic:"IdempotentSaga.queue" Partition:0]
[INF] Rebus.Kafka.Core.KafkaSubscriptionStorage (.NET TP Worker): Closed consumer BootstrapServers:confluent-kafka:9092, gropId: temp.
[INF] Rebus.Bus.RebusBus (.NET TP Worker): Bus "Rebus 1" stopped
info: Rebus.ServiceProvider.Internals.RebusBackgroundService[0] Bus instance RebusBus Rebus 1 successfully disposed

Having discovered your simplified version of the saga test without persistence, I added it to my tests just in case. Of course, it also passes successfully with the Rebus.Kafka 3.2.1 transport. Regarding more complete support for the transport contract, I have a few small questions for you about the implementation of the contract, and also a couple of suggestions for improving the implementation of idempotent sagas. I will ask these questions in separate issues, in the hope that you will help me with them a little.