Cysharp / R3

The new future of dotnet/reactive and UniRx.
MIT License

There is something wrong with Observable.Concat #232

Closed kamend closed 2 months ago

kamend commented 3 months ago

I have this async function:

async ValueTask<string> SomeAsyncTask()
{
    await Task.Delay(1000);
    return "result";
}

And I want to run these Observables in this order:

Observable.Concat(
    Observable.Return("Start"),
    SomeAsyncTask().ToObservable(),
    Observable.Return("End")
).Subscribe(r =>
{
    Debug.Log("Result:"+r);
}).AddTo(this);

But for some reason the feed gets stuck at the async function and "End" never gets emitted. If I remove the Task.Delay() call, everything seems to work fine, but that makes the function synchronous and defeats the purpose.

I tried the same thing in UniRx and it works as expected, so there might be something wrong with Concat, or with my expectations?

Thanks!

kamend commented 3 months ago

I tracked the issue to this line, where SerialDisposable is being used, but I still cannot wrap my head around what exactly is wrong.

https://github.com/Cysharp/R3/blob/4fc537d03444c102f323abd2242852be59f1c7b6/src/R3/Factories/Concat.cs#L94

Another interesting thing is that if I rearrange the order of the Observables, putting the async one at the top, things seem to work fine:

Observable.Concat(
    SomeAsyncTask().ToObservable(),
    Observable.Return("Start"),
    Observable.Return("End")
).Subscribe(r =>
{
    Debug.Log("Result:"+r);
}).AddTo(this);

kamend commented 3 months ago

Digging a bit deeper, I think I found out where the problem is coming from. It comes from how Concat is implemented and how SerialDisposable works.

Here is the case: we have a Concat of two Observables, one that returns immediately and one asynchronous Subject.

Subject<int> someIntObservable = new Subject<int>();

var obs = Observable.Concat(
    Observable.Return(1),
    someIntObservable
).Subscribe();

If we follow the code of Concat, we first start here, where we try to set the initial value of the SerialDisposable:

https://github.com/Cysharp/R3/blob/4fc537d03444c102f323abd2242852be59f1c7b6/src/R3/Factories/Concat.cs#L53

Now, before we actually reach the setter of Disposable, we call Subscribe on the first Observable, which completes immediately, and following the code we end up here:

https://github.com/Cysharp/R3/blob/4fc537d03444c102f323abd2242852be59f1c7b6/src/R3/Factories/Concat.cs#L94

There we want to switch the SerialDisposable to the next Observable. Notice, however, that the setter from the beginning has not been called yet: we are still inside the recursion, since the first Observable runs synchronously. So in this situation the previous disposable of the SerialDisposable is not yet set, even though we already have a previous Observable.

What happens next is that the setter of the SerialDisposable is called with the disposable for the second Observable. We then unwind out of the recursion and return to the top, where the outer setter of the SerialDisposable finally runs. Because of how SerialDisposable works, the current disposable gets disposed and the new value becomes current. Here is where the problem happens: the current disposable at that point belongs to the second Observable in our Concat, so it is disposed before it even gets the chance to emit any values.
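
The re-entrancy described above can be reproduced without R3 at all. The following is only a minimal sketch under stated assumptions: MiniSerialDisposable, NamedDisposable and Demo are hypothetical names made up for this illustration, and MiniSerialDisposable merely imitates the "assigning a new value disposes the previous one" behaviour. It is not R3's actual SerialDisposable or Concat code.

```csharp
#nullable enable
using System;

// Hypothetical minimal stand-in for a serial disposable: assigning a new
// value disposes the previously held one. Not R3's actual implementation.
sealed class MiniSerialDisposable
{
    IDisposable? current;

    public IDisposable? Disposable
    {
        get => current;
        set
        {
            var old = current;
            current = value;
            old?.Dispose();
        }
    }
}

// Hypothetical helper that records and reports when it is disposed.
sealed class NamedDisposable : IDisposable
{
    readonly string name;
    public bool IsDisposed { get; private set; }

    public NamedDisposable(string name) => this.name = name;

    public void Dispose()
    {
        IsDisposed = true;
        Console.WriteLine($"{name} disposed");
    }
}

static class Demo
{
    public static (NamedDisposable first, NamedDisposable second) Run()
    {
        var serial = new MiniSerialDisposable();
        var first = new NamedDisposable("first");
        var second = new NamedDisposable("second");

        // Simulates subscribing to a source that completes synchronously:
        // before returning its own subscription, it has already moved on to
        // the next source and stored that subscription in the serial slot.
        IDisposable SubscribeFirst()
        {
            serial.Disposable = second; // inner (re-entrant) assignment
            return first;
        }

        // Outer assignment: the right-hand side runs first, so by the time
        // this setter executes, the slot already holds `second`, which the
        // setter then disposes.
        serial.Disposable = SubscribeFirst();

        return (first, second);
    }

    public static void Main()
    {
        var (first, second) = Run();
        Console.WriteLine(
            $"first disposed: {first.IsDisposed}, second disposed: {second.IsDisposed}");
    }
}
```

Running Main prints "second disposed" followed by "first disposed: False, second disposed: True": the subscription that should still be alive is the one that gets disposed, which matches the stuck "End" in the original example.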

Sorry for the very detailed explanation, but I had no idea how to show the bug in a better way.

neuecc commented 2 months ago

Thanks, and sorry for the delayed reply. I'll check it soon.

neuecc commented 2 months ago

Sorry for the delay in confirmation. Thank you very much for your detailed explanation!

We have released v1.1.15 which fixes the problem.