reactivemarbles / DynamicData

Reactive collections based on Rx.Net
http://dynamic-data.org
MIT License

[Feature]: `IObservableCache.TransformMany` with force transform param #899

Open Metadorius opened 6 months ago

Metadorius commented 6 months ago

Describe the functionality desired 🐞

Currently only Transform has an overload that supports refreshing the derived cache from another observable. This is very useful when the transform depends on data external to the cache: when that data changes, the derived cache data becomes invalid. TransformMany doesn't have a similar overload.

Considerations

Refreshing the original source when external data changes may not be desirable because of its other effects, and it also isn't an obvious thing to do.

dwcullop commented 6 months ago

I'm assuming you're talking about the IObservable<Unit> forceTransform parameter found in this overload.

    IObservable<IChangeSet<TDestination, TKey>> Transform<TDestination, TSource, TKey>(
        this IObservable<IChangeSet<TSource, TKey>> source,
        Func<TSource, TKey, TDestination> transformFactory,
        IObservable<Unit> forceTransform)

It's probably missing from TransformMany because of performance. Having to re-transform every single value is expensive and it would be worse if each value transforms into a collection of values.
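For readers unfamiliar with that overload, here is a minimal sketch of how it can be used. The `Product` record, the `taxRate` variable and the `ForceTransformSketch` class are made up for illustration; only `SourceCache`, `Connect`, `Transform`, `AsObservableCache` and `Lookup` come from DynamicData.

```csharp
using System;
using System.Reactive;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical item type, used only for this illustration.
public sealed record Product(int Id, decimal NetPrice);

public static class ForceTransformSketch
{
    // Returns the transformed price before and after the forced re-transform.
    public static (decimal Before, decimal After) Run()
    {
        var taxRate = 0.10m; // external state that lives outside the cache

        using var source = new SourceCache<Product, int>(p => p.Id);
        using var forceTransform = new Subject<Unit>();

        // The factory reads taxRate, so its results go stale when taxRate changes.
        using var grossPrices = source.Connect()
            .Transform((product, key) => product.NetPrice * (1 + taxRate), forceTransform)
            .AsObservableCache();

        source.AddOrUpdate(new Product(1, 100m));
        var before = grossPrices.Lookup(1).Value;

        // Change the external state, then ask Transform to re-run the factory
        // for every item currently in the cache.
        taxRate = 0.20m;
        forceTransform.OnNext(Unit.Default);
        var after = grossPrices.Lookup(1).Value;

        return (before, after);
    }
}
```

Every signal on `forceTransform` re-runs the factory for all items, which is the cost referred to above.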

My suggestion would be to check out one of the new operators such as TransformManyAsync or MergeManyChangeSets. TransformManyAsync allows you to transform each value into an ObservableCollection or a SourceCache of values, which are all merged together.

MergeManyChangeSets allows you to transform each value into a ChangeSet, which is all merged together.

When your external data is updated, it can update the changeset, and then results will automatically be propagated.

This will be a lot more performant than just transforming every single value over and over again.
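A rough sketch of the MergeManyChangeSets approach, assuming each parent item owns a child cache that it keeps up to date itself. The `Parent` and `Child` types and the `MergeManyChangeSetsSketch` class are hypothetical, and child keys are assumed to be unique across all parents.

```csharp
using System;
using DynamicData;

// Hypothetical types, used only for this illustration.
public sealed record Child(int Id, string Name);

public sealed class Parent : IDisposable
{
    public Parent(int id) => Id = id;

    public int Id { get; }

    // Each parent maintains its own children; external changes are applied here.
    public SourceCache<Child, int> Children { get; } = new(c => c.Id);

    public void Dispose() => Children.Dispose();
}

public static class MergeManyChangeSetsSketch
{
    // Returns the merged child count after each step.
    public static (int AfterAdd, int AfterChildEdit, int AfterParentRemoved) Run()
    {
        using var parents = new SourceCache<Parent, int>(p => p.Id);

        // One flat cache containing the children of every parent.
        using var allChildren = parents.Connect()
            .MergeManyChangeSets(parent => parent.Children.Connect())
            .AsObservableCache();

        using var first = new Parent(1);
        using var second = new Parent(2);
        first.Children.AddOrUpdate(new Child(10, "a"));
        second.Children.AddOrUpdate(new Child(20, "b"));
        parents.AddOrUpdate(new[] { first, second });
        var afterAdd = allChildren.Count;

        // Editing one child cache only propagates that one change.
        first.Children.AddOrUpdate(new Child(11, "c"));
        var afterChildEdit = allChildren.Count;

        // Removing a parent removes its children from the merged result.
        parents.RemoveKey(2);
        var afterParentRemoved = allChildren.Count;

        return (afterAdd, afterChildEdit, afterParentRemoved);
    }
}
```

With this shape, a change in external data only touches the child caches it affects, and nothing is re-transformed wholesale.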

Metadorius commented 6 months ago

In my case I don't care about performance that much, since this is evaluated only once, when the source values are edited from the UI. I had to introduce another observable stream to trigger the update, because the conditions that affect the TransformMany calculation have changed.

What I did in the end was to snatch the refresh logic and introduce it as a separate .RefreshOn(IObservable<> refresher) operator which I can use wherever I need. Works flawlessly and I think could be a good addition to the library.

    public static IObservable<IChangeSet<TSource, TKey>> RefreshOn<TSource, TKey>(
            this IObservable<IChangeSet<TSource, TKey>> source,
            IObservable<Func<TSource, TKey, bool>> refresher) 
        where TSource : notnull
        where TKey : notnull
        => Observable.Create<IChangeSet<TSource, TKey>>(observer =>
        {
            var locker = new object();
            var shared = source.Synchronize(locker).Publish();

            // capture all items so that refreshes can be emitted for them on demand
            var cache = new Dictionary<TKey, TSource>();
            var cacheLoader = shared.Subscribe(changes =>
            {
                // IChangeSet<TSource, TKey> is enumerable itself, so no cast to the concrete ChangeSet is needed
                foreach (var item in changes)
                {
                    switch (item.Reason)
                    {
                        case ChangeReason.Update:
                        case ChangeReason.Add:
                            cache[item.Key] = item.Current;
                            break;

                        case ChangeReason.Remove:
                            cache.Remove(item.Key);
                            break;
                    }
                }
            });

            // create change set of items where force refresh is applied
            var refreshes = refresher.Synchronize(locker)
                .Select(selector => cache
                    .Where(kvp => selector(kvp.Value, kvp.Key))
                    .Select(kvp => new Change<TSource, TKey>(ChangeReason.Refresh, kvp.Key, kvp.Value)))
                .Select(changes => new ChangeSet<TSource, TKey>(changes)).NotEmpty();

            var sourceAndRefreshes = shared.Merge(refreshes);

            return new CompositeDisposable(cacheLoader, sourceAndRefreshes.SubscribeSafe(observer), shared.Connect());
        });

    public static IObservable<IChangeSet<TSource, TKey>> RefreshOn<TSource, TKey>(
            this IObservable<IChangeSet<TSource, TKey>> source, 
            IObservable<Func<TSource, bool>> refresher)
        where TSource : notnull
        where TKey : notnull
        => source.RefreshOn(refresher.Select(selector => new Func<TSource, TKey, bool>((value, _) => selector(value))));

    public static IObservable<IChangeSet<TSource, TKey>> RefreshOn<TSource, TKey>(
            this IObservable<IChangeSet<TSource, TKey>> source,
            IObservable<Unit> refresher)
        where TSource : notnull
        where TKey : notnull
        => source.RefreshOn(refresher.Select(_ => new Func<TSource, TKey, bool>((_, _) => true)));

dwcullop commented 1 month ago

If that's all you needed, have you considered AutoRefreshOnObservable? It is very similar except structured a little differently. Having an IObservable<Func<T, TKey, bool>> seems a little unusual.
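As a sketch of what that could look like in front of TransformMany: every item is given the same external signal as its re-evaluator, so each signal produces a Refresh for every item. The `Group` record, the `copies` variable and the `AutoRefreshSketch` class are invented for this example, and it relies on TransformMany re-running its selector when it receives a Refresh, which is the behaviour the RefreshOn operator above depends on as well.

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical item type, used only for this illustration.
public sealed record Group(int Id);

public static class AutoRefreshSketch
{
    // Returns the number of derived items before and after the external change.
    public static (int Before, int After) Run()
    {
        var copies = 1; // external state that the many-selector depends on

        using var source = new SourceCache<Group, int>(g => g.Id);
        using var externalChanged = new Subject<Unit>();

        using var expanded = source.Connect()
            // Every item listens to the same signal and is refreshed when it fires.
            .AutoRefreshOnObservable(_ => externalChanged)
            .TransformMany(
                g => Enumerable.Range(0, copies).Select(i => $"{g.Id}-{i}"),
                name => name)
            .AsObservableCache();

        source.AddOrUpdate(new Group(1));
        source.AddOrUpdate(new Group(2));
        var before = expanded.Count;

        // Change the external state, then signal that derived data is stale.
        copies = 2;
        externalChanged.OnNext(Unit.Default);
        var after = expanded.Count;

        return (before, after);
    }
}
```

The main difference from RefreshOn is that the refresh signal is chosen per item rather than passed in as one stream of selector functions.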

If you really think it should be part of the library, you should consider submitting a PR that wraps that logic in a class with a Run method (which is Roland's convention) and some appropriate unit tests.

But you should also worry about performance because it will need to work for every situation, not just yours.