Eventuous / eventuous

Event Sourcing library for .NET
https://eventuous.dev
Apache License 2.0
442 stars 70 forks source link

[Aggregate Store] Non-intrusive aggregate caching #259

Closed makp0 closed 12 months ago

makp0 commented 1 year ago

Building on top of #220, here's an approach I've used to combat aggregate creation (load) time that keeps growing with every event added to the stream.

Setup

// Wraps the registered IAggregateStore with the caching decorator.
// Decorate is provided by the Scrutor NuGet package. The decorator's public constructor
// also takes IMemoryCache, IEventStore and AggregateFactoryRegistry, so those need to be
// resolvable from the container as well.
services.Decorate<IAggregateStore, AggregateStoreCachingDecorator>();
/// <summary>
/// Decorator for <see cref="IAggregateStore"/> that keeps the events of each stream in an
/// <see cref="IMemoryCache"/>, keyed by stream name. When an aggregate is loaded, only the events
/// that follow the cached ones are read from the event reader; the cached and the newly read
/// events are then applied to a fresh aggregate instance.
/// </summary>
/// <remarks>
/// Cache entries are written without any expiration or size options, so eviction is entirely
/// up to the configuration of the supplied <see cref="IMemoryCache"/>.
/// </remarks>
public class AggregateStoreCachingDecorator : IAggregateStore {
    readonly IEventReader             _eventReader;
    readonly AggregateFactoryRegistry _factoryRegistry;
    readonly IAggregateStore          _implementation;
    readonly IMemoryCache             _memoryCache;

    // Private so that the public constructor (which takes IEventStore) is the only one visible from outside the class.
    AggregateStoreCachingDecorator(
            IAggregateStore          implementation,
            IMemoryCache             memoryCache,
            IEventReader             eventReader,
            AggregateFactoryRegistry factoryRegistry
        ) {
        _implementation  = implementation;
        _memoryCache     = memoryCache;
        _eventReader     = eventReader;
        _factoryRegistry = factoryRegistry;
    }

    /// <summary>
    /// Creates the decorator.
    /// </summary>
    /// <param name="implementation">The decorated aggregate store; all writes are delegated to it.</param>
    /// <param name="memoryCache">Cache that holds the events of each stream.</param>
    /// <param name="eventStore">Event store used to read the events that are not in the cache yet.</param>
    /// <param name="factoryRegistry">Registry used to create empty aggregate instances.</param>
    public AggregateStoreCachingDecorator(
            IAggregateStore          implementation,
            IMemoryCache             memoryCache,
            IEventStore              eventStore,
            AggregateFactoryRegistry factoryRegistry
        )
        // The explicit cast selects the private IEventReader overload instead of recursing into this constructor.
        : this(implementation, memoryCache, (IEventReader)eventStore, factoryRegistry) {}

    /// <summary>
    /// Stores the aggregate using the decorated store and, once that succeeded, caches the aggregate's events.
    /// </summary>
    /// <param name="streamName">Name of the aggregate stream; also used as the cache key.</param>
    /// <param name="aggregate">Aggregate to store.</param>
    /// <param name="cancellationToken">Cancellation token passed to the decorated store.</param>
    /// <returns>The result returned by the decorated store.</returns>
    public async Task<AppendEventsResult> Store<T>(StreamName streamName, T aggregate, CancellationToken cancellationToken) where T : Aggregate {
        var result = await _implementation.Store(streamName, aggregate, cancellationToken).ConfigureAwait(false);

        // Cache only after the write went through: if the decorated store throws, the cache
        // must not contain events that were never persisted.
        Cache(streamName, aggregate);

        return result;
    }

    /// <summary>
    /// Loads the aggregate; a missing stream results in <see cref="AggregateNotFoundException{T}"/>.
    /// </summary>
    public Task<T> Load<T>(StreamName streamName, CancellationToken cancellationToken) where T : Aggregate
        => LoadInternal<T>(streamName, true, cancellationToken);

    /// <summary>
    /// Loads the aggregate; a missing stream results in a new, empty aggregate instance.
    /// </summary>
    public Task<T> LoadOrNew<T>(StreamName streamName, CancellationToken cancellationToken) where T : Aggregate
        => LoadInternal<T>(streamName, false, cancellationToken);

    /// <summary>
    /// Creates an aggregate instance, reads the events that follow the cached ones, applies
    /// cached and new events to the instance and refreshes the cache entry.
    /// </summary>
    /// <param name="streamName">Name of the aggregate stream; also used as the cache key.</param>
    /// <param name="failIfNotFound">
    /// When <c>true</c>, a missing stream is reported as <see cref="AggregateNotFoundException{T}"/>;
    /// when <c>false</c>, the empty aggregate instance is returned instead.
    /// </param>
    /// <param name="cancellationToken">Cancellation token passed to the event reader.</param>
    async Task<T> LoadInternal<T>(StreamName streamName, bool failIfNotFound, CancellationToken cancellationToken) where T : Aggregate {
        var aggregate = _factoryRegistry.CreateInstance<T>();

        var cachedEvents = GetCachedEvents();

        try {
            // The number of cached events is the position of the first event that is not cached yet.
            var events = await _eventReader.ReadStream(streamName, new StreamReadPosition(cachedEvents.LongLength), failIfNotFound, cancellationToken)
                .ConfigureAwait(false);
            aggregate.Load(cachedEvents.Concat(events.Select(x => x.Payload)));
            Cache(streamName, aggregate);
        } catch (StreamNotFound) when (!failIfNotFound) {
            return aggregate;
        } catch (StreamNotFound e) {
            Log.UnableToLoadAggregate<T>(streamName, e);

            throw new AggregateNotFoundException<T>(streamName, e);
        } catch (Exception e) {
            Log.UnableToLoadAggregate<T>(streamName, e);

            // Rethrow with "throw;" so the original stack trace is preserved.
            throw;
        }

        return aggregate;

        // Returns a copy of the cached events of the stream, or an empty array when there is no cache entry.
        object[] GetCachedEvents() => _memoryCache.Get<IEnumerable<object>>(streamName)?.ToArray() ?? Array.Empty<object>();
    }

    /// <summary>
    /// Puts a snapshot of the aggregate's current events into the cache under the stream name.
    /// </summary>
    void Cache<T>(StreamName streamName, T aggregate) where T : Aggregate
        // Materialise the sequence: Current may be a lazily evaluated view over the aggregate's
        // mutable state, which would make the cache entry change when the aggregate changes.
        => _memoryCache.Set<IEnumerable<object>>(streamName, aggregate.Current.ToArray());
}
alexeyzimarev commented 12 months ago

Isn't it related to #157?

makp0 commented 12 months ago

Isn't it related to #157?

It is related to #157 as well, but I based this work on #220, the issue mentioned above.