reactive-streams / reactive-streams-jvm

Reactive Streams Specification for the JVM
http://www.reactive-streams.org/
MIT No Attribution

Remove Processor Interface #22

Closed benjchristensen closed 10 years ago

benjchristensen commented 10 years ago

What is the purpose of Processor being in the API?

Based on my reading of the code and understanding of the intents of this API, I suggest removing the Processor class: https://github.com/reactive-streams/reactive-streams/blob/master/spi/src/main/java/org/reactivestreams/api/Processor.java

It does not seem relevant to the purpose of this project. Libraries such as Akka, RxJava, Reactor, Finagle etc can and will layer logic on top of the streams of data and integrate via the Subscriber class to receive and transform data.

viktorklang commented 10 years ago

It is crucial to be able to represent a detached "stage" (having an inlet and an outlet), and since Java has a, erm, "rustic" type system, you cannot represent Consumer[T] with Producer[T], which means that you need an interface that extends both. Hence, Processor.
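As a rough Java sketch (using the later Subscriber/Publisher names rather than the Consumer/Producer ones mentioned above, and omitting onSubscribe and the backpressure machinery), the combined type is nothing more than:

```java
// Simplified stand-ins for illustration only; the real spec types
// also carry onSubscribe and the Subscription/backpressure machinery.
interface Subscriber<T> {
    void onNext(T element);
    void onError(Throwable t);
    void onComplete();
}

interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

// The type under discussion: a detached stage with an inlet (Subscriber)
// and an outlet (Publisher), which Java can only express nominally.
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
```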

benjchristensen commented 10 years ago

Please provide some pseudo code as I don't see how the interop goals have anything to do with detached stages.

smaldini commented 10 years ago

I will feed the thread with some reactor code which implements Processor.

viktorklang commented 10 years ago

Pseudo:

   def wrap[T](p: Processor[T]): Processor[T] = doSomethingBefore() produceTo p produceTo doSomethingAfter()

benjchristensen commented 10 years ago

Sorry I'm so slow, but I'm still not seeing what that example has to do with interop between libraries.

Can you give me an example of how Akka -> Rx -> Reactor would need Processor?

viktorklang commented 10 years ago

I'm thinking:

Rx -> AnyGenericReactiveStreamsImpl -> Rx

as in:

  def splice[T](anyGenericReactiveStreamsImpl: Processor[T]) =
    doRxyThings() subscribe anyGenericReactiveStreamsImpl subscribe doMoarRxyThings()

As mentioned before, the only reason it is needed is because Scala is the only(?) JVM language with a type system that supports saying Subscriber with Publisher without requiring that to be a concrete type, i.e. interface Processor extends Subscriber with Publisher.

The alternative encoding would be to do something runtime-fail-y like:

   def splice[T](genericSubscriber: Subscriber[T], genericPublisher: Publisher[T]) = {
       assert(genericSubscriber == genericPublisher, "You broke it")
       doRxyThings() subscribe genericSubscriber
       genericPublisher subscribe doMoarRxyThings()
    }

Thoughts?
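In Java the two encodings contrasted above might look roughly like this (a sketch with simplified single-method stand-in types, not the spec's; the Splicer class and wiring order are illustrative):

```java
// Simplified stand-ins; real spec types carry more methods and backpressure.
interface Subscriber<T> { void onNext(T element); }
interface Publisher<T> { void subscribe(Subscriber<? super T> s); }
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

final class Splicer {
    // Type-safe: the compiler guarantees the inlet and outlet are one stage.
    // Wired bottom-to-top so an eager upstream cannot emit into an
    // unconnected stage.
    static <T> void splice(Publisher<T> upstream,
                           Processor<T, T> stage,
                           Subscriber<T> downstream) {
        stage.subscribe(downstream);
        upstream.subscribe(stage);
    }

    // The runtime-fail-y alternative: nothing ties the two parameters
    // together, so the best the library can do is an identity check.
    static <T> void spliceUnsafe(Publisher<T> upstream,
                                 Subscriber<T> inlet, Publisher<T> outlet,
                                 Subscriber<T> downstream) {
        if (inlet != outlet) throw new IllegalArgumentException("You broke it");
        outlet.subscribe(downstream);
        upstream.subscribe(inlet);
    }
}
```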

benjchristensen commented 10 years ago

Would it be correct to say that the only reason this would be needed is if we are wanting to define interoperable transformations? If so, that's beyond the original scope of allowing interop of streams but moves into interoperable stream processing.

viktorklang commented 10 years ago

@benjchristensen No it's just a consequence of a Publisher also being able to be a Subscriber.

rkuhn commented 10 years ago

I think I understand what Ben means: passing around a Processor means that I give someone a piece of hose to fit into a pipeline, which emphasizes the transformation being passed around. We have such a case in our TCP code: when you connect outwards you get back a Processor, since you can view the server end as a transformation you send ByteStrings through. It would not be a big difference to say that you get a pair of Publisher and Subscriber instead, I think.

The main question here is whether or not we will see many incompatible types being created in implementations, and I think the answer is yes: if you provide a DSL for creating some asynchronous processing steps, then the return type of the “create me the machine” method must return both interfaces at once (with Scala being the only possible exception).

By not providing this type in the SPI we will foster the creation of many copies that only inter-operate single-sidedly (i.e. you can pass an Akka Processor as either a Publisher or Subscriber, but there is no shared Processor).

I would argue for keeping this unification since most implementations will otherwise create one.

benjchristensen commented 10 years ago

This conversation has long since been superseded by discussions in #37 and #19

benjchristensen commented 10 years ago

Reopening as this discussion has been revived: https://github.com/reactive-streams/reactive-streams/issues/56#issuecomment-43776385

benjchristensen commented 10 years ago

@smaldini

I am regretting the loss of Processor but we can live without it for now. Not sure why that one has been ditched tho.

@benjchristensen

Why does Processor need to be part of the spec? Each library will have different ways of applying operators, processors, whatever. Interop will occur naturally for a Processor of any library that implements the correct Subscriber/Publisher interfaces.

@smaldini

It's just that it is very convenient for combining libraries together or modularising streams; nothing about forcing a given transformation. If a library provides an arbitrary operation, I'd just like to be able to lift a current publisher with it and still retrieve its output, like a logical RequestReply pattern (in fact it would look this way in a TCP application of the spec).

benjchristensen commented 10 years ago

I envision it like this, where only the reactiveStream is defined by the spec:

For example in RxJava I could do this:

Observable.from(reactiveStream).lift(processor_operator_function).subscribe(function)

The lift capability of Rx is not something I expect other libraries to adopt, nor should we impose it on the spec.

The original Processor as defined in v0.3 is basically what Rx calls a Subject, as it implements both Subscriber and Publisher. It is nothing more than that. If that's all it is, we don't need a special type for that, anything that happens to implement both of those magically becomes this "thing" (which I view as a Subject and not a Processor).

In Rx a Processor is very different than a Subject. This is the signature:

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>

versus a Subject:

public abstract class Subject<T, R> extends Observable<R> implements Observer<T>
... or in reactive-stream terminology ...
public interface Subject<T, R> extends Publisher<R>, Subscriber<T>

This Operator type is more interesting than Subject as it is something new, not just a combination of Publisher and Subscriber. So if we are going to make a Processor part of the interface, I'd argue it needs to be like the Operator above. We wouldn't want to rely on a function type, and the JVM doesn't have a standard for that, so we'd need to just create a full definition such as:

public interface Processor<R, T> {
    public Subscriber<? super T> call(final Subscriber<? super R> o);
}

But then, for this to be standardized across all libraries (the only point of including Processor in the spec), we'd need to add a lift (ignore name bike shedding for now) method to Publisher.

At that point I feel like we are dictating far too much about how implementations will allow processing of data, or whether they'll even allow user defined operators, or only the ones provided by their library.

Thus the Subject-style Processor is simple enough as to not need a definition, and the Operator-style Processor requires dictating too much of the implementation. For these reasons I don't think we should include a Processor interface in the spec, even though both are useful and important to library implementations.
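For concreteness, a minimal map written against the function-style signature above might look like this (a sketch; the simplified Subscriber stand-in and the Operator/MapOperator names are illustrative, not from the spec):

```java
import java.util.function.Function;

// Simplified stand-in for illustration (no onSubscribe/backpressure).
interface Subscriber<T> {
    void onNext(T element);
    void onError(Throwable t);
    void onComplete();
}

// The function-style signature: downstream Subscriber in, upstream-facing
// Subscriber out. No Publisher, and no subscribe step, is involved.
interface Operator<R, T> {
    Subscriber<? super T> call(Subscriber<? super R> downstream);
}

// map as such an operator: onNext forwards straight to onNext.
final class MapOperator<T, R> implements Operator<R, T> {
    private final Function<? super T, ? extends R> f;

    MapOperator(Function<? super T, ? extends R> f) { this.f = f; }

    @Override
    public Subscriber<? super T> call(Subscriber<? super R> downstream) {
        return new Subscriber<T>() {
            public void onNext(T element) { downstream.onNext(f.apply(element)); }
            public void onError(Throwable t) { downstream.onError(t); }
            public void onComplete() { downstream.onComplete(); }
        };
    }
}
```

This is the Subscriber<T> -> Subscriber<R> shape: chaining N such operators costs N wrapped subscribers but no extra subscribe/Subscription round trips.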

smaldini commented 10 years ago

Cheers Ben for the re-open and the comment. From an operator perspective it makes sense; however, I think this is not strictly speaking an operator, or maybe it is a "Reactive Operator". I called it lift because functionally speaking it seems to achieve the same goal somehow. A Processor is a function combinator (like its mathematical counterpart) and is a way to compose streams, not to define a particular operation. In fact it can hide N operators under the hood, but the point is that this combination also fulfills the contract. I would also dub it "Reactive Server", "Reactive Blackbox" or "(f o g)(x) = f(g(x))", as its purpose is to hide the implementation, offering reactive input (onNext etc.) and reactive output (subscribe()).

    Processor<Integer, Integer> identityProcessor =
        Action.<Integer, Integer>map(integer -> integer)
              .distinctUntilChanged()
              .scan(Tuple2<Integer, Integer>::getT1)
              .filter(integer -> integer >= 0)
              .combine();
    // combine() creates an Action (implements Processor) whose subscribe delegates
    // to its downstream Publisher and whose onNext/onError/onSubscribe/onComplete
    // delegate to its upstream Subscriber

Streams
    .defer(Arrays.asList(1,2,3))
    .connect(identityProcessor) //could be AkkaFlow, Subject (?), Action
        .subscribe(reactOnIdentityProcessorSubscriber);

In my mind the purpose is to have the flexibility to package the head and tail of a Stream under a single object, in a way that is coherent across implementations. I stand corrected tho; having played with it for the past couple of months, it fitted well at a relatively low cost. Since it is maybe more niche than Publisher/Subscriber, could it be optional for a given implementation? I am pretty sure this would be appreciated by a few, especially in a distributed environment where you could hide the request-reply / reply-to pattern under this simple interface.

danarmak commented 10 years ago

@benjchristensen, you said:

The original Processor as defined in v0.3 is basically what Rx calls a Subject, as it implements both Subscriber and Publisher. It is nothing more than that. If that's all it is, we don't need a special type for that, anything that happens to implement both of those magically becomes this "thing"

In Scala we can declare a type Subscriber with Publisher, but not in Java, so how would implementations "magically become this thing"?

If two different Java implementations of Reactive Streams have their own types which both look like interface Processor extends Subscriber, Publisher, they will still not be directly interoperable but will need trivial wrappers. That inconvenience may be a small reason to include such an interface in the spec.
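A sketch of those trivial wrappers (all type names here are hypothetical, and the types are simplified single-method stand-ins):

```java
// Hypothetical library-specific types: structurally identical, nominally distinct.
interface Subscriber<T> { void onNext(T element); }
interface Publisher<T> { void subscribe(Subscriber<? super T> s); }
interface AkkaProcessor<T, R> extends Subscriber<T>, Publisher<R> {}
interface ReactorProcessor<T, R> extends Subscriber<T>, Publisher<R> {}

// What a shared spec type would remove the need for:
interface SharedProcessor<T, R> extends Subscriber<T>, Publisher<R> {}

final class Wrappers {
    // One forwarding wrapper is needed per library type being adapted.
    static <T, R> SharedProcessor<T, R> wrap(AkkaProcessor<T, R> p) {
        return new SharedProcessor<T, R>() {
            public void onNext(T element) { p.onNext(element); }
            public void subscribe(Subscriber<? super R> s) { p.subscribe(s); }
        };
    }
}
```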

igstan commented 10 years ago

There is a way to mimic Subscriber with Publisher in Java using multiple bounds. It's more limited though. It's impossible to use this trick to declare the type of a field without adding a new generic type to the class.

public <T, Processor extends Subscriber<T> & Publisher<T>> void quux(Processor p) {}
danarmak commented 10 years ago

I didn't know that. Thanks, and sorry for raising irrelevant objections.

viktorklang commented 10 years ago

:+1: on adding Processor. @igstan's trick is very neat but it doesn't work out in practice:

Exhibit A:

✗ cat Foo.java
public class Foo {
  public interface Pig<T> {}
  public interface Dog<T> {}

  public static <T, Pigdog extends Pig<T> & Dog<T>> Pigdog foo(Pigdog pd) { return pd; }

  public static void main(String[] args) {
    Pig<Integer> & Dog<Integer> pd = foo(null);
  }
}

✗ javac Foo.java
Foo.java:8: error: not a statement
    Pig<Integer> & Dog<Integer> pd = foo(null);
       ^
Foo.java:8: error: ';' expected
    Pig<Integer> & Dog<Integer> pd = foo(null);
                ^
2 errors

Arguments for Processor (needs counter-arguments):

1) Java's type system is not sophisticated enough to represent Publisher<T> with Subscriber<R> without introducing a new, concrete type (see Exhibit A).
2) Processor<T,R> is as ubiquitous as Function<T,R>; it is a means of abstracting/composing detached processing stages.
3) We already agree that many implementations will have to implement Processor—with poor to no possibility of offering generic combinators over Processors (since they will not have the same JVM type).
4) It is easier (quicker adoption) to deprecate the Processor in Reactive Streams if deemed needed than to get all other implementations to deprecate their Processor implementations and switch to the Reactive Streams Processor if we decide to introduce it later.
5) Having Processor in the spec makes it possible for us to add specification to it if needed, to avoid diverging behaviors if implementations define their own Processor type.
6) It is 1 line of Java code—extremely cheap to maintain.

Looking forward to seeing these arguments shot down.

Cheers, √

smaldini commented 10 years ago

Well, I'm in line with Viktor on this. +1

rkuhn commented 10 years ago

Ben, as much as I understand where you are coming from, you are neglecting an important detail: the JVM’s intrinsic type system (i.e. not even Java’s but the byte-code one) is fully nominal, which means that implementing both Subscriber and Publisher does not magically make this a Processor-like thing that can be shared. Every library that introduces such a type by necessity makes a new one that is not compatible with any of the others.

The (very useful) operation that you then cannot write generically is def append(p: Processor[T, U]): Publisher[U] (on a Publisher[T]), because a Reactor.Processor cannot stand in for an Akka.Processor even though they have the exact same structure—due to their different names.

I am convinced that we will see multiple libraries creating and using variations of this type, and even if RxJava will never use it this will still be a desirable addition.

Towards your description of Rx Operators: this is not as generically useful, it is more a detail of how you choose to design your DSL (which means that I fully agree with your assessment that Rx’s Operator is not a good fit for Reactive Streams in general). A TCP client connection would be perfectly represented as a Processor[ByteString, ByteString], because that makes it obvious that we are dealing with two asynchronous boundaries (which is indisputable in this case) while using the Operator terminology does not fit this bill.

benjchristensen commented 10 years ago

so how would implementations "magically become this thing"?

Ah, so if I understand correctly, this is about supporting a method that takes a Processor. If that's the case, then yes, Java can't do that.

My point was that it is fully possible in Java to just have a type that implements Publisher, Subscriber so we didn't need a type to do that.

But if we want to support having a method like this:

void doStuff(Type that extends Publisher, Subscriber)

then yes, Java can't do that and we'd need a type that unites them.

@rkuhn I'm convinced that the type is useful if trying to do the above.

benjchristensen commented 10 years ago

Are you thinking it would be optional for libraries to implement this? If it is required, must the Publisher have a method like process(Processor p)?

benjchristensen commented 10 years ago

obvious that we are dealing with two asynchronous boundaries

No, the Processor does not do that. It can be fully synchronous as per decisions in #46 and often will be (such as filter and map).

The Operator and Processor types both support synchronous and asynchronous processing.

danarmak commented 10 years ago

@benjchristensen, isn't publisher.process(processor) equivalent to:

publisher.subscribe(processor);
return processor; // as the next Publisher

If so, the method doesn't need to be included in the spec.
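In sketch form (with simplified single-method stand-in types), the equivalence is:

```java
// Simplified stand-ins; real types carry error/complete and backpressure.
interface Subscriber<T> { void onNext(T element); }
interface Publisher<T> { void subscribe(Subscriber<? super T> s); }
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

final class Streams {
    // process(publisher, processor) is just subscribe-then-return, so it can
    // live as a library helper rather than as a method in the spec. Caveat:
    // an eager upstream starts emitting into the processor here, before any
    // downstream Subscriber has attached.
    static <T, R> Publisher<R> process(Publisher<T> upstream, Processor<T, R> p) {
        upstream.subscribe(p);
        return p; // the processor is itself the next Publisher
    }
}
```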

viktorklang commented 10 years ago

@benjchristensen I see no need for a process method—the type is needed to be able to abstract over the notion of a Publisher that is also a Subscriber—to take them as parameters, to return them as return values, to assign them to variables etc.

benjchristensen commented 10 years ago

isn't publisher.process(processor) equivalent to:

@danarmak Sort of ... the problem here is that the chain is connected in the wrong direction, so data will now be flowing from the origin before the child is connected, which means data can be lost unless the Processor is buffering until someone subscribes to it (which there is no guarantee of happening).

@viktorklang If it's just to add a type, then that significantly reduces the impact of this on the spec and actually shouldn't even need to be mentioned.

Though the name seems wrong, as processing implies Subscriber<T> -> Subscriber<R> and does not require a Publisher being involved. I'd rather we come up with a different name ... but right now I'm struggling to come up with anything except Subject which is what Rx calls this signature.

I fully agree that the signature is valuable, but my experience with this signature has convinced me it is rarely the right one for "processing". It is used for multicasting and for "hot" Publishers that can keep calling onNext to it from the "outside" of a stream while one or more Subscribers are receiving data from it.

Some of the problems with using it for things like map, take, skip, etc are:

1) Lifecycle

A new Processor instance is generally needed each time a stream is subscribed to, which means the method signature for accepting a Processor cannot just be process(Processor p) but must instead take a factory function.

2) Subscription Flow

A stream must be subscribed bottom-to-top, not top-to-bottom, otherwise data is already flowing at the top before the children subscribers are ready to receive.

A process method must wrap the Processor in a Publisher that awaits a subscribe from a Subscriber and then subscribe upwards. This means extra object allocations and complexity.

3) Public/Private

The onNext method on a Processor is public, so anyone with a reference to it can emit data to it.

This may or may not be a concern of a system, but it's something to keep in mind, especially if the Processor instance is being passed into a stream. It also makes reasoning about contract compliance more difficult. An example of another consideration with this signature becomes "are multiple places concurrently invoking onNext?".

4) Efficiency

The Publisher, Subscriber signature requires that each step must subscribe upwards, which means a subscribe -> Subscription -> onSubscribe step for each transformation.

For something like map or filter that is quite heavy, and requires extra object allocations as per item 2 above.

This is why Rx uses the Subscriber<T> -> Subscriber<R> signature for almost all "processing" as it allows far more efficient implementations (onNext -> onNext without an extra subscription in the mix).

I mention these not to argue against the Publisher, Subscriber union, but to illustrate that this type is not necessarily the right thing for all or even most "processing" and should not be construed as such.
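A sketch of the wrapping described under item 2 above (simplified single-method stand-in types, no backpressure): the returned Publisher delays connecting the Processor upstream until a downstream Subscriber arrives.

```java
// Simplified stand-ins; real types carry error/complete and backpressure.
interface Subscriber<T> { void onNext(T element); }
interface Publisher<T> { void subscribe(Subscriber<? super T> s); }
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

final class DeferredProcess {
    // The extra allocation mentioned above: one wrapping Publisher per
    // process() call, so the chain is wired bottom-to-top and no data flows
    // until the downstream end is attached.
    static <T, R> Publisher<R> process(Publisher<T> upstream, Processor<T, R> p) {
        return downstream -> {
            p.subscribe(downstream); // connect the outlet first
            upstream.subscribe(p);   // only now let data into the inlet
        };
    }
}
```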

danarmak commented 10 years ago

@benjchristensen Subscription order is an interesting problem. With hot Publishers, items are liable to be dropped no matter what you do. Even if you subscribe bottom-to-top, if every processor in the stream is 'hot', then data elements can still be dropped between any two processors, which is a similar result to subscribing top-to-bottom, although perhaps less lossy in practice. With cold Publishers, subscription order only matters in a multicast scenario.

viktorklang commented 10 years ago

On Thu, May 22, 2014 at 5:41 PM, Ben Christensen notifications@github.com wrote:

isn't publisher.process(processor) equivalent to:

@danarmak Sort of ... the problem here is that the chain is connected in the wrong direction, so data will now be flowing from the origin before the child is connected, which means data can be lost unless the Processor is buffering until someone subscribes to it (which there is no guarantee of happening).

@viktorklang If it's just to add a type, then that significantly reduces the impact of this on the spec and actually shouldn't even need to be mentioned.

It's pretty good to just have it a part of the spec so that if someone does a .NET impl they also have to retain it?

Though the name seems wrong, as processing implies Subscriber -> Subscriber and does not require a Publisher being involved. I'd rather we come up with a different name ... but right now I'm struggling to come up with anything except Subject which is what Rx calls this signature.

It's not a Sub -> Sub, it's a Subscriber to its upstream and a Publisher to its downstream.

I fully agree that the signature is valuable, but my experience with this signature has convinced me it is rarely the right one for "processing". It is used for multicasting (https://github.com/Netflix/RxJava/tree/master/rxjava-core/src/main/java/rx/subjects) and for "hot" Publishers that can keep calling onNext to it from the "outside" of a stream while one or more Subscribers are receiving data from it.

It has been very useful for Akka Streams—an anecdotal counter argument to an anecdotal argument.

Some of the problems with using it for things like map, take, skip, etc are:

1) Lifecycle

A new Processor instance is generally needed each time a stream is subscribed to, which means the method signature for accepting a Processor can not just be process(Processor p) but must instead be a factory function.

I don't understand process(Processor p)—it is a subscriber to its upstream and a publisher to its downstream. Where does process enter the picture?

2) Subscription Flow

A stream must be subscribed bottom-to-top, not top-to-bottom, otherwise data is already flowing at the top before the children subscribers are ready to receive.

A process method must wrap the Processor in a Publisher that awaits a subscribe from a Subscriber and then subscribe upwards. This means extra object allocations and complexity.

Can you prove that? I don't see how that follows its definition currently.

3) Public/Private

The onNext method on a Processor is public, so anyone with a reference to it can emit data to it.

That's not a Processor problem, it's a Subscriber problem. If onNext being public is a problem, then I have multiple alternatives to offer.

This may or may not be a concern of a system, but it's something to keep in mind, especially if the Processor instance is being passed into a stream. It also makes reasoning about contract compliance more difficult. An example of another consideration with this signature becomes "are multiple places concurrently invoking onNext?".

Can you demonstrate this problem? Following the contract/spec is not unique to Processor.

4) Efficiency

The Publisher, Subscriber signature requires that each step must subscribe upwards, which means a subscribe -> Subscription -> onSubscribe step for each transformation.

For something like map or filter that is quite heavy, and requires extra object allocations as per item 2 above.

Why would it require extra allocations? This argument seems to assume a specific implementation.

This is why Rx uses the Subscriber -> Subscriber signature (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L102) for almost all "processing" as it allows far more efficient implementations (onNext -> onNext without an extra subscription in the mix).

Again, assumes a specific implementation. I am not convinced this argument holds. There's nothing that says that a Processor impl couldn't short-circuit its subscription.

I mention these not to argue against the Publisher, Subscriber union, but to illustrate that this type is not necessarily the right thing for all or even most "processing" and should not be construed as such.

I need more arguments to be convinced. Also, no one has contested my 6 Processor arguments yet, I'm looking forward to having them put to the test, so to say.

Cheers, √

benjchristensen commented 10 years ago

@danarmak Yes, in a hot stream subscription order doesn't matter. In a cold stream though it does matter. If I start streaming a file and have subscribed at the top already, but the bottom of the sequence is not yet subscribed, I'll lose data.

benjchristensen commented 10 years ago

It's pretty good to just have it a part of the spec so that if someone does a .NET impl they also have to retain it?

Sorry, by "spec" I meant the contract. It can definitely live as a type. There just aren't any requirements or rules that we need other than the fact that this type exists.

benjchristensen commented 10 years ago

Regarding concurrency ...

Can you demonstrate this problem? Following the contract/spec is not unique to Processor.

What do you mean about demonstrating this? Just have two threads call onNext. I'm not saying the contract doesn't cover it, it's just not good design to make the spec so easy to break. I know this from the experience of watching people try and use the Rx Subject type. It is a sharp, double-edged sword. And this Processor signature is the exact same thing as `Subject`.

benjchristensen commented 10 years ago

subscribe from a Subscriber and then subscribe upwards. This means extra object allocations and complexity.

Can you prove that? I don't see how that follows its definition currently.

Sure ... it'll take some time to write the code matching this spec, but we spent weeks figuring this out in the refactoring from RxJava 0.16 -> 0.17 when we migrated away from exactly what this Processor type signature requires.

benjchristensen commented 10 years ago

Also, no one has contested my 6 Processor arguments yet, I'm looking forward to having them put to the test, so to say.

What do you mean by this? What "6 Processor" arguments have not been contested?

viktorklang commented 10 years ago

On Thu, May 22, 2014 at 6:05 PM, Ben Christensen notifications@github.com wrote:

It's pretty good to just have it a part of the spec so that if someone does a .NET impl they also have to retain it?

Sorry, by "spec" I meant the contract. It can definitely live as a type. There just aren't any requirements or rules that we need other than the fact that this type exists.

Then I totally agree! :)

— Reply to this email directly or view it on GitHubhttps://github.com/reactive-streams/reactive-streams/issues/22#issuecomment-43908734 .

Cheers, √

viktorklang commented 10 years ago

On Thu, May 22, 2014 at 6:07 PM, Ben Christensen notifications@github.com wrote:

Regarding concurrency ...

Can you demonstrate this problem? Following the contract/spec is not unique to Processor.

What do you mean about demonstrating this? Just have two threads call onNext. I'm not saying the contract doesn't cover it, it's just not good design to make the spec so easy to break. I know this from the experience of watching people try and use the Rx Subject type. It is a sharp, double-edged sword. And this Processor signature is the exact same thing as `Subject`.

Implementing the spec properly requires reading and understanding the spec. If you implement a Publisher there are rules to follow; this is beside the point of Processor. To make it easy/simple for end-users to implement custom Processors/Publishers/Subscribers, the libraries can offer shims/formulas.

    def safeSubscriber[T](us: Subscriber[T], logger: Logger): Subscriber[T] =
      new Subscriber[T] {
        override def onSubscribe(subscription: Subscription) =
          try us.onSubscribe(subscription) catch {
            case NonFatal(t) => onError(new IllegalStateException(
              "Reactive Streams Specification violation: onSubscribe threw an exception", t))
          }

        override def onNext(element: T) =
          try us.onNext(element) catch {
            case NonFatal(t) => onError(new IllegalStateException(
              "Reactive Streams Specification violation: onNext threw an exception", t))
          }

        override def onError(throwable: Throwable) =
          try us.onError(throwable) catch {
            case NonFatal(t) => logger.error(t,
              "Reactive Streams Specification violation: onError for {} threw an exception while handling {}.",
              us, throwable)
          }

        // … and so on and so forth
      }

Cheers, √

danarmak commented 10 years ago

@benjchristensen I think I caused a misunderstanding:

Yes, in a hot stream subscription order doesn't matter. In a cold stream though it does matter. If I start streaming a file and have subscribed at the top already, but the bottom of the sequence is not yet subscribed, I'll lose data.

I may have misused the term 'cold publisher'. I meant the following concept:

A publisher that never drops any elements. It waits for the subscribers to request more before publishing each element. If there are multiple subscribers, it waits for all of them (so the slowest one determines the overall speed). If there are no subscribers, it waits for someone to subscribe before publishing at all.

This is basically a description of the behavior of a part of my implementation. I think it emerges naturally from the weaker concept of a 'cold publisher'.

Suppose you have a function that creates a Publisher[ByteString] representing a file. If it starts publishing right away, you'll miss data before anyone can subscribe. So either it should not publish until the first subscriber is subscribed (what I do), or the function that creates the Publisher must take the subscriber as a parameter (I think that is less elegant).

The rest is a natural outcome of the desire not to drop any data once the stream is active. Given this behavior, subscription order doesn't matter.

Except for the case of multicast, where the second subscriber may miss the first elements already passed to the first subscriber. The only way I know to make that deterministic is either to introduce an explicit start() method on your Publisher, or to use a 'broadcast' processor which is created when all subscribers are already available, so they're all effectively subscribed at once.
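The "no emission before the first subscribe" behavior described above can be sketched as follows (the WaitingPublisher name is hypothetical, the Subscriber is a single-method stand-in, and request-based backpressure is omitted):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in; the real type also carries error/complete signals
// and request-based backpressure, which this sketch omits.
interface Subscriber<T> { void onNext(T element); }

final class WaitingPublisher<T> {
    private final List<T> source;
    private final List<Subscriber<? super T>> subscribers = new ArrayList<>();

    WaitingPublisher(List<T> source) { this.source = source; }

    void subscribe(Subscriber<? super T> s) {
        subscribers.add(s);
        // Nothing is emitted while there are no subscribers; the first
        // subscription starts the stream. A later multicast subscriber
        // would miss earlier elements, which is the non-determinism an
        // explicit start() (or pre-assembled broadcast) removes.
        if (subscribers.size() == 1) drain();
    }

    private void drain() {
        for (T element : source)
            for (Subscriber<? super T> s : new ArrayList<>(subscribers))
                s.onNext(element);
    }
}
```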

viktorklang commented 10 years ago

On Thu, May 22, 2014 at 6:09 PM, Ben Christensen notifications@github.com wrote:

Also, no one has contested my 6 Processor arguments yet, I'm looking forward to having them put to the test, so to say.

What do you mean by this? What "6 Processor" arguments have not been contested?

https://github.com/reactive-streams/reactive-streams/issues/22#issuecomment-43868042


benjchristensen commented 10 years ago

Looking forward to seeing these arguments shot down.

Those 6 I am fine with.

smaldini commented 10 years ago

I guess we are leaning towards +1 then. I don't think this will be a risky operation. Shall we make it 0.4? I'll PR in a minute.


benjchristensen commented 10 years ago

I was wrong in some of my assertions earlier. I spent a while writing code and performance tests and came up with this: https://github.com/benjchristensen/StreamProcessor

Take a look at the README there for the full information, and the links to the code.

The summary is:

Based on these results, the Processor interface actually outperforms the Operator definition in common cases. The Operator signature does seem to have value in cases where multiple transformations are chained together (common in Rx, and one of the drivers of our adoption of it as of v0.17).

Sorry about being mistaken earlier; hopefully this data and experiment correctly present the options.

benjchristensen commented 10 years ago

@danarmak Take a look at these three approaches to implementing a process method. I'm sure there are others, but I came up with these three:

    /**
     * This works in all cases: the Supplier creates a fresh Processor per
     * subscription, so no state is shared between subscribers.
     * ('f' here is this publisher's own subscription logic, elided in this
     * snippet; see the linked repo.)
     */
    public <R> APublisher<R> process(Supplier<Processor<T, R>> supplier) {
        return new APublisher<R>((s) -> {
            Processor<T, R> p = supplier.get();
            p.subscribe(s);
            f.accept(p);
        });
    }

    /**
     * This will blow up because the subscriptions happen in the wrong order:
     * the Processor is subscribed upstream before anything is subscribed to it.
     */
    public <R> Publisher<R> processSimple(Processor<T, R> p) {
        subscribe(p);
        return p;
    }

    /**
     * This works for a single subscription, but not when subscribed to
     * multiple times, because the Processor instance gets reused.
     */
    public <R> Publisher<R> process(Processor<T, R> p) {
        return new APublisher<R>((s) -> {
            p.subscribe(s);
            f.accept(p);
        });
    }

I use Test.java to play with each of these; it shows the problems with the latter two and how the first (process(Supplier<Processor<T, R>> supplier)) works in all the use cases.
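
The failure mode of the third variant can be illustrated without any streaming machinery at all (the types below are hypothetical, not from the linked repo): a shared stateful stage leaks state across subscriptions, while a Supplier hands each subscription a fresh instance.

```java
import java.util.function.Supplier;

// A hypothetical stateful stage: it counts the elements it has seen.
final class CountingStage {
    private int seen = 0;
    int push(int value) { return ++seen; }
}

final class SubscriptionDemo {
    // Sharing one instance across "subscriptions" leaks state between them.
    static int lastCountShared(CountingStage shared, int[] items) {
        int last = 0;
        for (int i : items) last = shared.push(i);
        return last;
    }

    // A Supplier gives every subscription a fresh, independent instance.
    static int lastCountFresh(Supplier<CountingStage> supplier, int[] items) {
        CountingStage stage = supplier.get();   // new instance per call
        int last = 0;
        for (int i : items) last = stage.push(i);
        return last;
    }
}
```

The same effect applies to any Processor that keeps per-subscription state (buffers, subscription references, counters), which is why the Supplier variant is the only one that works for multiple subscribers.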

benjchristensen commented 10 years ago

I guess we are leaning towards +1 then. I don't think this will be a risky operation. Shall we make it 0.4 ? I'll PR in a minute.

I'm fine adding the type. We are not ready to define how a Publisher would expose a mechanism for using it at this time (if ever).

danarmak commented 10 years ago

@benjchristensen your FiniteSynchronousLongPublisher is unicast, and it starts publishing only once the subscriber calls request() (i.e. it doesn't drop items before there's a subscriber). If every Processor down the line also worked like this, then subscription order wouldn't matter: elements wouldn't be lost even if you subscribe top to bottom, because nothing is sent until the final Consumer is connected and sends the first ever request(n). This is the solution I adopted.

Your code is a bit different because your MapProcessor passes along the Subscription from upstream rather than generating one itself. That makes it less general: for instance, it can't represent a mapping that isn't one-to-one, because it doesn't intercept the calls to request(n). And for this lightweight implementation to work, it requires subscription in bottom-to-top order. I don't think that affects the general case; I'm pretty sure it's possible to implement a lightweight Processor in a top-to-bottom (or an unordered) subscription paradigm.
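
The subscription-forwarding style described here looks roughly like this sketch (illustrative stand-in interfaces, not Ben's actual code): because the mapping is strictly one-to-one, the downstream subscriber can be handed the upstream Subscription unchanged, so one request(n) upstream yields exactly n mapped elements and no interception is needed.

```java
import java.util.function.Function;

// Illustrative stand-ins for the Reactive Streams interfaces.
interface Subscription { void request(long n); }
interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onComplete();
}

// A 1-to-1 map stage in the "lightweight" style: it forwards the upstream
// Subscription straight to the downstream. A 1-to-n stage could NOT do
// this, because it would have to intercept request(n) and buffer output.
final class MapProcessor<T, R> implements Subscriber<T> {
    private final Function<? super T, ? extends R> f;
    private final Subscriber<? super R> downstream;

    MapProcessor(Function<? super T, ? extends R> f, Subscriber<? super R> downstream) {
        this.f = f;
        this.downstream = downstream;
    }

    @Override public void onSubscribe(Subscription s) { downstream.onSubscribe(s); }
    @Override public void onNext(T t) { downstream.onNext(f.apply(t)); }
    @Override public void onComplete() { downstream.onComplete(); }
}
```

The bottom-to-top ordering requirement follows directly: the downstream must exist before onSubscribe can forward the upstream Subscription to it.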

benjchristensen commented 10 years ago

Yes, if everything is synchronous it would work regardless of order. Once it goes async (the other publisher impl I provide in the example) things change. And it would change dramatically again if every Processor was async and decoupled the subscription (including object allocation).

I tried to start with the simplest and most efficient implementation and then start adding complexity, such as an async publisher. I have not yet created an async processor.

What would you change or do as an example to test a scenario that you're thinking is more general or applicable? I'd like to capture some of these cases and show the performance impact of different choices.

smaldini commented 10 years ago

Agree


danarmak commented 10 years ago

@benjchristensen the generalization that immediately comes to mind is mappings that are not 1-to-1. Even a synchronous 1-to-n mapping (e.g. emit 3 items for each 1 input) requires intercepting and modifying the request(n) calls, and either buffering the output produced at each step, or making the transformation function return a continuation or be a mutable state machine.
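
A sketch of why 1-to-n forces demand interception (stand-in interfaces again; synchronous, unicast, and without error handling): the stage below emits three outputs per input, so it must buffer surplus output, count outstanding downstream demand itself, and translate it into its own upstream demand.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative stand-ins for the Reactive Streams interfaces.
interface Subscription { void request(long n); }
interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onComplete();
}

// A 1-to-3 expansion stage. Unlike a 1-to-1 map, it cannot forward the
// upstream Subscription: it intercepts request(n) and buffers output.
final class TripleProcessor<T> implements Subscriber<T> {
    private final Subscriber<? super T> downstream;
    private final Deque<T> buffer = new ArrayDeque<>();
    private Subscription upstream;
    private long demand = 0;            // outstanding downstream demand
    private boolean upstreamDone = false;
    private boolean completed = false;

    TripleProcessor(Subscriber<? super T> downstream) { this.downstream = downstream; }

    @Override public void onSubscribe(Subscription s) {
        upstream = s;
        // Hand the downstream OUR subscription, not the upstream one.
        downstream.onSubscribe(n -> { demand += n; drain(); });
    }
    @Override public void onNext(T t) {
        for (int i = 0; i < 3; i++) buffer.add(t);   // three outputs per input
        drain();
    }
    @Override public void onComplete() { upstreamDone = true; drain(); }

    private void drain() {
        while (demand > 0 && !buffer.isEmpty()) {
            demand--;
            downstream.onNext(buffer.poll());
        }
        if (upstreamDone && buffer.isEmpty() && !completed) {
            completed = true;
            downstream.onComplete();
        } else if (demand > 0 && buffer.isEmpty() && !upstreamDone) {
            upstream.request(1);        // naive: may over-request; fine for a sketch
        }
    }
}
```

The alternative to the buffer is exactly what the paragraph above describes: a transformation that returns a continuation, or a mutable state machine that resumes where it left off.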

The other common generalization is I think asynchronicity, where the mapping function returns a Future of some kind.

The most general equivalent of a Processor that I wrote looks like this (simplified, my code isn't online yet):

abstract class Pipe[T] extends Source[T] with Sink[T] {
  protected def emit(output: Option[T]): Future[Unit] = ... // Implementation omitted
  protected def process(input: Option[T]): Future[Unit]
}

The Option[T] types follow the rule that None signifies end-of-input, corresponding to onComplete.

An implementation of Pipe needs to implement the process method. It can call emit zero or more times, but not concurrently. emit takes care of sending the data to subscribers; it returns a Future because it may need to wait for the subscriber to request more data. emit can only be called by process or by the continuations process creates.

The future returned by process should complete when it has finished processing the input and emitting all output. It is guaranteed that process will be called sequentially with respect to the futures it returns. An event loop (made up of mapped Futures) links input, output (demand) and process.

This is a Future-based state machine, and a pretty powerful tool for building processors which aren't one-to-one and aren't necessarily synchronous. But the downside is that you have to create and possibly schedule a Future for each input and each output item.

If the implementation of process is known to be synchronous, the loop can be made a bit more efficient. Maybe explicitly passing around continuations would be faster than mapping on futures that are created as completed promises. I haven't tried writing an optimized solution along those lines yet.
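
The per-element Future cost is easy to see in miniature. Below, Java's CompletableFuture stands in for Scala's Future (an illustrative translation, not danarmak's code): each input allocates a future, and chaining the futures gives the "process is never called concurrently" guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sequential driver: each call to process starts only after the future
// returned by the previous call has completed.
final class SequentialPipe<T> {
    private final List<T> emitted = new ArrayList<>();
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    // One future is allocated per element - the overhead discussed above.
    CompletableFuture<Void> process(T input) {
        return CompletableFuture.runAsync(() -> emitted.add(input));
    }

    void push(T input) {
        // Chain onto the tail so process calls never overlap.
        tail = tail.thenCompose(ignored -> process(input));
    }

    List<T> awaitAll() {
        tail.join();
        return emitted;
    }
}
```

Skipping the asynchronous hop for known-synchronous steps is exactly the optimization suggested in the paragraph above.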

If you're interested, the corresponding state machines for Producers and Consumers look as follows (again, simplifying):

trait Source[T] {
  protected def produce(): Future[Option[T]]
}

trait Sink[T] {
  protected def process(input: Option[T]) : Future[Unit]
}

produce is called to generate the next output element. process is just like Pipe.process except there's no emit. Both have the guarantee of not being called concurrently.

benjchristensen commented 10 years ago

I'm sorry, I don't fully understand how these examples fit what we're discussing here, as I don't see where Publisher and Subscriber fit into the types and code shown. Is this a generalization on top of Reactive Streams, or separate from it?

mappings that are not 1-to-1. Even a synchronous 1-to-n (e.g. emit 3 items for each 1 input) requires intercepting and modifying the request(n) calls, and either buffering the output produced on each step, or making the transformation function return a continuation or be a mutable state machine.

Yes, those use cases require a different implementation than the 1-to-1 scenario, but they can all be implemented on top of the defined Publisher/Subscriber types, with or without the Processor/Operator types we're discussing in this issue.

danarmak commented 10 years ago

Source extends Publisher, Sink extends Subscriber, and Pipe extends Processor. They are intended as a complete implementation of Reactive Streams on top of Scala Futures, and they provide some additional guarantees beyond those of R.S., but those aren't important here.

I'm sorry I can't link to the actual code, I'm still waiting for the internal legal review, but I hope to publish a usable library in a week or so. I've already migrated some of our own company's code to this implementation. We needed a stream abstraction in Scala, couldn't wait for akka-streams, and I decided to make it implement Reactive Streams too.

benjchristensen commented 10 years ago

Ah, that helps to know that they extend from Publisher and Subscriber.

What do you mean by "on top of Futures"? Are you implementing streams where each onNext is produced by a Future? Or just that they are the return types to represent when an async method has completed?

The method signatures suggest the latter. If so, has the object allocation of creating a Future per onNext not proven to be an issue? I found it to be far too expensive when I explored that route.

I look forward to seeing the code so I can better understand.

danarmak commented 10 years ago

Are you implementing streams where each onNext is produced by a Future?

I'm not sure what you mean by onNext being produced. The next element to be sent to onNext is produced by the produce function, which returns a Future.

If so, has the object allocation of creating a Future per onNext not proven to be an issue?

It's definitely an issue. I reuse Futures of type Unit and Boolean where I can, but I have to create a Future wrapper for each element that passes through produce/emit.

The immediate goal of my library is getting a full implementation of Reactive Streams (with many constructors, combinators, etc.) up and working as quickly as possible, because we really need it in-house and can't wait for akka-streams or Rx to be stable. Using Futures is the easiest and most solid, well-supported route to implementation. It also gives a lot of flexibility, since I can choose the ExecutionContext for each Source and Sink separately at run time.

Our in-house use of streams almost always begins and ends with IO, and the stream elements are relatively large, so I have reason to hope even the current performance will be sufficient.

The obvious way to improve the performance of my implementation is to add support for synchronous constructors and combinators, which would replace many unnecessarily asynchronous components I write at present. A synchronous Source.map wouldn't create a new Source, in the sense of a component that can accept subscriptions independently from the original Source. It would just take a synchronous function, possibly with mutable state support, store it in the original Source implementation and call it synchronously after the original produce.
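
The fusion idea described here can be expressed roughly as follows (hypothetical types in Java, standing in for the Scala library under discussion): a synchronous map does not become a new asynchronous stage; the function is applied inline on the future returned by produce, so no extra scheduling hop is added per element.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative sketch of synchronous-map fusion. The thin wrapper here is
// one way to express "store the function and call it after produce";
// None (empty Optional) signifies end-of-input, as in the Pipe above.
abstract class FusedSource<T> {
    abstract CompletableFuture<Optional<T>> produce();

    <R> FusedSource<R> mapSync(Function<? super T, ? extends R> f) {
        FusedSource<T> self = this;
        return new FusedSource<R>() {
            @Override CompletableFuture<Optional<R>> produce() {
                // Same future, mapped synchronously when it completes:
                // no independently subscribable stage, no async hop.
                return self.produce().thenApply(opt -> opt.map(f));
            }
        };
    }
}
```

An asynchronous Pipe would still be needed where the transformation itself must wait, which is the interface question raised in the next sentence.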

This does involve a user-visible change: I will have to introduce a public type representing these synchronous transformations to be passed around. This will make code more complex, because some code will need a way to accept either a synchronous transformation or an asynchronous Pipe. I don't know what the common interface of these two should be. I also don't want the library to grow into something as large and feature-full as Rx; I want a relatively simple implementation of Reactive Streams. The future will show whether in the long term this implementation is needed after both akka-streams and Rx Reactive Streams are ready for production.