Cysharp / R3

The new future of dotnet/reactive and UniRx.
MIT License
1.7k stars, 70 forks

`Select` / `Switch` race condition #229

Open · ninjaoxygen opened this issue 1 week ago

ninjaoxygen commented 1 week ago

There seems to be a race in `Switch` similar to issue #228.

Below is a distilled test case of the code that triggers it. I'm not sure what guarantees R3 makes about calling OnNext from multiple threads; we ported from dotnet/reactive to R3 and hit the issue only very intermittently.

Uncommenting the `await Task.Delay(10);` line in `SubscribeAsync` makes it go away almost every time.

The first three runs of the code produced different results for me; in the third, the GEN-111-A value never reaches the subscriber:

dotnet run
entering Subscribe for 111
Subscribe dto is not null. DeviceId = UNCHANGED
Setting value for 111 to GEN-111-A
Subscribe dto is not null. DeviceId = GEN-111-A

dotnet run
Subscribe dto is not null. DeviceId = UNCHANGED
entering Subscribe for 111
Setting value for 111 to GEN-111-A
Subscribe dto is not null. DeviceId = GEN-111-A

dotnet run
entering Subscribe for 111
Setting value for 111 to GEN-111-A
Subscribe dto is not null. DeviceId = UNCHANGED

using R3;

SyncFactory factory = new();

ReactiveProperty<string?> udid = new("111");

var subscription = udid
    .GenerateObservable<string, ExampleDto?>(factory.Observe)
    .Subscribe(
        (x) =>
        {
            var output = string.Format("Subscribe dto {0} null. DeviceId = {1}", x is null ? "is" : "is not", x?.DeviceId ?? "(null)");
            Console.Out.WriteLine(output);
        });

//udid.Value = "222";

await Task.Delay(30000); // keep the process alive so the background SubscribeAsync has time to run

public static class ObserverGenerators
{
    /// <summary>
    /// Generate an observable from a key source.
    /// </summary>
    /// <typeparam name="TKey">Type of key used as generator parameter.</typeparam>
    /// <typeparam name="TValue">Type of value emitted by the generated Observable.</typeparam>
    /// <param name="keyObservable">Observable responsible for emitting keys.</param>
    /// <param name="observableFactoryFunction">Factory function used to create Observable for each key.</param>
    /// <returns>Observable of TValue?.</returns>
    public static Observable<TValue?> GenerateObservable<TKey, TValue>(this Observable<TKey?> keyObservable, Func<TKey, Observable<TValue?>> observableFactoryFunction)
    {
        var x = keyObservable.Select(
            (key) =>
            {
                if (key is null)
                {
                    return Observable.Return<TValue?>(default);
                }

                var value = observableFactoryFunction(key);

                return value;
            }).Switch();

        return x;
    }
}

public class SyncFactory
{
    public ReadOnlyReactiveProperty<ExampleDto?> Observe(string udid)
    {
        var provider = new SyncProvider(udid);

        return provider.Observable;
    }
}

public class SyncProvider
{
    private readonly string _udid;
    private readonly ReactiveProperty<ExampleDto?> _property;
    private readonly ReadOnlyReactiveProperty<ExampleDto?> _readOnlyProperty;

    public SyncProvider(string udid)
    {
        _udid = udid;
        _property = new(new ExampleDto(false, "UNCHANGED")); // establish property with a default value until async subscribe completes
        _readOnlyProperty = _property.ToReadOnlyReactiveProperty();

        // start async Subscribe process, so initially our Property will have the value from above.
        _ = Task.Run(SubscribeAsync);
    }

    public ReadOnlyReactiveProperty<ExampleDto?> Observable => _readOnlyProperty;

    /// <summary>
    /// Fake subscription to a remote service.
    /// In reality, this is an Orleans grain observer; our first value
    /// is the result of a Task&lt;ExampleDto&gt; Subscribe() call, which is immediately delivered to OnNext.
    /// </summary>
    /// <returns>Task.</returns>
    public async Task SubscribeAsync()
    {
        Console.WriteLine("entering Subscribe for {0}", _udid);
        // await Task.Delay(10); // simulate Subscribe call round-trip time
        var value = $"GEN-{_udid}-A";
        Console.WriteLine("Setting value for {0} to {1}", _udid, value);
        _property.OnNext(new ExampleDto(true, value));
    }
}

public record ExampleDto(
    bool Configured,
    string? DeviceId);
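
One plausible mechanism behind the third run, where GEN-111-A is never printed: `SyncProvider`'s constructor starts `SubscribeAsync` on the thread pool before returning, so that thread's `OnNext` races with `Switch` subscribing to the freshly created property. If a subscribe first replays the current value and only afterwards registers the observer, an `OnNext` landing in between is lost, and a short `Task.Delay` in the producer usually pushes it past the window. The sketch below is a hypothetical toy built from local state (`SubscribeNaive`, `SubscribeAtomic` and the hook parameter are illustrative names), not R3's `ReactiveProperty` code and not the #228 patch; it forces the bad interleaving deterministically, then shows a variant that replays and registers in one critical section.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical toy "property", NOT R3's ReactiveProperty.
var gate = new object();
var current = "UNCHANGED";
var observers = new List<Action<string>>();

// Producer side: update the value, then notify a snapshot of registered observers.
void OnNext(string value)
{
    Action<string>[] snapshot;
    lock (gate)
    {
        current = value;
        snapshot = observers.ToArray();
    }
    foreach (var observer in snapshot) observer(value);
}

// Naive subscribe: replay and registration are two separate critical sections.
// `inWindow` runs "the other thread's" OnNext exactly inside that gap.
void SubscribeNaive(Action<string> observer, Action inWindow)
{
    string replay;
    lock (gate) { replay = current; }
    observer(replay);
    inWindow();                               // OnNext here finds no observer to notify
    lock (gate) { observers.Add(observer); }
}

// Atomic subscribe: replay and registration share one critical section, so a
// concurrent OnNext happens either before the replay or after the registration.
void SubscribeAtomic(Action<string> observer)
{
    lock (gate)
    {
        observer(current);
        observers.Add(observer);
    }
}

// 1) Force the lost-update interleaving deterministically.
var naiveSeen = new List<string>();
SubscribeNaive(naiveSeen.Add, () => OnNext("GEN-111-A"));
Console.WriteLine("naive:  " + string.Join(", ", naiveSeen)); // prints "naive:  UNCHANGED"

// 2) Reset, then race a real thread-pool producer against the atomic subscribe.
lock (gate) { current = "UNCHANGED"; observers.Clear(); }
var atomicSeen = new List<string>();
var producer = Task.Run(() => OnNext("GEN-111-A"));
SubscribeAtomic(v => { lock (atomicSeen) atomicSeen.Add(v); });
producer.Wait();
Console.WriteLine("atomic: " + string.Join(", ", atomicSeen)); // always ends with GEN-111-A
```

Invoking the observer while holding the lock keeps the replayed value ordered before any later value, at the cost of running subscriber code under the lock; a real implementation has to weigh that trade-off, and this toy says nothing about how R3 resolves it.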

nepolak commented 1 week ago

You could try building and linking the R3 package from my fork (the one in the pull request with the fixed ReactiveProperty) and see if it helps.

ninjaoxygen commented 1 week ago

> You could try building and linking the R3 package from my fork (the one in the pull request with the fixed ReactiveProperty) and see if it helps.

I tried your fork and it definitely fixes the test case, so it looks like the same underlying cause. I will try it on our full code base.

Many thanks for your issue and patch. I had started down the same line of investigation and was scattering logging and extra locks through Select/Switch without success; they only made the problem occur less frequently.

I will check with our full code and post the results; I'm happy to close this as a duplicate of #228.
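
Because a window like this is only a few instructions wide, single runs, extra logging or locks placed downstream in Select/Switch mostly change how often it is hit rather than whether it exists. When verifying a fix such as the fork, a stress loop that counts lost updates gives a clearer signal than a handful of `dotnet run`s. The sketch below wraps a hypothetical toy property (not R3 code); `RunOnce` and the iteration count are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical toy property, NOT R3's ReactiveProperty or the fix from #228.
// Each call races one thread-pool OnNext against one subscriber and reports
// whether the subscriber ends up holding the produced value.
bool RunOnce(bool atomic)
{
    var gate = new object();
    var current = "UNCHANGED";
    var observers = new List<Action<string>>();
    string? last = null;
    Action<string> observer = v => last = v;

    // Producer: update the value, then notify a snapshot of registered observers.
    void OnNext(string value)
    {
        Action<string>[] snapshot;
        lock (gate)
        {
            current = value;
            snapshot = observers.ToArray();
        }
        foreach (var o in snapshot) o(value);
    }

    var producer = Task.Run(() => OnNext("GEN-111-A"));

    if (atomic)
    {
        // Replay and registration in one critical section: nothing can slip between them.
        lock (gate)
        {
            observer(current);
            observers.Add(observer);
        }
    }
    else
    {
        // Replay, then register: an OnNext in between is never delivered.
        string replay;
        lock (gate) { replay = current; }
        observer(replay);
        lock (gate) { observers.Add(observer); }
    }

    producer.Wait(); // also makes the producer thread's write to `last` visible here
    return last == "GEN-111-A";
}

const int Iterations = 10_000; // arbitrary; raise it if the naive variant never fails
var naiveLost = 0;
var atomicLost = 0;
for (var i = 0; i < Iterations; i++)
{
    if (!RunOnce(atomic: false)) naiveLost++;
    if (!RunOnce(atomic: true)) atomicLost++;
}

Console.WriteLine($"naive lost {naiveLost}/{Iterations}, atomic lost {atomicLost}/{Iterations}");
```

The naive count varies from run to run and machine to machine (it may even be zero), which mirrors the intermittent behaviour in the issue; the atomic count should stay at zero however many iterations are run.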