neuecc / UniRx

Reactive Extensions for Unity
MIT License

Merge operator doesn't stop itself if one of merged observables throws error #498

Open korober opened 2 years ago

korober commented 2 years ago

The Merge operator SHOULD terminate when any of the merged observables signals an error. The UniRx implementation does not; the Microsoft implementation (System.Reactive) does. Here are two examples, each tested with both UniRx and System.Reactive:

var ss1 = new Subject<int>();
var ss2 = new Subject<int>();


var observableMergeWithError = ss1.Merge(ss2);
observableMergeWithError.Subscribe(new Printer<int>("merged stream"));

ss1.OnNext(11);
ss1.OnNext(12);
ss1.OnNext(13);
ss2.OnNext(21);
ss2.OnNext(22);
ss2.OnError(new Exception("error"));
ss1.OnNext(1555);
ss1.OnNext(1666);
ss2.OnNext(2555);
ss2.OnNext(2666);
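
The `Printer<T>` observer used above is not shown in the issue. As a rough sketch, assuming it only logs each notification with a name prefix in the same format as the output below, it could look like this. The class body and the `ON COMPLETED` text are assumptions for illustration, not the reporter's actual code:

```csharp
using System;

// Hypothetical reconstruction: the issue does not include Printer<T>.
// It only needs the BCL interface System.IObserver<T>, so it can be passed
// to Subscribe in both UniRx and System.Reactive.
public sealed class Printer<T> : IObserver<T>
{
    private readonly string name;

    public Printer(string name)
    {
        this.name = name;
    }

    public void OnNext(T value)
    {
        Console.WriteLine(name + ": ON NEXT = " + value);
    }

    public void OnError(Exception error)
    {
        // Concatenation calls error.ToString(), which yields "Type: message"
        // (plus a stack trace if the exception was actually thrown).
        Console.WriteLine(name + ": ON ERROR=" + error);
    }

    public void OnCompleted()
    {
        // Assumed format: completion never appears in the logs in this issue.
        Console.WriteLine(name + ": ON COMPLETED");
    }
}
```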

Compiled against UniRx, this code prints:

merged UNIRX stream: ON NEXT = 11
merged UNIRX stream: ON NEXT = 12
merged UNIRX stream: ON NEXT = 13
merged UNIRX stream: ON NEXT = 21
merged UNIRX stream: ON NEXT = 22
merged UNIRX stream: ON ERROR=System.Exception: error
merged UNIRX stream: ON NEXT = 1555 (should not have been printed)
merged UNIRX stream: ON NEXT = 1666 (should not have been printed)

and compiled against System.Reactive (Microsoft), it prints:

merged MS stream: ON NEXT = 11
merged MS stream: ON NEXT = 12
merged MS stream: ON NEXT = 13
merged MS stream: ON NEXT = 21
merged MS stream: ON NEXT = 22
merged MS stream: ON ERROR=System.Exception: error

I also tried this with observables other than Subject, with the same result:

var si1 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)
    .Select(l => "stream1_" + l)
    .Concat(Observable.Throw<string>(new Exception()));

var si2 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(7).Select(l => "stream2_" + l);

si1.Merge(si2).Subscribe(new PrinterMs<string>("merge ms test"));

Console.ReadKey();

The UniRx output is again incorrect:

merge unirx test: ON NEXT = stream2_0
merge unirx test: ON NEXT = stream1_0
merge unirx test: ON NEXT = stream2_1
merge unirx test: ON NEXT = stream1_1
merge unirx test: ON NEXT = stream2_2
merge unirx test: ON NEXT = stream1_2
merge unirx test: ON NEXT = stream2_3
merge unirx test: ON NEXT = stream1_3
merge unirx test: ON NEXT = stream2_4
merge unirx test: ON NEXT = stream1_4
merge unirx test: ON ERROR=System.Exception: Exception of type 'System.Exception' was thrown.
merge unirx test: ON NEXT = stream2_5 (should not have been printed)
merge unirx test: ON NEXT = stream2_6 (should not have been printed)

while the Microsoft (System.Reactive) output is correct:

merge ms test: ON NEXT = stream2_0
merge ms test: ON NEXT = stream1_0
merge ms test: ON NEXT = stream2_1
merge ms test: ON NEXT = stream1_1
merge ms test: ON NEXT = stream2_2
merge ms test: ON NEXT = stream1_2
merge ms test: ON NEXT = stream2_3
merge ms test: ON NEXT = stream1_3
merge ms test: ON NEXT = stream2_4
merge ms test: ON NEXT = stream1_4
merge ms test: ON ERROR=System.Exception: Exception of type 'System.Exception' was thrown.
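
Until the operator itself is fixed, one possible subscriber-side workaround is to wrap the observer so that it drops everything after the first OnError or OnCompleted. The sketch below is only an illustration: `StopOnTerminalObserver<T>` is a hypothetical helper, not part of UniRx or System.Reactive, and it has not been checked against the UniRx code base.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not a UniRx or System.Reactive type).
// Forwards notifications to the inner observer until the first terminal
// notification (OnError or OnCompleted), then ignores everything else.
public sealed class StopOnTerminalObserver<T> : IObserver<T>
{
    private readonly IObserver<T> inner;
    private int stopped; // 0 = active, 1 = a terminal notification was forwarded

    public StopOnTerminalObserver(IObserver<T> inner)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        this.inner = inner;
    }

    public void OnNext(T value)
    {
        // CompareExchange(ref x, 0, 0) is used here as an atomic read.
        if (Interlocked.CompareExchange(ref stopped, 0, 0) == 0)
        {
            inner.OnNext(value);
        }
    }

    public void OnError(Exception error)
    {
        // Only the first terminal notification is forwarded.
        if (Interlocked.Exchange(ref stopped, 1) == 0)
        {
            inner.OnError(error);
        }
    }

    public void OnCompleted()
    {
        if (Interlocked.Exchange(ref stopped, 1) == 0)
        {
            inner.OnCompleted();
        }
    }
}
```

With the first example this would be used as `ss1.Merge(ss2).Subscribe(new StopOnTerminalObserver<int>(new Printer<int>("merged stream")));`. Note that this only hides the late values from the subscriber. It does not unsubscribe Merge from the remaining sources, so the underlying bug is still there and the subscription should still be disposed explicitly.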

I found https://github.com/neuecc/UniRx/issues/350, which appears to be the same issue.

korober commented 2 years ago

@neuecc Could you please take a look at this issue and this PR: https://github.com/neuecc/UniRx/pull/439 Merge is one of the most useful core operators, and IMHO this is a major issue.