rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus

Saga Handlers are caching root properties -- WasMarkedAsComplete #865

Closed hanyelemary closed 4 years ago

hanyelemary commented 4 years ago

Hello,

We're having an issue where root properties are persisting/being cached across different Saga instances.

Here is an example. We have a Saga that is initiated by an event (PaymentReceivedEvent) and handles two other events that could mark completeness. Here is the flow at a high level.

PaymentReceivedEvent -> PaymentProcessedEvent -> PrintReceiptCommand

The flow goes like this. We receive a payment. If the payment is declined (PaymentProcessedEvent), we mark the Saga as complete inside the handler for PaymentProcessedEvent. If the payment is approved, we move on to PrintReceiptCommand and then complete the Saga. This is a much simplified view of what we're working on (which needs to hold payment data state and correlate it once a payment is processed).

The problem we're seeing is that when PaymentProcessedEvent carries an approval after an earlier decline, WasMarkedAsComplete is still true from the previous transaction. As a result, the handler for PrintReceiptCommand never gets invoked, because the Saga has already been deleted.

The handler result returned from the method below appears to be caching the root properties of all instances of our Saga.

/// <summary>
/// Returns all relevant handler instances for the given message by looking up compatible registered functions and instance factory methods.
/// </summary>
public async Task<IEnumerable<IHandleMessages<TMessage>>> GetHandlers<TMessage>(TMessage message, ITransactionContext transactionContext)
{
    if (message == null) throw new ArgumentNullException(nameof(message));
    if (transactionContext == null) throw new ArgumentNullException(nameof(transactionContext));
    var messageContext = MessageContext.Current;
    if (messageContext == null)
    {
        throw new InvalidOperationException("Attempted to resolve handler with message context, but no current context could be found on MessageContext.Current");
    }
    var handlerFactories = _cachedHandlerFactories.GetOrAdd(typeof(TMessage), type =>
    {
        var noArgumentsInvokers = _handlerFactoriesNoArguments
            .OfType<Func<IHandleMessages<TMessage>>>()
            .Select(factory => (Func<IMessageContext, IHandleMessages>)(context => factory()));
        var contextArgumentInvokers = _handlerFactoriesMessageContextArgument
            .OfType<Func<IMessageContext, IHandleMessages<TMessage>>>()
            .Select(factory => (Func<IMessageContext, IHandleMessages>)(factory));
        var busAndContextInvokers = _handlerFactoriesBusAndMessageContextArguments
            .OfType<Func<IBus, IMessageContext, IHandleMessages<TMessage>>>()
            .Select(factory => (Func<IMessageContext, IHandleMessages>)(context => factory(Bus, context)));
        return noArgumentsInvokers.Concat(contextArgumentInvokers).Concat(busAndContextInvokers).ToArray();
    });
    // ReSharper disable once CoVariantArrayConversion
    var instances = (IHandleMessages<TMessage>[])_cachedHandlers.GetOrAdd(typeof(TMessage), type => _handlerInstances
        .OfType<IHandleMessages<TMessage>>().ToArray());
    var result = new IHandleMessages<TMessage>[handlerFactories.Length + instances.Length];
    for (var index = 0; index < handlerFactories.Length; index++)
    {
        result[index] = (IHandleMessages<TMessage>)handlerFactories[index](messageContext);
    }
    transactionContext.OnDisposed(context =>
    {
        for (var index = 0; index < handlerFactories.Length; index++)
        {
            if (result[index] is IDisposable disposable)
            {
                disposable.Dispose();
            }
        }
    });
    Array.Copy(instances, 0, result, handlerFactories.Length, instances.Length);
    return result;
}

Do you have any insight into this?

mookid8000 commented 4 years ago

Looks like you're using the built-in handler activator.

Could you show me how you are registering your saga handler?

yujie900 commented 4 years ago

Hi @mookid8000, I'm on the same team as @hanyelemary and would like to answer your question.

We were initially calling activator.Register(() => handler); for each message type handled in the Saga. We then tried registering just the Saga itself, i.e. activator.Register(() => saga); but neither approach worked.

The code below is how we manage our subscriptions, including registering the saga with RegisterCommandHandler<T>(IHandleMessages<T> handler)

    public class Subscriber : ISimpleSubscriber, IDisposable
    {
        private readonly string inputQueueName;
        private readonly string addressName;
        private readonly IMemoryNetwork inMemNetwork;
        private readonly ISubscriberStore subscriberStore;
        private readonly BuiltinHandlerActivator activator;

        public Subscriber(
            string inputQueueName,
            string addressName,
            IMemoryNetwork inMemNetwork,
            ISubscriberStore subscriberStore)
        {
            this.inputQueueName = inputQueueName;
            this.addressName = addressName;
            this.inMemNetwork = inMemNetwork;
            this.subscriberStore = subscriberStore;
            activator = new BuiltinHandlerActivator();
        }

        public void RegisterCommandHandler<T>(IHandleMessages<T> handler)
        {
            activator.Register(() => handler);
        }

        public void Initialize(ICollection<Type> subscriptionTypesCollection)
        {
            SubscriptionCollection = subscriptionTypesCollection;
            Configure
                .With(activator)
                .Logging(l => l.None())
                .Transport(t => t.UseInMemoryTransport(inMemNetwork.Network, inputQueueName))
                .Subscriptions(s => s.StoreInMemory(subscriberStore.Store))
                .Options(option =>
                {
                    option.SimpleRetryStrategy(secondLevelRetriesEnabled: false, maxDeliveryAttempts: 1);
                    option.SetBackoffTimes(
                        TimeSpan.FromMilliseconds(50),
                        TimeSpan.FromMilliseconds(50),
                        TimeSpan.FromMilliseconds(50)
                    );
                    option.SetWorkerShutdownTimeout(TimeSpan.FromSeconds(10));
                    option.SetNumberOfWorkers(1);
                    option.SetMaxParallelism(1);
                })
                .Sagas(s => s.StoreInMemory())
                .Routing(r =>
                {
                    var routing = r.TypeBased();
                    foreach (var subscriptionType in subscriptionTypesCollection)
                    {
                        routing.Map(subscriptionType, addressName);
                    }
                })
                .Start();
            foreach (var subscriptionType in SubscriptionCollection)
            {
                activator.Bus.Subscribe(subscriptionType).Wait();
            }
        }

        public void Dispose()
        {
            activator.Dispose();
            Console.WriteLine(inputQueueName + " Subscriber disposed");
        }

        public ICollection<Type> SubscriptionCollection { get; private set; }
    }

mookid8000 commented 4 years ago

Here's the reason why the same handler instance is being returned every time you receive a message:

public void RegisterCommandHandler<T>(IHandleMessages<T> handler)
{
    activator.Register(() => handler);
}

You need to turn the argument into a handler factory, so that Rebus can create a new instance each time a message is received:

public void RegisterCommandHandler<T>(Func<IHandleMessages<T>> handlerFactory)
{
    activator.Register(() => handlerFactory());
}
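
To illustrate why the two registrations behave differently, here is a minimal, self-contained sketch that does not use Rebus at all. FakeSagaHandler and Demo are hypothetical stand-ins invented for this example. It assumes, as the issue title suggests, that the completion flag lives on the handler object itself. A lambda that captures an existing instance returns that same object on every call, so a flag set while handling one message is still set for the next. A lambda that constructs a new object starts clean each time.

```csharp
using System;

// Hypothetical stand-in for a saga handler; this is NOT a Rebus type.
public class FakeSagaHandler
{
    public bool WasMarkedAsComplete { get; private set; }

    public void MarkAsComplete() => WasMarkedAsComplete = true;
}

public static class Demo
{
    // Simulates two consecutive messages: the handler resolved for the first
    // message marks itself complete, then a handler is resolved for the second
    // message and we report the flag value it starts out with.
    public static bool SecondMessageSeesStaleFlag(Func<FakeSagaHandler> factory)
    {
        factory().MarkAsComplete();
        return factory().WasMarkedAsComplete;
    }

    public static void Main()
    {
        var shared = new FakeSagaHandler();

        // Captures one instance: every call hands back the same object.
        Func<FakeSagaHandler> capturing = () => shared;

        // Real factory: every call constructs a fresh object.
        Func<FakeSagaHandler> creating = () => new FakeSagaHandler();

        Console.WriteLine(SecondMessageSeesStaleFlag(capturing)); // True
        Console.WriteLine(SecondMessageSeesStaleFlag(creating));  // False
    }
}
```

With the factory-based signature above, a call site would pass a lambda instead of an instance, for example subscriber.RegisterCommandHandler<PaymentReceivedEvent>(() => new PaymentSaga()); where PaymentSaga is a hypothetical name for the saga class and any constructor arguments it needs would be supplied inside the lambda.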