dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License
6.69k stars 749 forks source link

Replay().RefCount() keeps values after all subscriptions are disposed #1218

Open ilnur-nazmutdinov-spark opened 4 years ago

ilnur-nazmutdinov-spark commented 4 years ago

I use Replay().RefCount() and have several subscribers to it. After all subscribers are unsubscribed original observable stops producing values. After that I subscribe to it again. The new subscriber receives old values of Replay(). It also doesn't have values produced in the interval when there were no subscribers. For this test:

        [Fact]
        public void RefCount_Replay()
        {
            var subject = new Subject<int>();
            var obs = Observable.Create<int>(subscriber =>
            {
                testOutputHelper.WriteLine("Create");
                var subscription = subject.Do(subscriber.OnNext).Subscribe();
                return () =>
                {
                    subscription.Dispose();
                    testOutputHelper.WriteLine("Dispose");
                };
            });
            var shared = obs
                .Do(val => testOutputHelper.WriteLine($"publish value: {val}"))
                .Replay().RefCount();

            subject.OnNext(1);

            testOutputHelper.WriteLine(string.Empty);
            var subscriber1 = shared.Subscribe(val => testOutputHelper.WriteLine($"subscriber #1: {val}"));

            subject.OnNext(2);

            testOutputHelper.WriteLine(string.Empty);
            var subscriber2 = shared.Subscribe(val => testOutputHelper.WriteLine($"subscriber #2: {val}"));

            subject.OnNext(3);

            testOutputHelper.WriteLine(string.Empty);
            subscriber1.Dispose();
            subscriber2.Dispose();

            subject.OnNext(4);

            testOutputHelper.WriteLine(string.Empty);
            var subscriber3 = shared.Subscribe(val => testOutputHelper.WriteLine($"subscriber #3: {val}"));

            subject.OnNext(5);
        }

I receive output:

Create
publish value: 2
subscriber #1: 2

subscriber #2: 2
publish value: 3
subscriber #1: 3
subscriber #2: 3

Dispose

subscriber #3: 2
subscriber #3: 3
Create
publish value: 5
subscriber #3: 5

As you can see 3rd subscriber has values 2 and 3 (it also doesn't have value 4). I expected, that after all subscribers unsubscribed and something subscribes to it again it will start receiving new values and forget old ones. It looks like the problem is that Replay() uses ReplaySubject, but ReplaySubject stores all values when there were any subscribers. I'm not sure what is the correct behaviour, but I think it either should "forget" old values or it should produce all of them.

vladd commented 3 years ago

@ilnur-nazmutdinov-spark Is there a good workaround for the problem? I see only reimplementing of RefCount, which is, well, ugly.

metasong commented 1 year ago

I tested the code and find it works as it is designed. I think what we are worrying is between Dispose and Create:

Dispose

subscriber #3: 2
subscriber #3: 3
Create

the 'Replay' operator has a ReplaySubject inside and connects to source when subscribing, when we concat it to the RefCount operator, that means we will unconnect the ReplaySubect to the source and unsubscribe to the source when no observer on it(Dispose), but the old values are still buffered inside the ReplaySubject(inside Replay operator), when subscribing to it again, the old value will show up. we can verify that like this simple example:

var re = new ReplaySubject<int>();
var sub = re.Do(v => Console.WriteLine(v)).Subscribe();
re.OnNext(1);
sub.Dispose();
// re.OnNext(11); // it will show up too
 re.Do(v => Console.WriteLine(v)).Subscribe();
re.OnNext(2);

the output will be:

1
1
2

for the reason why the value 4 in the original example code subject.OnNext(4); is not available, it is because the connection between the source and the ReplaySubject is cut off by the RefCount operator when all subscriptions are disposed.

another modification to test is replacing the RefCount operator with the AutoConnect operator in the original example, and the out put will be:

Create
publish value: 2
subscriber #1: 2

subscriber #2: 2
publish value: 3
subscriber #1: 3
subscriber #2: 3

publish value: 4

subscriber #3: 2
subscriber #3: 3
subscriber #3: 4
publish value: 5
subscriber #3: 5

that is because the connect between the source and the ReplaySubject(created from Replay operator) is kept. so the value 4 and the old value showed up.

I also created another example to verify:

var si = new ReplaySubject<int>(1);
var so = new Subject<string>();
var o = si.Do(a => Console.WriteLine("----")).Replay(1).RefCount();
so.Select(_ => o).Switch().Do(v => Console.WriteLine(v)).Subscribe();
Console.WriteLine("== value from outer");
so.OnNext("");

Console.WriteLine("value 0 from inner");
si.OnNext(0);

Console.WriteLine("value 1 from inner");
si.OnNext(1);

Console.WriteLine("== new value from outer");
so.OnNext("");

the output will be:

== value from outer
value 0 from inner
----
0
value 1 from inner
----
1
== new value from outer
1
----
1

you can also notice that the value 1 is output 2 times when new value from outer comes. the first is from the ReplaySubject inside the Replay operator, the second from the ReplaySubject si.

the Note we learned is that:

the ReplaySubject inside the Replay operator will keep the in the value in buffer even we disposed the subscription

I also created an operator that will TRANSPARENTLY do subscription/Dispose operations to the source. just for your reference.

Note: it still keeps the last value.

        public static IObservable<T> ReplayBuffer<T>(this IObservable<T> source, int bufferSize = 1) {
            var replay = new ReplaySubject<T>(bufferSize);
            return Observable.Create<T>(observer =>
            {

                var subscription = source.Subscribe(replay);
                var replaySub = replay.Subscribe(observer);

                return () =>
                {
                    subscription.Dispose();
                    replaySub.Dispose();
                };
            });
                   }

for the problem I meet the Replay().AutoConnect() solved my problem.

ibebbs commented 6 months ago

Looks like this change of behavior was implemented in RxJava a months before this issue was opened: https://github.com/ReactiveX/RxJava/pull/6921

M0ns1gn0r commented 6 months ago

@maintaners Is there any plan to fix this in Rx.Net as well?

akarnokd commented 6 months ago

It's been awhile 😅

We had to augment the connectable's interface with a reset method and have refCount call it. It's complicated.

RxJava has custom publish/replay implementations that do not rely on subjects. Rx.NET uses a fixed subject that is not refreshed. I believe the intent here for COs was to act like a clutch.

Considerations:

ibebbs commented 6 months ago

Thanks @akarnokd.

I started writing a ResetableConnectableObservable<T> : IConnectableObservable that took a Func<IConnectableObservable> in the constructor and used this to create new COs on the call to Connect().

Was looking promising until I realised I could resolve my specific use-case much more easily with a BehaviorSubject being fed by an observable with a StartsWith (effectively resetting the BehaviorSubject to a known state on [re]subscription).

Would like to see something done here though. Keeping the value after all the connections are disposed doesn't feel right to me.