reactive-streams / reactive-streams-dotnet

Reactive Streams for .NET
MIT No Attribution

Inherit IObservable, IObserver for IPublisher, ISubscriber #10

Closed buybackoff closed 8 years ago

buybackoff commented 8 years ago

I believe IPublisher and ISubscriber should inherit from IObservable and IObserver. Also, ISubscription should implement IDisposable, and Dispose() should be used instead of Cancel(). I played with reactive streams recently here and realized that a standalone implementation of RX Streams makes it impossible to use the existing Rx.NET library. However, if IPublisher inherits IObservable, we could do a type check and adjust the consumer's behavior accordingly. So, if a consumer supports reactive streams, it should use Request(n); otherwise Request(Int.MaxValue) should be called implicitly so that IPublisher behaves the same way as IObservable currently does. Here is an unfinished example of how to fall back to the old IObservable behavior when the IObserver is not an ISubscriber. I believe I have seen similar patterns in Java implementations or discussions.

IObservable/IObserver are a part of the standard library, so keeping binary compatibility is quite important.
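The type check described above could look roughly like the following. This is a minimal, single-threaded sketch and not the unfinished example linked in the comment: the interface shapes are hypothetical versions of the proposal (they are not the interfaces shipped by reactive-streams-dotnet), and `RangePublisher`, `CollectingObserver` and `BoundedSubscriber` are made-up names for illustration. It uses `long.MaxValue` as the "unbounded" demand, assuming `Request` takes a `long`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shapes of the proposal (NOT the shipped Reactive.Streams interfaces).
public interface ISubscription : IDisposable { void Request(long n); }
public interface ISubscriber<in T> : IObserver<T> { void OnSubscribe(ISubscription subscription); }
public interface IPublisher<out T> : IObservable<T> { }

public sealed class RangePublisher : IPublisher<int>
{
    private readonly int _start, _count;
    public RangePublisher(int start, int count) { _start = start; _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        var subscription = new RangeSubscription(observer, _start, _start + _count);
        if (observer is ISubscriber<int> subscriber)
            subscriber.OnSubscribe(subscription);   // backpressure-aware consumer: wait for Request(n)
        else
            subscription.Request(long.MaxValue);    // plain IObserver: behave like a classic IObservable
        return subscription;
    }

    private sealed class RangeSubscription : ISubscription
    {
        private readonly IObserver<int> _observer;
        private readonly int _end;
        private int _next;
        private long _requested;
        private bool _emitting, _done;

        public RangeSubscription(IObserver<int> observer, int start, int end)
        {
            _observer = observer; _next = start; _end = end;
        }

        public void Request(long n)
        {
            if (_done) return;
            if (n <= 0) { _done = true; _observer.OnError(new ArgumentException("n must be positive")); return; }
            // Accumulate demand, saturating at long.MaxValue instead of overflowing.
            _requested = n > long.MaxValue - _requested ? long.MaxValue : _requested + n;
            if (_emitting) return;                  // re-entrant Request from OnNext: the loop below picks it up
            _emitting = true;
            while (!_done && _requested > 0 && _next < _end)
            {
                _requested--;
                _observer.OnNext(_next++);
            }
            _emitting = false;
            if (!_done && _next == _end) { _done = true; _observer.OnCompleted(); }
        }

        public void Dispose() => _done = true;      // Dispose plays the role of Cancel in this sketch
    }
}

// Plain Rx-style consumer: knows nothing about demand.
public class CollectingObserver : IObserver<int>
{
    public readonly List<int> Items = new List<int>();
    public bool Completed;
    public void OnNext(int value) => Items.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() => Completed = true;
}

// Backpressure-aware consumer: requests a fixed number of items up front.
public sealed class BoundedSubscriber : CollectingObserver, ISubscriber<int>
{
    private readonly long _initial;
    public BoundedSubscriber(long initial) { _initial = initial; }
    public void OnSubscribe(ISubscription subscription) => subscription.Request(_initial);
}
```

With this sketch, subscribing a `CollectingObserver` to `new RangePublisher(1, 5)` delivers all five items and completes, while a `BoundedSubscriber(2)` receives only the first two and stays open until it requests more.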

viktorklang commented 8 years ago

Hi @buybackoff!

How do you propose to deal with the contracts/spec incompatibilities for the methods? (especially for ISubscriber's signals)

Wouldn't AsIObservable on ISubscriber and an AsIObserver on Publisher be less ambiguous?

buybackoff commented 8 years ago

Aliases could probably work. The main difference is Cancel vs. Dispose.

viktorklang commented 8 years ago

@buybackoff How would that be a problem, you mean?

buybackoff commented 8 years ago

I mean, there are only two differences: the Subscribe signature is void in RS but returns IDisposable in .NET, and Dispose works like Cancel, so one could be an alias for the other. Beyond that, Subscription has an additional method, Request, and Subscriber has an additional method, OnSubscribe. Everything else is identical. The return value of Subscribe could be ignored if it is the same instance that is passed into the OnSubscribe method.

cconstantin commented 8 years ago

@buybackoff as I understand it, the reactive streams interfaces target interoperability between compliant implementations, but they are not necessarily intended to be exposed to users of those implementations. They should be minimal, so they don't limit the choices of implementors.

In akka streams we need a non-generic version of IPublisher and ISubscriber in order to get around variance conflicts, and in an early iteration of reactive-streams-dotnet we had IPublisher<> inherit from IPublisher. This in turn complicated the implementation of an adapter library to RX.NET.

I'd rather not go down that path again.

Implementors can easily wrap or adapt these minimalistic interfaces in order to provide additional functionality.

buybackoff commented 8 years ago

Well, interfaces support multiple inheritance. IPublisher<> could inherit from both IPublisher and IObservable.

cconstantin commented 8 years ago

Yes, but it's quite invasive, in the sense that it increases the burden and constraints on the implementors. And the benefit is minimal, imho.

buybackoff commented 8 years ago

But without binary compatibility we lose the ability to use everything that depends on IObservable, and falling back to implementations that do not support Request and backpressure is easy to do in base classes.

cconstantin commented 8 years ago

You can easily adapt observables to reactive streams interfaces and vice-versa I would think. See this as an example: https://github.com/akarnokd/RxAdvancedFlow/blob/c1eb769a06d4118bd340cec8e2558bc3a9ae1240/RxAdvancedFlow/Flowable.cs#L2241
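A minimal sketch of such an adapter, for the Publisher-to-Observable direction, might look like this. It is not the code behind the link above. To stay self-contained it declares local stand-ins that mirror the shape of the Reactive Streams interfaces, and `PublisherObservable`, `ArrayPublisher` and `CollectingObserver` are hypothetical names. The adapter simply requests `long.MaxValue`, so no backpressure reaches the upstream, and the sketch assumes single-threaded use.

```csharp
using System;
using System.Collections.Generic;

// Local stand-ins mirroring the Reactive Streams interface shapes, declared here for self-containment.
public interface ISubscription { void Request(long n); void Cancel(); }
public interface ISubscriber<in T>
{
    void OnSubscribe(ISubscription subscription);
    void OnNext(T element);
    void OnError(Exception cause);
    void OnComplete();
}
public interface IPublisher<out T> { void Subscribe(ISubscriber<T> subscriber); }

// Adapter: exposes an IPublisher<T> as an IObservable<T> with unbounded demand.
public sealed class PublisherObservable<T> : IObservable<T>
{
    private readonly IPublisher<T> _source;
    public PublisherObservable(IPublisher<T> source) { _source = source; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var bridge = new Bridge(observer);
        _source.Subscribe(bridge);
        return bridge;                               // Dispose maps to Cancel
    }

    private sealed class Bridge : ISubscriber<T>, IDisposable
    {
        private readonly IObserver<T> _observer;
        private ISubscription _upstream;
        private bool _disposed;
        public Bridge(IObserver<T> observer) { _observer = observer; }

        public void OnSubscribe(ISubscription subscription)
        {
            _upstream = subscription;
            if (_disposed) { subscription.Cancel(); return; }
            subscription.Request(long.MaxValue);     // IObserver cannot signal demand, so ask for everything
        }
        public void OnNext(T element) { if (!_disposed) _observer.OnNext(element); }
        public void OnError(Exception cause) { if (!_disposed) _observer.OnError(cause); }
        public void OnComplete() { if (!_disposed) _observer.OnCompleted(); }
        public void Dispose() { _disposed = true; _upstream?.Cancel(); }
    }
}

// Tiny demo publisher (hypothetical); not re-entrancy safe, which is fine because Bridge requests once.
public sealed class ArrayPublisher<T> : IPublisher<T>
{
    private readonly T[] _items;
    public ArrayPublisher(params T[] items) { _items = items; }
    public void Subscribe(ISubscriber<T> subscriber) => subscriber.OnSubscribe(new Sub(_items, subscriber));

    private sealed class Sub : ISubscription
    {
        private readonly T[] _items;
        private readonly ISubscriber<T> _target;
        private int _index;
        private bool _cancelled;
        public Sub(T[] items, ISubscriber<T> target) { _items = items; _target = target; }

        public void Request(long n)
        {
            while (!_cancelled && n-- > 0 && _index < _items.Length) _target.OnNext(_items[_index++]);
            if (!_cancelled && _index == _items.Length) { _cancelled = true; _target.OnComplete(); }
        }
        public void Cancel() => _cancelled = true;
    }
}

// Plain Rx-style consumer used to exercise the adapter.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public bool Completed;
    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() => Completed = true;
}
```

Subscribing a `CollectingObserver<int>` to `new PublisherObservable<int>(new ArrayPublisher<int>(1, 2, 3))` delivers the three items and then completes. The reverse direction (Observable to Publisher) needs a buffering or dropping strategy, because a plain IObservable cannot be slowed down.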

buybackoff commented 8 years ago

What about implicit operators in some base classes? I mean, it would be nice to have a reference implementation as part of this project, and that implementation should be interoperable with existing Rx stuff.

marcpiechura commented 8 years ago

@buybackoff An example implementation will be available, as well as the TCK, when https://github.com/reactive-streams/reactive-streams-dotnet/pull/9 is merged.

I would also prefer to have a simple library that provides the interop API and leave these interfaces as they are, the same way the JVM handles this problem. Otherwise we would also need to update the TCK to verify that whoever implements the interfaces handles the interop correctly, especially given that RX.Net doesn't provide backpressure. Also, I don't like the fact that the .Net interfaces would differ from the ones in the other languages; that would complicate ports of existing Reactive Streams implementations.

Maybe I'm missing something, but wouldn't the only advantage be a saved call to ToObservable,... ?

buybackoff commented 8 years ago

You are right that it is easy to turn Publishers/Subscribers into Observables/Observers. I am confused by the fact that they are, by their very meaning and semantics, the same, with just backpressure added; it therefore seems very natural that the former should extend the latter rather than replace it or provide an alternative.

marcpiechura commented 8 years ago

Fair point, maybe @viktorklang can explain the reasons why these new interfaces were created instead of using the ones from RX.

I think it's not only the backpressure but also the fact that RX was designed as a push-based model, while reactive streams provide dynamic push/pull behavior via the demand that is sent upstream.

akarnokd commented 8 years ago

The problem with IObservable.Subscribe is that it doesn't allow synchronous cancellation of the stream and forces all sorts of async swing-arounds in Rx.NET, degrading performance by 10-100x in some cases. Here is an example of how to convert from an IObservable and how to appear as an IObservable. Also, Rx.NET doesn't support backpressure.
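One way to read the synchronous-cancellation point: with IObservable, the consumer receives its IDisposable only when Subscribe returns, so a source that emits inside Subscribe cannot be stopped from within OnNext. The sketch below illustrates that general issue with a deliberately naive source. `SyncRange` and `TakeTwo` are hypothetical names, and this is not how Rx.NET's own operators are implemented.

```csharp
using System;

// Naive synchronous source: emits everything inside Subscribe, so the IDisposable
// reaches the caller only after the last item has already been delivered.
public sealed class SyncRange : IObservable<int>
{
    private readonly int _count;
    public SyncRange(int count) { _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        var handle = new Handle();
        for (int i = 0; i < _count && !handle.Disposed; i++) observer.OnNext(i);
        if (!handle.Disposed) observer.OnCompleted();
        return handle;
    }

    private sealed class Handle : IDisposable
    {
        public bool Disposed;
        public void Dispose() => Disposed = true;
    }
}

// Consumer that wants to stop after two items.
public sealed class TakeTwo : IObserver<int>
{
    public IDisposable Handle;   // assigned by the caller, after Subscribe has returned
    public int Seen;

    public void OnNext(int value)
    {
        Seen++;
        if (Seen >= 2) Handle?.Dispose();   // Handle is still null while SyncRange is emitting
    }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Running `var o = new TakeTwo(); o.Handle = new SyncRange(5).Subscribe(o);` leaves `o.Seen` at 5: the attempt to cancel after two items has no effect because the handle does not exist yet. In Reactive Streams the subscriber receives its ISubscription through OnSubscribe before any item is emitted, so it can cancel synchronously from inside OnNext.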

buybackoff commented 8 years ago

Could you please elaborate on 10-100x perf difference? We have agreed that conversion is an easy thing to do even without inheritance and could close this issue, but I do not understand this perf point.

akarnokd commented 8 years ago

Sure, the following program benchmarks the most basic operation: loop and observe 1M integers:

using Reactor.Core;
using System;
using System.Reactive.Linq;

namespace ConsoleApplication1
{
    class Program
    {
        static int sink;

        static void Rx()
        {
            Observable.Range(1, 1000000).Subscribe(v => { sink = v; });
        }

        static void Rs()
        {
            Flux.Range(1, 1000000).Subscribe(v => { sink = v; });
        }

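        // Runs the action repeatedly for about one second and prints runs per second.
        static void Throughput(string name, Action a)
        {
            var start = DateTimeOffset.UtcNow;
            var end = start.AddSeconds(1);

            long ops = 0;

            // Each call to a() pushes 1M integers through the pipeline.
            while (end > DateTimeOffset.UtcNow)
            {
                a();
                ops++;
            }

            Console.Write(name);
            Console.Write(": ");
            // Completed runs divided by the actual elapsed time in seconds.
            Console.Write(ops * 1000 / (DateTimeOffset.UtcNow - start).TotalMilliseconds);
            Console.WriteLine(" ops/s");
        }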

        static void Main(string[] args)
        {
            Throughput("Reactor.Core", Rs);
            Throughput("Rx.NET", Rx);
            Console.ReadKey();
        }
    }
}

The results on my weak laptop with Celeron 1005:

Reactor.Core: 46,62431525497 ops/s
Rx.NET: 0,594849955642039 ops/s

No wonder Rx.NET has so much overhead: it schedules the items one by one.

cconstantin commented 8 years ago

This can be closed @viktorklang, there's agreement this is not necessary.