Cysharp / R3

The new future of dotnet/reactive and UniRx.

Is it excessive to have OnNextAsync on Subject? #185

Closed: harayuu9 closed this issue 3 months ago

harayuu9 commented 3 months ago

When I write code like the following, OnCompleted is called before the awaited handlers finish, so the pending tasks are cancelled and nothing is written to the console.

var subject = new Subject<string>();

subject.SubscribeAwait(async (text, ct) =>
{
    await Task.Delay(TimeSpan.FromSeconds(1), ct);
    Console.WriteLine(text);
});

subject.OnNext("Hello!");
subject.OnNext("World!");
subject.OnCompleted();

// output
// (none)

I believe this requirement never came up in the original Rx, where SubscribeAwait didn't exist. However, since R3 actively supports async/await, it would be useful to be able to write code like the following:

var subject = new Subject<string>();

subject.SubscribeAwait(async (text, ct) =>
{
    await Task.Delay(TimeSpan.FromSeconds(1), ct);
    Console.WriteLine(text);
});

await subject.OnNextAsync("Hello!");
await subject.OnNextAsync("World!");
subject.OnCompleted();

// output
// Hello!
// World!

It may be excessive as a feature of Subject, but what do you think of this proposal?

neuecc commented 3 months ago

That is difficult because Rx is a push-based model. It might be possible with IAsyncEnumerable (UniTaskAsyncEnumerable). MessagePipe is a dedicated system for this kind of request (PublishAsync): https://github.com/Cysharp/MessagePipe
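
To illustrate the difference, here is a minimal sketch of an awaitable publish in plain C#. The `AsyncPublisher<T>` type and its members are hypothetical names made up for this example; this is not the R3 or MessagePipe API, only an assumption about what "the publisher waits for its handlers" could look like. The key point is that a push-style OnNext returns void, so the caller has nothing to await, whereas a publish method that returns a task can complete only after the handlers have run.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type for illustration only; not part of R3 or MessagePipe.
public sealed class AsyncPublisher<T>
{
    private readonly List<Func<T, CancellationToken, ValueTask>> handlers = new();

    public void Subscribe(Func<T, CancellationToken, ValueTask> handler) => handlers.Add(handler);

    // Returns a task that completes only after every handler has finished,
    // so the caller can await delivery before publishing the next value.
    public async ValueTask PublishAsync(T value, CancellationToken ct = default)
    {
        // Copy the list so a handler that subscribes during publish does not break iteration.
        foreach (var handler in handlers.ToArray())
        {
            await handler(value, ct);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var publisher = new AsyncPublisher<string>();

        publisher.Subscribe(async (text, ct) =>
        {
            await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
            Console.WriteLine(text);
        });

        // Each await returns only after the handler above has printed its value.
        await publisher.PublishAsync("Hello!");
        await publisher.PublishAsync("World!");
    }
}
```

In this sketch handlers run sequentially and nothing is cancelled when publishing stops, which is a deliberate simplification; a real library would also have to decide how to handle unsubscription, parallel handlers, and errors.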

harayuu9 commented 3 months ago

Thank you very much. I see that this is difficult in Rx because it is a push-based model. For asynchronous event exchange, I will try another approach such as MessagePipe.