reactive-streams / reactive-streams-jvm

Reactive Streams Specification for the JVM
http://www.reactive-streams.org/
MIT No Attribution

Multicast Requirement #19

Closed benjchristensen closed 10 years ago

benjchristensen commented 10 years ago

Currently the spec states "A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them)."

I think it is a mistake to complicate the spec by requiring implementations to support multicasting, and therefore the management of subscriptions over time. In fact, I think it should expressly stick to unicast (a new lifecycle per Subscription).

Multicasting techniques should be layered on top by libraries, not required of the Publisher instances themselves.

For example, each Subscriber could result in a new network connection, open file, etc. This would be a basic implementation.

In Rx this greatly simplifies things and is a good separation of concerns. Multicasting can be added and done with different behaviors such as replaying all of history, a portion of history, the last value only, or ignoring history and just starting from now onwards, etc.

In other words, someone providing me a Publisher should not concern themselves with multiple subscriptions, how I want to multicast or other such things. If I subscribe it should start a new lifecycle, emit the data as I request it and stop when I unsubscribe.

This keeps the mental model clear, the Publisher implementations simple and allows libraries a simple contract to interface with.

normanmaurer commented 10 years ago

I agree with @benjchristensen here :)

rkuhn commented 10 years ago

I fully agree with the part about not mandating multicast: it is a complication that does not pull its own weight (due to there being so many different useful variants that picking a default is too limiting).

The remaining question is whether or not it should be allowed to call subscribe twice on the same Publisher. The simplification in implementation only fully materializes if no multicast is required at all, meaning that a Publisher will only ever publish to at most one Subscriber. We have wording in the spec that defines the shut-down state as that which is entered after having been disposed by its downstream (currently called cancelation, but that is another discussion).

What you have in mind is the Rx way of the Subscription being the “active” entity in the sense of holding the state relevant between Observable and Observer, but I would prefer this spec to be more general and allowing the Publisher to be the active (asynchronous) entity.

Thinking about the user-visible contract I prefer very clear semantics:

This behavior matches Rx practices as far as I can see; the difference is that in Rx the Subject and any other Observable are not distinguished in the type system. In this spec I would propose to reserve Publisher for single-output streams, have an API element that explicitly supports fan-out (à la Rx Subject), and keep the high-level combinator DSL outside the scope as we currently do (i.e. the transformation methods on Observable are library-specific and this spec does not say anything about them or their semantics).

viktorklang commented 10 years ago

:+1: @rkuhn!

benjchristensen commented 10 years ago

I think a Publisher should be capable of being subscribed to as many times as wanted, otherwise a library must have a "Publisher Factory" and the user can't pass a Publisher around safely as once consumed it couldn't be used again.

If a Publisher can be subscribed to many times, it is the only type needed by a library and can be passed around and subscribed to whenever and however often they want.

I would not add anything to this spec for fan-out, that's for libraries to do.

rkuhn commented 10 years ago

There is a relationship between subscribing to a Publisher more than once and fan-out, which divides Publishers into two categories:

As I said above, the semantics must be clear: if I get a Publisher (e.g. as a method argument) then I must know what that means. Saying “it can be used multiple times unless it cannot” violates this constraint. In the second case above there cannot be reuse, since the point of this discussion is that we agree that having fan-out logic in every Publisher is a bad idea. This is where Publisher differs from Rx Observable: for Publisher we would have to define how to merge the back pressure streams when splitting the data streams, which we cannot do in a one-size-fits-all fashion.

Therefore the only consistent choice is to disallow a Publisher from being reused.

Having a source of data which can support multiple sinks is perfectly reasonable, but I don’t think it should be covered by this specification—at least not in the first version. This means that e.g. Observable should have a “toPublisher” method which you can use to get a back pressure aware outlet that can be connected to any Reactive Streams Subscriber, and you would implement it such that you get a new one every time. Would that not solve this issue in a quite natural fashion?

benjchristensen commented 10 years ago

Therefore the only consistent choice is to disallow a Publisher from being reused.

This just means we're forcing people to use a factory pattern for handing out Publisher instances rather than being able to use the Publisher directly. The Publisher should be the factory and each time subscribe is invoked it starts a new lifecycle. Multi-subscribe on a single Publisher is a far nicer API and contract.

Rx Observables work this way very elegantly. If I have an Observable anyone can subscribe to it as many times as they want and they'll each get whatever stream the Observable is going to send to it. I do not need to ship around an "Observable Factory" for someone to generate an Observable and then subscribe to it.

A Publisher should ALWAYS be capable of being subscribed to as many times as callers want. Each subscription, though, is independent. It is not multicast; it is unicast per subscription.

The ability to subscribe multiple times to a Publisher is not to be confused with multicast.

Multicast means it is sharing a stream. Ability to subscribe multiple times to a Publisher does not.

Multicast immediately means that a Publisher is required to maintain state and always share the stream, which is the wrong thing to force on every Publisher.

Here are examples ...

1) Stock Stream - hot, always changing

Publisher: StockStream stream = new StockStream()
Consumer 1: stream.subscribe(subscriberA)
Consumer 2: stream.subscribe(subscriberB)

Two independent streams are flowing to subscriberA and subscriberB. Each consumer starts receiving data from whenever it starts.

The Publisher is free to choose whether it has 2 separate network connections or 1. Thus, it can do multicast internally if it wants, but it is not a requirement, nor is it expected to. Hot streams can always choose to do so though to conserve resources if they want.

2) Catalog Stream - cold, starts from beginning each time

Publisher: CatalogStream stream = new CatalogStream()
Consumer 1: stream.subscribe(subscriberA)
Consumer 2: stream.subscribe(subscriberB)

In this case the Publisher will establish a new network connection each time and start fresh. Each consumer will receive all titles in the catalog from the beginning regardless of when they subscribe.

3) File Stream - cold, loads each time

Publisher: FileStream stream = new FileStream()
Consumer 1: stream.subscribe(subscriberA)
Consumer 2: stream.subscribe(subscriberB)

In this case the Publisher will most likely open and read the file each time and start fresh. Each consumer will receive all bytes of the file regardless of when they subscribe.
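The cold cases (Catalog Stream, File Stream) can be sketched using the three types Ben proposes later in this thread (Publisher, Subscriber, and a Subscription with signalAdditionalDemand). Those types are copied into the block so that it compiles on its own. CatalogPublisher and RecordingSubscriber are hypothetical names. The sketch is synchronous and single-threaded, so it illustrates only per-subscription state, not asynchronous delivery.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a "cold" Publisher: each subscribe() starts an independent lifecycle
// with its own cursor, so every Subscriber sees the whole catalog from the start.
final class ColdCatalogDemo {

    // Local copies of the three types proposed later in the thread, so this compiles alone.
    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Publisher<T> {
        void subscribe(Subscriber<T> s);
    }

    // Hypothetical cold source standing in for CatalogStream.
    static final class CatalogPublisher implements Publisher<String> {
        private final List<String> titles;

        CatalogPublisher(List<String> titles) {
            this.titles = titles;
        }

        @Override
        public void subscribe(Subscriber<String> s) {
            s.onSubscribe(new Subscription() {
                private int index = 0;        // per-Subscription cursor (unicast state)
                private boolean done = false;

                @Override
                public void signalAdditionalDemand(int n) {
                    // Emit at most n titles; synchronous, single-threaded sketch only.
                    for (int i = 0; i < n && !done && index < titles.size(); i++) {
                        s.onNext(titles.get(index++));
                    }
                    if (!done && index == titles.size()) {
                        done = true;
                        s.onCompleted();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Records every signal and requests a fixed amount as soon as it is subscribed.
    static final class RecordingSubscriber implements Subscriber<String> {
        final List<String> events = new ArrayList<>();
        private final int demand;

        RecordingSubscriber(int demand) {
            this.demand = demand;
        }

        @Override public void onSubscribe(Subscription s) { s.signalAdditionalDemand(demand); }
        @Override public void onNext(String t) { events.add(t); }
        @Override public void onError(Throwable t) { events.add("error"); }
        @Override public void onCompleted() { events.add("completed"); }
    }

    public static void main(String[] args) {
        Publisher<String> catalog = new CatalogPublisher(List.of("A", "B", "C"));
        RecordingSubscriber first = new RecordingSubscriber(10);
        RecordingSubscriber second = new RecordingSubscriber(2);
        catalog.subscribe(first);
        catalog.subscribe(second);
        System.out.println(first.events);  // [A, B, C, completed]
        System.out.println(second.events); // [A, B]
    }
}
```

Both Subscribers start from "A" because each subscribe() call builds its own Subscription with its own cursor. Nothing is shared, so this is unicast.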

Multicast vs Multiple Subscribe

A Publisher can be subscribed to multiple times and let each subscription be independent of each other (unicast). This trivial example demonstrates this:

public class AStreamExample implements Publisher<String> {

    public static void main(String[] args) {
        AStreamExample stream = new AStreamExample();

        MySubscriber s1 = new MySubscriber();
        MySubscriber s2 = new MySubscriber();
        stream.subscribe(s1);
        stream.subscribe(s2);
    }

    @Override
    public void subscribe(final Subscriber<String> subscriber) {
        // you wouldn't really implement it like this ... but you get the point
        new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println("Subscribed on thread: " + Thread.currentThread());
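                // BooleanSubscription is assumed to be a trivial Subscription implementation (not shown)
                BooleanSubscription s = new BooleanSubscription();
                subscriber.onSubscribe(s);
                // ignoring unsubscribe and requestMore behavior for this simple example
                subscriber.onNext("1");
                subscriber.onNext("2");
                subscriber.onComplete();
            }

        }).start(); // start(), not run(): run() would execute on the caller's thread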
    }

}

A Publisher is the factory. Each time subscribe is invoked it kicks off the work.

It is not required to multicast. In fact, it is assumed it will unicast. If a backing stream naturally multicasts (a hot stream of events that never stops) then it could choose as an implementation decision to do multicasting. This ties back to the 3 example use cases above with "hot" and "cold" sources.

If we didn't allow multi-subscribe, then the code would need to look like this:

    public static void main(String[] args) {
        MySubscriber s1 = new MySubscriber();
        MySubscriber s2 = new MySubscriber();
        new AStreamExample().subscribe(s1);
        new AStreamExample().subscribe(s2);
    }

This seems trivial in this case, but put it behind an API and it becomes more clear:

    public static void main(String[] args) {
        MySubscriber s1 = new MySubscriber();
        MySubscriber s2 = new MySubscriber();
        getStream().subscribe(s1);
        getStream().subscribe(s2);
    }

Do I have to call getStream() each time I subscribe? If so, why? The subscribe method says I can subscribe, so why not multiple times?

It gets worse once someone starts passing the stream around.

    public void doStuff(AStreamExample stream) {
        stream.subscribe(new MySubscriber());
        doMoreStuff(stream);
    }

    public void doMoreStuff(AStreamExample stream) {
        // can I subscribe? what if someone else already did?
        stream.subscribe(new MySubscriber());
    }

Perhaps my original statement was unclear, but I'm not saying a Publisher can't do multicast on a live stream, but that a Publisher should not be required to do multicast as it does not cater to all use cases. There are also many multicast use cases that are better suited to libraries (such as replay with its many different variants).

I suggest we remove the multicast requirement for a Publisher but require that subscribe be capable of being invoked multiple times. It is then left to the Publisher to decide whether it does multicast or unicast based on its use cases, resource usage, etc.

rkuhn commented 10 years ago

@benjchristensen, thanks for your elaboration, now it is perfectly clear what we are talking about: you are equating Publisher with Observable, perhaps because of similarities in method names and also due to the existence of Subscription as a name shared between Rx and Reactive Streams.

To be very blunt and concise: these names have misled you, and I am very sorry about that.

The Publisher/Subscriber pair is designed only for one purpose: to transport exactly one stream of elements across an asynchronous boundary (without unbounded buffering etc.). Therefore a Publisher delivers exactly one stream, which is why multiple subscriptions and multicast are identical in this case.

Thinking about it some more, it would probably have been better to leave out the Subscription altogether—we had it since we started out from the position of building in multicast as a convenience feature that we have come to realize is detrimental—which would lead us to equate Publisher with an Rx Subscription, leaving the details of what you want to publish out of the scope of this specification.

What you want is to have an easy path towards using Reactive Streams as a way to express remote Observables, and what we want is to use it as a way to push data to an Actor without blowing up message queues, and as far as I know everyone agrees on the underlying basic problem set. I’ll try out what the above proposal would look like, and if it makes sense (i.e. passes my plausibility test) then we can continue the discussion on the resulting pull request.

Again, I’m sorry that it took so long for me to realize what the root cause of the misunderstanding was.

benjchristensen commented 10 years ago

Roland, this is not about Observables (though that heavily influences my perspective as I've spent almost 2 years building "reactive stream" applications in production).

Let me equate it with Iterable/Iterator instead so there is no chance of conflict with Observable.

In code, I can pass an Iterable around to anyone and anything can "subscribe" to it via a new Iterator.

Since it's pull based, it creates the Iterator for me on a getIterator() method or something along those lines.

The getIterator() method can be called n times and every time will get a clean "subscription" to iterating the Iterable.

Now we flip to a Publisher/Subscriber. Since it's push, I provide the Subscriber instead of calling getSubscriber(). It is via the Publisher.subscribe(Subscriber s) that I start the push-based iteration (or streaming).
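The pull-based half of this analogy can be checked with plain JDK types. In the JDK the method is actually named Iterable.iterator(), not getIterator(). IterableAnalogyDemo is an illustrative name:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Pull-based half of the analogy: one Iterable, many independent Iterators.
final class IterableAnalogyDemo {

    static List<String> interleave(Iterable<String> source) {
        // Each call to iterator() starts a fresh, independent traversal.
        Iterator<String> a = source.iterator();
        Iterator<String> b = source.iterator();
        List<String> out = new ArrayList<>();
        out.add("a:" + a.next()); // a reads the 1st element
        out.add("a:" + a.next()); // a reads the 2nd element
        out.add("b:" + b.next()); // b is unaffected by a and reads the 1st element
        return out;
    }

    public static void main(String[] args) {
        System.out.println(interleave(List.of("x", "y", "z"))); // [a:x, a:y, b:x]
    }
}
```

On the push side, the argument is that Publisher.subscribe(...) should behave like iterator() here: each call starts its own traversal.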

The point I'm making is two-fold:

1) The Publisher should be usable like an Iterable so that developers can pass it around and allow anyone to subscribe to it whenever and however many times they want. Otherwise there ends up needing to be a 3rd component, a PublisherFactory that would be used like PublisherFactory.getPublisher().subscribe(Subscriber s).

2) Multicast is an option, not a requirement; it is up to the Publisher to decide based on what it is streaming.

Therefore, please put aside comparisons to or use cases of Actors and Observables and consider this being an interface in the JDK for decades to come that everyone will implement. In that case, it will no longer be used purely for interop (as it will be now since it's a 3rd-party dependency). If it becomes part of the JDK it will be implemented directly by classes and be the common interface, like Iterable, and should therefore be capable of being used like an Iterable without a factory being required.

Code such as the following should be safe to write:

void doStuff() {
  Publisher p = new PublisherImpl();
  doMoreStuff(p);
  p.subscribe(subscriber);
}

void doMoreStuff(Publisher p) {
  p.subscribe(subscriber);
}

If we don't allow multi-subscribe behavior this will instead need to be written with a 3rd level such as:

void doStuff() {
  PublisherFactory p = new PublisherFactoryImpl();
  doMoreStuff(p);
  p.getPublisher().subscribe(subscriber);
}

void doMoreStuff(PublisherFactory p) {
  p.getPublisher().subscribe(subscriber);
}
jrudolph commented 10 years ago

@benjchristensen What about streams that can naturally only be used once? Like a stream of network data for an already opened network connection? Given your suggestion would it be possible to get a Publisher for a stream like that?

IMO, either you would have to allow this Producer to call onError immediately for every subscription after the first, or you need some buffering/fan-out logic that allows consumers to join the game later.
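The first option (fail every Subscriber after the first) could look roughly like the sketch below. It assumes the three-type shape proposed later in the thread. SingleUsePublisher is a hypothetical name, and an Iterator stands in for the already-open connection. Apart from the atomic first-subscriber check, the code is single-threaded.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class SingleUseDemo {

    interface Subscription { void signalAdditionalDemand(int n); void cancel(); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    // Hypothetical wrapper over an already-open, single-pass source. Only the first Subscriber is served.
    static final class SingleUsePublisher<T> implements Publisher<T> {
        private final Iterator<T> source;
        private final AtomicBoolean taken = new AtomicBoolean(false);

        SingleUsePublisher(Iterator<T> source) { this.source = source; }

        @Override
        public void subscribe(Subscriber<T> s) {
            if (!taken.compareAndSet(false, true)) {
                // Late Subscriber: hand it a no-op Subscription, then fail it immediately.
                s.onSubscribe(new Subscription() {
                    @Override public void signalAdditionalDemand(int n) { }
                    @Override public void cancel() { }
                });
                s.onError(new IllegalStateException("only one Subscriber is supported"));
                return;
            }
            s.onSubscribe(new Subscription() {
                private boolean done = false;

                @Override
                public void signalAdditionalDemand(int n) {
                    for (int i = 0; i < n && !done && source.hasNext(); i++) {
                        s.onNext(source.next());
                    }
                    if (!done && !source.hasNext()) {
                        done = true;
                        s.onCompleted();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    static final class RecordingSubscriber<T> implements Subscriber<T> {
        final List<String> events = new ArrayList<>();
        @Override public void onSubscribe(Subscription s) { s.signalAdditionalDemand(10); }
        @Override public void onNext(T t) { events.add(String.valueOf(t)); }
        @Override public void onError(Throwable t) { events.add("error: " + t.getMessage()); }
        @Override public void onCompleted() { events.add("completed"); }
    }

    public static void main(String[] args) {
        Publisher<Integer> connection = new SingleUsePublisher<>(List.of(1, 2, 3).iterator());
        RecordingSubscriber<Integer> first = new RecordingSubscriber<>();
        RecordingSubscriber<Integer> second = new RecordingSubscriber<>();
        connection.subscribe(first);
        connection.subscribe(second);
        System.out.println(first.events);  // [1, 2, 3, completed]
        System.out.println(second.events); // [error: only one Subscriber is supported]
    }
}
```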

benjchristensen commented 10 years ago

It's up to the Publisher whether it wants to reuse the underlying network connection or not.

See the 3 use cases I listed above at https://github.com/reactive-streams/reactive-streams/issues/19#issuecomment-40633320

jrudolph commented 10 years ago

Yes, I've seen them. That's why I ask. I don't think an open connection fits into any of those three categories. An already open network connection (similar to an already generated Iterator) has very different backpressure needs than something that publishes events. "Reuse" is the thing that doesn't work easily, because the backpressure demand of all possible consumers has to be coordinated somehow.

So, that's the question: can an open network connection (an iterator, an InputStream) be a Publisher in your view (in Rx)?

benjchristensen commented 10 years ago

There are two cases where an open network stream applies.

1) Hot

Assume a "hot" stream where back pressure is not relevant, it's stock prices, mouse events, system metrics or some other thing that continues to fire at you. You can't say "stop", all you can say is "I can't take any more". In that case it's up to the developer to choose whether they want to drop, sample, debounce, buffer (rarely useful in this case). The "requestMore" behavior from a Subscriber can be translated into one of these mechanisms, but that's up to each implementation and the developer to choose.

In a "hot" case like this, a Publisher can safely reuse the network connection because it's going to stream the exact same data whether a single connection or multiple connections are open. There is no buffering, no state per Subscriber, just a flow of data. It is transparent to the Subscriber whether there are network connections being shared or not, it's just a hot stream of data that they can throttle on if they wish. Each Subscription will manage the requestMore behavior independently of each other, and each one will just be doing throttling of some flavor, the Publisher just keeps emitting the data.

Summary:

2) Cold

A "cold" source is very different. In this case the Publisher would need to create a new network connection for each Subscription because "requestMore" is now meaningful.

Each Subscriber will request however much it can handle, and the flow of data is one-to-one between that Subscription and the underlying resource. The single Publisher can be subscribed to multiple times, but each new Subscription represents a separate network connection that serves only that Subscriber/Subscription pair.

Summary:

In short, the Publisher is the factory that returns a Subscription to the Subscriber that manages the correct state and behavior (via the "requestMore" invocations) depending on what fits the use case.
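The "hot" case with a drop policy might be sketched as follows. HotTicker is a hypothetical name, and the types mirror the proposal later in the thread. Dropping is only one of the strategies listed above; sample, debounce and buffer are others. Everything runs on one thread, so the demand counters are not synchronized.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class HotDropDemo {

    interface Subscription { void signalAdditionalDemand(int n); void cancel(); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    // Hypothetical hot source (think stock ticks): it emits whether or not anyone has demand.
    // Each Subscription keeps its own demand counter; with no outstanding demand, the element
    // is dropped for that Subscriber only.
    static final class HotTicker implements Publisher<Integer> {
        private final List<Sub> subs = new CopyOnWriteArrayList<>();
        int dropped = 0; // total elements dropped across all Subscribers

        private final class Sub implements Subscription {
            final Subscriber<Integer> subscriber;
            long demand = 0; // single-threaded sketch: no synchronization

            Sub(Subscriber<Integer> subscriber) { this.subscriber = subscriber; }

            @Override public void signalAdditionalDemand(int n) { demand += n; }
            @Override public void cancel() { subs.remove(this); }
        }

        @Override
        public void subscribe(Subscriber<Integer> s) {
            Sub sub = new Sub(s);
            subs.add(sub);
            s.onSubscribe(sub);
        }

        // Called by the underlying feed for every new value; the feed never slows down.
        void emit(int value) {
            for (Sub sub : subs) {
                if (sub.demand > 0) {
                    sub.demand--;
                    sub.subscriber.onNext(value);
                } else {
                    dropped++;
                }
            }
        }
    }

    static final class RecordingSubscriber implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private final int demand;
        RecordingSubscriber(int demand) { this.demand = demand; }
        @Override public void onSubscribe(Subscription s) { s.signalAdditionalDemand(demand); }
        @Override public void onNext(Integer t) { received.add(t); }
        @Override public void onError(Throwable t) { }
        @Override public void onCompleted() { }
    }

    public static void main(String[] args) {
        HotTicker ticker = new HotTicker();
        RecordingSubscriber fast = new RecordingSubscriber(3);
        RecordingSubscriber slow = new RecordingSubscriber(1);
        ticker.subscribe(fast);
        ticker.subscribe(slow);
        for (int i = 1; i <= 4; i++) {
            ticker.emit(i);
        }
        System.out.println(fast.received);  // [1, 2, 3]
        System.out.println(slow.received);  // [1]
        System.out.println(ticker.dropped); // 4
    }
}
```

A late Subscriber simply starts from the next emitted value, which matches the "hot" description; nothing is replayed.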

alexandru commented 10 years ago

I think Publisher here is the equivalent of the Iterator, whereas Iterable.getIterator() is Producer.getPublisher(), is it not?

I agree with @benjchristensen, as from the subscriber's point of view it shouldn't matter if a Publisher is hot or cold. I like this in Rx Observables.

benjchristensen commented 10 years ago

Related to this issue is that I think we should be using the Publisher/Subscriber/Subscription types only (https://github.com/reactive-streams/reactive-streams/tree/master/spi/src/main/java/org/reactivestreams/spi) and not have (Producer/Consumer) => https://github.com/reactive-streams/reactive-streams/pull/25

The getPublisher() thing is misleading in its similarity to getIterator(), as that's not how a push based interface behaves.

viktorklang commented 10 years ago

@benjchristensen Well, technically the method is called iterator() (the one that creates a new Iterator for each invocation). I'd expect the general assumption in Java to be that getters don't create new things. Nevertheless, we should strive to minimize the surface area of the spec/spi.

jbrisbin commented 10 years ago

I think I'm identifying more with @benjchristensen 's comments here. I like the example of iterator() (no "get" ;)) to illustrate the point.

It also seems to me that we might benefit from a hasMore(int num) or equivalent. From hard experience we've found that iteration in high-volume code can be vastly more efficient if you first make sure a collection isn't empty. It will often be the case that a Publisher will be able to answer a hasMore query more efficiently than by assuming work will be done or attempting to do work when none is available.

One might argue that simply attempting to do work and finding out after the fact that none was done would be sufficient but in many cases that won't work. e.g. when doing least-busy kinds of routing.

At the very least we need a way to determine if no work was done so we can decide to do something about it. Even if it's as simple as requestMore(int) returning boolean.

benjchristensen commented 10 years ago

Well, technically the method is called iterator()

Whatever it's called ... here is the implementation of ArrayList where a new Iterator is created each time, exactly as I would expect Publisher.subscribe to do:

    public Iterator<E> iterator() {
        return new Itr();
    }
viktorklang commented 10 years ago

@benjchristensen The name is important (otherwise we have had waaay too many naming discussions already, don't you agree?!) My point was that "nobody" would expect a getter to return a new instance each time. And it seems like we agree there?

Whatever it's called ... here is the implementation of ArrayList where a new Iterator is created each time, exactly as I would expect Publisher.subscribe to do:

Publisher.subscribe already creates a new Subscription (well, unless the Subscriber was already subscribed) for each subscribe. However, as a Subscriber you simply don't know which elements you'll get (if any!), you'll get the ones the Publisher actually already has, can request from someone else, or can create. Right?

rkuhn commented 10 years ago

@jbrisbin The analogy you are making goes in the wrong direction: the Subscriber does not query the Publisher, it just signals capacity to receive more elements (or “demand”, if we agree on the meaning à la “demand in the market”). A query would be synchronous; if not on the type level then on a semantic level.

rkuhn commented 10 years ago

@benjchristensen We are still discussing about different things, I think: the prime problem we want to solve is how to pass data across an asynchronous boundary with non-blocking back pressure. Since this is a bidirectional endeavor, allowing multiple participants on one side of the fence by inversion means allowing multiple participants on the other as well. Splitting a stream means merging the back pressure, merging streams means splitting the back pressure.
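To make "splitting a stream means merging the back pressure" concrete, here is a sketch of one fan-out policy among many: lockstep delivery, in which the demand signalled upstream is the minimum outstanding demand across downstream Subscribers. LockstepFanOut is a hypothetical helper that only does the demand bookkeeping. It ignores late joiners, buffering, and the "bounded drift" variant from the original spec wording.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for one fan-out policy: every element goes to every downstream
// Subscriber in lockstep, so the slowest Subscriber sets the upstream pace.
final class LockstepFanOut {
    // Requested-but-not-yet-delivered demand per downstream Subscriber.
    private final Map<String, Long> outstanding = new LinkedHashMap<>();
    // Demand already signalled upstream but not yet fulfilled.
    private long requestedUpstream = 0;

    void addSubscriber(String id) {
        outstanding.putIfAbsent(id, 0L);
    }

    // A downstream Subscriber signals additional demand.
    void request(String id, long n) {
        outstanding.merge(id, n, Long::sum);
    }

    // Additional demand that may be signalled upstream now; records it as requested.
    long demandToSignalUpstream() {
        long min = outstanding.values().stream().mapToLong(Long::longValue).min().orElse(0L);
        long extra = Math.max(0L, min - requestedUpstream);
        requestedUpstream += extra;
        return extra;
    }

    // Upstream delivered one element, which is passed on to every downstream Subscriber.
    void onUpstreamElement() {
        if (requestedUpstream == 0) {
            throw new IllegalStateException("upstream sent more than was requested");
        }
        requestedUpstream--;
        outstanding.replaceAll((id, n) -> n - 1);
    }

    public static void main(String[] args) {
        LockstepFanOut fanOut = new LockstepFanOut();
        fanOut.addSubscriber("A");
        fanOut.addSubscriber("B");
        fanOut.request("A", 5);
        fanOut.request("B", 2);
        System.out.println(fanOut.demandToSignalUpstream()); // 2: limited by the slower B
        fanOut.onUpstreamElement();
        fanOut.onUpstreamElement();
        System.out.println(fanOut.demandToSignalUpstream()); // 0: B has no demand left
        fanOut.request("B", 4);
        System.out.println(fanOut.demandToSignalUpstream()); // 3: now limited by A
    }
}
```

Letting faster Subscribers run ahead through a bounded buffer would be a different policy with different upstream demand. That multiplicity of choices is the point being made about not baking one policy into every Publisher.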

Now, I think it is entirely reasonable to ask the question of “how do I obtain a source for such asynchronous exchange”, but that question is outside the scope of the initial problem. We should first solve the case of connecting both ends of this async pipe.

The reason why I persevere in this case is that, plainly speaking, not every iterator is born of an iterable, and your answer of “just drop data” in the other case (which you called “hot”) is by all means not the only conceivable one. I might well have an incoming TCP stream which does support back pressure at its source, but which I want to consume in parallel in multiple places. There are many strategies of how to connect multiple Subscribers to a logical “Source”, wherefore it just places useless burden on all Publishers if they must implement one of two specific choices.

So, coming back for a second try: what does not work for you with the following proposal?

trait Publisher[T] { // corresponds to Rx Subscription with back pressure
  def subscribe(subscriber: Subscriber[T]): Unit
  def signalAdditionalDemand(n: Int): Unit
  def cancel(): Unit
}
trait Subscriber[T] {
  def onSubscribe(publisher: Publisher[T]): Unit
  def onNext(element: T): Unit
  def onError(cause: Throwable): Unit
  def onComplete(): Unit
}
// and that would be ALL, nothing else in Reactive Streams

(remark: in this case we will want to change the names because PubSub is not 1:1, but that is a discussion to be had afterwards)

benjchristensen commented 10 years ago

coming back for a second try: what does not work for you with the following proposal?

I really like how simple and clean this is and it also solves https://github.com/reactive-streams/reactive-streams/pull/25.

It does however require a 3rd factory type to generate Publisher since the Publisher object now has the state of signalAdditionalDemand and cancel and thus needs a new instance each time.

I would suggest this change that makes Publisher the factory and moves the state to Subscription:

public interface Publisher<T> {
    public void subscribe(Subscriber<T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onCompleted();
}

public interface Subscription {
    public void signalAdditionalDemand(int n);
    public void cancel();
}

I'm still not 100% convinced the onSubscribe method is correct, as it requires some awkward handling ... but this is close and works.

It allows the Publisher to be the type that gets passed around in code (the factory) and combines SPI/API into 3 types that are clear in what they do.

Here is code using those 3 types:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {

    public static void main(String... args) throws InterruptedException {
        Publisher<Integer> dataStream = getData();

        dataStream.subscribe(new MySubscriber("A"));
        Thread.sleep(750);
        dataStream.subscribe(new MySubscriber("B"));
    }

    static Publisher<Integer> getData() {
        return new MyDataPublisher();
    }

    static class MyDataPublisher implements Publisher<Integer> {

        @Override
        public void subscribe(final Subscriber<Integer> s) {

            AtomicInteger i = new AtomicInteger();

            Subscription subscription = new Subscription() {

                AtomicInteger capacity = new AtomicInteger();

                @Override
                public void signalAdditionalDemand(int n) {
                    System.out.println("signalAdditionalDemand => " + n);
                    if (capacity.getAndAdd(n) == 0) {
                        // start sending again if it wasn't already running
                        send();
                    }
                }

                private void send() {
                    System.out.println("send => " + capacity.get());
                    // this would normally use an eventloop, actor, whatever
                    new Thread(() -> {
                        do {
                            s.onNext(i.incrementAndGet());
                        } while (capacity.decrementAndGet() > 0);
                    }).start();
                }

                @Override
                public void cancel() {
                    capacity.set(-1);
                }

            };

            s.onSubscribe(subscription);

        }

    }

    static class MySubscriber implements Subscriber<Integer> {

        final int BUFFER_SIZE = 10;
        private final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
        private volatile boolean terminated = false;
        private final String token;

        MySubscriber(String token) {
            this.token = token;
        }

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("onSubscribe => request " + BUFFER_SIZE);
            s.signalAdditionalDemand(BUFFER_SIZE);
            startAsyncWork(s);
        }

        @Override
        public void onNext(Integer t) {
            buffer.add(t);
        }

        @Override
        public void onError(Throwable t) {
            terminated = true;
            throw new RuntimeException(t);
        }

        @Override
        public void onCompleted() {
            terminated = true;
        }

        private void startAsyncWork(Subscription s) {
            System.out.println("**** Start new worker thread");
            /* don't write real code like this! just for quick demo */
            new Thread(() -> {
                while (!terminated) {
                    Integer v = buffer.poll();
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (buffer.size() < 3) {
                        s.signalAdditionalDemand(BUFFER_SIZE - buffer.size());
                    }
                    if (v != null) {
                        System.out.println(token + " => Did stuff with v: " + v);
                    }
                }
            }).start();
        }

    }

}
benjchristensen commented 10 years ago

The output of the above code is:

onSubscribe => request 10
signalAdditionalDemand => 10
send => 10
**** Start new worker thread
A => Did stuff with v: 1
A => Did stuff with v: 2
A => Did stuff with v: 3
A => Did stuff with v: 4
A => Did stuff with v: 5
A => Did stuff with v: 6
A => Did stuff with v: 7
onSubscribe => request 10
signalAdditionalDemand => 10
send => 10
**** Start new worker thread
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 8
B => Did stuff with v: 1
A => Did stuff with v: 9
B => Did stuff with v: 2
A => Did stuff with v: 10
B => Did stuff with v: 3
A => Did stuff with v: 11
B => Did stuff with v: 4
A => Did stuff with v: 12
B => Did stuff with v: 5
A => Did stuff with v: 13
B => Did stuff with v: 6
A => Did stuff with v: 14
B => Did stuff with v: 7
A => Did stuff with v: 15
signalAdditionalDemand => 8
send => 8
B => Did stuff with v: 8
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 16
B => Did stuff with v: 9
A => Did stuff with v: 17
B => Did stuff with v: 10
A => Did stuff with v: 18
B => Did stuff with v: 11
A => Did stuff with v: 19
B => Did stuff with v: 12
A => Did stuff with v: 20
B => Did stuff with v: 13
A => Did stuff with v: 21
B => Did stuff with v: 14
A => Did stuff with v: 22
B => Did stuff with v: 15
A => Did stuff with v: 23
signalAdditionalDemand => 8
send => 8
B => Did stuff with v: 16
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 24
B => Did stuff with v: 17
A => Did stuff with v: 25
B => Did stuff with v: 18
A => Did stuff with v: 26
B => Did stuff with v: 19
A => Did stuff with v: 27
B => Did stuff with v: 20
A => Did stuff with v: 28
B => Did stuff with v: 21
A => Did stuff with v: 29
B => Did stuff with v: 22
A => Did stuff with v: 30
B => Did stuff with v: 23
A => Did stuff with v: 31
signalAdditionalDemand => 8
send => 8
B => Did stuff with v: 24
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 32
B => Did stuff with v: 25
A => Did stuff with v: 33
B => Did stuff with v: 26
A => Did stuff with v: 34
B => Did stuff with v: 27
A => Did stuff with v: 35
B => Did stuff with v: 28
A => Did stuff with v: 36
B => Did stuff with v: 29
A => Did stuff with v: 37
B => Did stuff with v: 30
A => Did stuff with v: 38
B => Did stuff with v: 31
A => Did stuff with v: 39
signalAdditionalDemand => 8
send => 8
B => Did stuff with v: 32
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 40
B => Did stuff with v: 33
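The demo above starts a new Thread for every burst of demand; its own comment notes that a real implementation would normally use an event loop, actor, or similar. One common alternative is a work-in-progress counter. Whichever thread signals demand performs the emission, and a re-entrant signalAdditionalDemand call made from inside onNext only records demand instead of recursing. This technique is an assumption, not part of the proposal. CountingSubscription, OneByOne and run() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class DrainLoopDemo {

    interface Subscription { void signalAdditionalDemand(int n); void cancel(); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical Subscription that emits 1..last, serialized by a work-in-progress counter.
    static final class CountingSubscription implements Subscription {
        private final Subscriber<Integer> subscriber;
        private final int last;
        private final AtomicLong demand = new AtomicLong();
        private final AtomicInteger wip = new AtomicInteger();
        private volatile boolean cancelled = false;
        private int next = 1;              // touched only by the thread running drain()
        private boolean completed = false; // likewise

        CountingSubscription(Subscriber<Integer> subscriber, int last) {
            this.subscriber = subscriber;
            this.last = last;
        }

        @Override
        public void signalAdditionalDemand(int n) {
            if (n <= 0) {
                return; // a real implementation should report this as an error
            }
            demand.addAndGet(n);
            drain();
        }

        @Override
        public void cancel() { cancelled = true; }

        private void drain() {
            // Only the caller that moves wip from 0 to 1 emits; concurrent or re-entrant
            // callers just bump wip so the running loop goes round again.
            if (wip.getAndIncrement() != 0) {
                return;
            }
            int missed = 1;
            do {
                while (!cancelled && demand.get() > 0 && next <= last) {
                    demand.decrementAndGet();
                    subscriber.onNext(next++);
                }
                if (!cancelled && !completed && next > last) {
                    completed = true;
                    subscriber.onCompleted();
                }
                missed = wip.addAndGet(-missed);
            } while (missed != 0);
        }
    }

    // Requests one element at a time from inside onNext (the re-entrant case).
    static final class OneByOne implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        int depth = 0;
        int maxDepth = 0;
        private Subscription subscription;

        @Override public void onSubscribe(Subscription s) {
            subscription = s;
            s.signalAdditionalDemand(1);
        }

        @Override public void onNext(Integer t) {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            received.add(t);
            subscription.signalAdditionalDemand(1); // returns at once; the outer loop emits
            depth--;
        }

        @Override public void onError(Throwable t) { }
        @Override public void onCompleted() { completed = true; }
    }

    // Stands in for Publisher.subscribe(...) in this sketch.
    static OneByOne run(int last) {
        OneByOne subscriber = new OneByOne();
        subscriber.onSubscribe(new CountingSubscription(subscriber, last));
        return subscriber;
    }

    public static void main(String[] args) {
        OneByOne s = run(1000);
        System.out.println(s.received.size() + " " + s.completed + " " + s.maxDepth); // 1000 true 1
    }
}
```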
jbrisbin commented 10 years ago

I do like how this makes things clean and simple as well.

I'm playing with a reactive Buffer, so I'll be trying this out. @benjchristensen, is this pushed to your fork yet or otherwise available for me to test against?

/cc @smaldini

jbrisbin commented 10 years ago

Not a fan of signalAdditionalDemand but it's certainly clear in intent and self-documenting...

benjchristensen commented 10 years ago

Is this pushed to your fork yet or somehow otherwise available for me to test against?

No, what is pasted above is the entirety of the code. I purposefully did not make anything depend on RxJava so as to keep this completely separate and clean.

Not a fan of signalAdditionalDemand but it's certainly clear in intent and self-documenting...

Yes, I'm not sold yet on that or the onSubscribe callback, but not ready to bikeshed on those as I can't yet provide a better solution :-)

Going to follow up in a few minutes with documented APIs that communicate the contract.

benjchristensen commented 10 years ago

With poorly written docs to try and communicate the contracts:

public interface Publisher<T> {

    /**
     * Request {@link Subscription} to start streaming data.
     * <p>
     * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
     * <p>
     * Each {@link Subscription} will work for only a single {@link Subscriber}.
     * <p>
     * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
     * 
     * @param s the {@link Subscriber} that will receive a new {@link Subscription}
     */
    public void subscribe(Subscriber<T> s);
}

/**
 * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
 * <p>
 * No further notifications will be received until {@link Subscription#signalAdditionalDemand(int)} is called.
 * <p>
 * After signaling demand:
 * <ul>
 * <li>Zero or more invocations of {@link #onNext(Object)}, up to the total demand signalled via {@link Subscription#signalAdditionalDemand(int)}</li>
 * <li>A single invocation of {@link #onError(Throwable)} or {@link #onCompleted()}, which signals a terminal state after which no further events will be sent.</li>
 * </ul>
 * <p>
 * Demand can be signalled via {@link Subscription#signalAdditionalDemand(int)} whenever the {@link Subscriber} instance is capable of handling more.
 *
 * @param <T> the type of element signalled to this {@link Subscriber}
 */
public interface Subscriber<T> {
    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#signalAdditionalDemand(int)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#signalAdditionalDemand(int)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#signalAdditionalDemand(int)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#signalAdditionalDemand(int)}
     */
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#signalAdditionalDemand(int)}.
     * 
     * @param t the element
     */
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
     * 
     * @param t the error that terminated the stream
     */
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
     */
    public void onCompleted();
}

/**
 * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
 * <p>
 * It can only be used once by a single {@link Subscriber}.
 * <p>
 * It is used to both signal desire for data and cancel demand (and allow resource cleanup).
 *
 */
public interface Subscription {
    /**
     * No events will be sent by a {@link Publisher} until demand is signalled via this method.
     * <p>
     * It may be called as often as needed, whenever needed.
     * <p>
     * Any demand that has been signalled may be fulfilled by the {@link Publisher}, so only signal demand for what can be safely handled.
     * 
     * @param n the number of additional elements requested
     */
    public void signalAdditionalDemand(int n);

    /**
     * Request the {@link Publisher} to stop sending data and clean up resources.
     * <p>
     * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous.
     */
    public void cancel();
}
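A minimal, single-threaded sketch of how the contract above might be exercised, assuming the three interfaces exactly as documented (redeclared as nested types so the file compiles on its own). `RangePublisher` and `BatchingSubscriber` are hypothetical names, not part of the proposal. Each call to `subscribe` starts an independent unicast lifecycle, and the Subscriber signals demand in batches of 8, much like the `signalAdditionalDemand => 8` lines in the log output above.

```java
import java.util.ArrayList;
import java.util.List;

public class UnicastContractSketch {

    // The three interfaces as proposed, nested so this file compiles on its own.
    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    // Hypothetical cold Publisher: every subscribe() starts a fresh lifecycle over [start, end).
    static final class RangePublisher implements Publisher<Integer> {
        private final int start;
        private final int end;

        RangePublisher(int start, int end) { this.start = start; this.end = end; }

        @Override
        public void subscribe(Subscriber<Integer> s) {
            s.onSubscribe(new Subscription() {
                private int next = start;          // per-Subscription state, never shared
                private long demand = 0;
                private boolean done = false;
                private boolean emitting = false;  // re-entrant calls only add demand

                @Override
                public void signalAdditionalDemand(int n) {
                    if (n <= 0 || done) return;
                    demand += n;
                    if (emitting) return;          // the running loop picks up the new demand
                    emitting = true;
                    while (demand > 0 && next < end && !done) {
                        demand--;
                        s.onNext(next++);
                    }
                    if (next == end && !done) {    // completion does not consume demand
                        done = true;
                        s.onCompleted();
                    }
                    emitting = false;
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Hypothetical Subscriber that requests in fixed-size batches and records what it gets.
    static final class BatchingSubscriber implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        Throwable error = null;
        private final int batch;
        private int outstanding = 0;
        private Subscription subscription;

        BatchingSubscriber(int batch) { this.batch = batch; }

        @Override public void onSubscribe(Subscription s) {
            subscription = s;
            outstanding = batch;
            s.signalAdditionalDemand(batch);       // nothing flows until demand is signalled
        }

        @Override public void onNext(Integer t) {
            received.add(t);
            if (--outstanding == 0) {              // batch consumed: signal the next one
                outstanding = batch;
                subscription.signalAdditionalDemand(batch);
            }
        }

        @Override public void onError(Throwable t) { error = t; }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        Publisher<Integer> p = new RangePublisher(0, 20);
        BatchingSubscriber a = new BatchingSubscriber(8);
        BatchingSubscriber b = new BatchingSubscriber(8);
        p.subscribe(a);   // each subscribe is an independent, unicast lifecycle
        p.subscribe(b);
        System.out.println("A received " + a.received.size() + ", completed=" + a.completed);
        System.out.println("B received " + b.received.size() + ", completed=" + b.completed);
    }
}
```

Because all per-subscriber state lives in the Subscription it creates, the Publisher in this sketch stays a stateless factory.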
DougLea commented 10 years ago

Ben, this is starting to look a lot like TCP windowing. Have you considered going all the way?

subscribe takes a window size argument that is used to create/size a buffer. ack() replaces signalAdditionalDemand to tell the publisher that one more item has been processed so it can slide/advance the window. Optionally, support ack(int n) to allow bulk advance.

rkuhn commented 10 years ago

@benjchristensen You do notice that this is exactly the status quo, don’t you? (modulo one name change)

Including this “factory” in the SPI means that the concern of subscribing multiple times to the same data source does become a part of this specification. Please state the precise semantics that users shall expect when creating multiple Subscriptions for the same Publisher.

benjchristensen commented 10 years ago

You do notice that this is exactly the status quo

If so, great, but that's not what was understood when starting this discussion.

There are two things being proposed here that I perceived as different:

1) Remove the API and keep just the SPI: only Publisher/Subscriber/Subscription (but that's the intent of https://github.com/reactive-streams/reactive-streams/pull/25; it has just blended into this conversation when we started discussing signatures).

2) Remove the requirement to support multicasting, leaving it completely up to the Publisher implementation whether it shares underlying resources or not.

benjchristensen commented 10 years ago

Ben, this is starting to look a lot like TCP windowing.

@DougLea I don't understand the details of TCP windowing well enough to really comment on that.

In what you wrote though, the important difference is ack(int n) versus just ack().

benjchristensen commented 10 years ago

Please state the precise semantics that users shall expect when creating multiple Subscriptions for the same Publisher.

From the user they should not see anything different. Each Subscription/Subscriber is a one-to-one relationship.

The key thing is that Publisher.subscribe can be invoked n times, and the Publisher can choose whether it's sharing underlying resources or not.

Perhaps all the confusion is simply over the definition of multicasting and multiple subscribers?

From your perspective, does the following from the README mean the same thing I'm saying?

A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them). It must eventually clean up its resources after all of its subscribers have been unsubscribed and shut down. A Publisher will typically support fanning out to multiple Subscribers in order to support the dynamic assembly of processing networks from building blocks that can freely be shared.

rkuhn commented 10 years ago

This is exactly where the discussion started: we currently mandate one very specific form of fan-out behavior (i.e. all Subscribers see the same values at the same rate, dictated by the slowest one and allowing for some batching / buffering). The point was that requiring this fan-out complicates the implementation for the most common case (which does not use this) while not being really general (since it is a very specific kind of fan-out).

So, we set out to remove the mandatory fan-out. But that requires that we have a type in the SPI which supports exactly one Subscriber, and which can be passed around. The current Subscription does not fit the bill, because it is already tied to a Subscriber when it is created.

I am currently a bit lost where exactly your proposal is aimed at.

DougLea commented 10 years ago

@benjchristensen The main difference between TCP and the current scheme is that TCP sets up the window (buffer) size (call it N) on subscribe. Producers initially send N items. They can send item N+1 after getting the first ack, and so on. It is less flexible than requestMore (or its aliases) in that the buffer size is fixed at subscribe time.
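To make the comparison concrete, here is a sketch of the windowed scheme just described, under stated assumptions: a hypothetical `WindowedPublisher` whose `subscribe(window, total)` fixes the window size N (plus, for the sketch only, a finite source of `total` items), with `ack()` and `ack(int n)` freeing slots. It is not part of the proposed API. The producer never has more than N unacknowledged items in flight, and acks slide the window without enlarging it, whereas `signalAdditionalDemand` can grant any amount at any time.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowSketch {

    // Hypothetical producer with TCP-like flow control: at most `window` unacknowledged items in flight.
    static final class WindowedPublisher {
        private final int window;   // fixed at subscribe time; acks cannot grow it
        private final int total;    // size of the finite source in this sketch
        private int sent = 0;
        private int acked = 0;
        final List<Integer> delivered = new ArrayList<>();

        private WindowedPublisher(int window, int total) {
            if (window <= 0) throw new IllegalArgumentException("window must be positive");
            this.window = window;
            this.total = total;
        }

        // The first N items go out immediately, with no separate request round trip.
        static WindowedPublisher subscribe(int window, int total) {
            WindowedPublisher p = new WindowedPublisher(window, total);
            p.pump();
            return p;
        }

        // One item processed: slide the window by one.
        void ack() { ack(1); }

        // Bulk advance: the consumer has finished n more items.
        void ack(int n) {
            if (n <= 0) return;
            acked = Math.min(sent, acked + n);   // cannot acknowledge items never sent
            pump();
        }

        int inFlight() { return sent - acked; }

        private void pump() {
            while (sent < total && inFlight() < window) {
                delivered.add(sent++);           // "send" the next item
            }
        }
    }

    public static void main(String[] args) {
        WindowedPublisher p = WindowedPublisher.subscribe(4, 10);
        System.out.println(p.delivered);   // [0, 1, 2, 3]
        p.ack();
        System.out.println(p.delivered);   // [0, 1, 2, 3, 4]
        p.ack(3);
        System.out.println(p.delivered);   // [0, 1, 2, 3, 4, 5, 6, 7]
    }
}
```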

benjchristensen commented 10 years ago

@rkuhn Please help me understand where the confusion is. I'll attempt to summarize here:

Publisher

Factory class that can be subscribed to n times. It is up to the Publisher implementation to decide whether it is sharing a stream between Subscribers or not.

Life-cycle: Stateless. Factory (i.e. could be a singleton). Could be a new instance each time. It's up to the providing library.

Subscription

Always created by Publisher and has a one-to-one relationship with a single Subscriber instance. It represents a subscribe lifecycle.

Life-cycle: Stateful. One subscription with a single Publisher and Subscriber

Subscriber

Always created by the user, has a single subscription lifecycle. It can be used once-and-only-once to subscribe via Publisher.subscribe, will receive a single Subscription to request data, and once it receives onCompleted or onError (or invokes Subscription.cancel) it is done and cannot be reused.

Life-cycle: Stateful. One subscription with a single Publisher and Subscription
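The once-and-only-once rule for Subscriber can also be guarded inside the Subscriber itself. A sketch, assuming the Subscriber/Subscription interfaces from this thread (redeclared so the file compiles alone); `SingleUseSubscriber` and `RecordingSubscription` are hypothetical helpers, and cancelling a second Subscription is an assumed policy, not something the thread specifies.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleUseSubscriberSketch {

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical Subscriber that accepts exactly one Subscription over its whole life.
    static class SingleUseSubscriber<T> implements Subscriber<T> {
        private final AtomicBoolean subscribed = new AtomicBoolean(false);
        private volatile boolean terminated = false;
        private Subscription subscription;
        int received = 0;

        @Override
        public void onSubscribe(Subscription s) {
            if (!subscribed.compareAndSet(false, true)) {
                s.cancel();                    // already bound to a lifecycle: refuse reuse
                return;
            }
            subscription = s;
            s.signalAdditionalDemand(1);       // request one element at a time
        }

        @Override
        public void onNext(T t) {
            received++;
            if (!terminated) subscription.signalAdditionalDemand(1);
        }

        @Override public void onError(Throwable t) { terminated = true; }
        @Override public void onCompleted() { terminated = true; }

        boolean isTerminated() { return terminated; }
    }

    // Hypothetical Subscription that only records what was asked of it.
    static class RecordingSubscription implements Subscription {
        long requested = 0;
        boolean cancelled = false;
        @Override public void signalAdditionalDemand(int n) { requested += n; }
        @Override public void cancel() { cancelled = true; }
    }

    public static void main(String[] args) {
        SingleUseSubscriber<String> sub = new SingleUseSubscriber<>();
        RecordingSubscription first = new RecordingSubscription();
        RecordingSubscription second = new RecordingSubscription();
        sub.onSubscribe(first);
        sub.onSubscribe(second);               // a second lifecycle is refused
        // prints "first cancelled=false, second cancelled=true"
        System.out.println("first cancelled=" + first.cancelled + ", second cancelled=" + second.cancelled);
    }
}
```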


Using just these 3 types with the contract above, we don't need the Producer/Consumer types of the API, and the SPI and API can merge into just these 3.

It also allows libraries to implement Publisher as a public type that they pass around in their code and anybody can safely subscribe to them whenever.

It also allows Publisher implementations to freely manage resources however they wish (new resource allocation per subscribe, or shared across multiple).

benjchristensen commented 10 years ago

@DougLea If I understand correctly, it means that in our model we have one extra round trip: the Publisher emits the Subscription via onSubscribe, and only then can the Subscriber call request(int n) to start data flowing.

We could theoretically eliminate the first step by changing the signature.

We could make the Subscription be returned immediately to remove the first required callback:

public interface Publisher<T> {
    public Subscription subscribe(Subscriber<T> s);
}

We could then also eliminate the first request by making it part of the subscribe:

public interface Publisher<T> {
    public Subscription subscribe(Subscriber<T> s, int requested);
}

The Subscription can then still be used to request additional events beyond what was originally asked for in the subscribe:

public interface Subscription {
    public void request(int n);
    public void cancel();
}

This would allow for eliminating the first round of message passing so the network request (or whatever) can be kicked off immediately, while still allowing the Subscriber to adjust demand as it wants and retaining batch demand requests.
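A rough sketch of this alternative shape, assuming the `Publisher` and `Subscription` signatures exactly as written in this comment and, as a further assumption, a Subscriber without `onSubscribe` (the returned Subscription makes it unnecessary). `CountingPublisher` and `ListSubscriber` are hypothetical. The initial demand is served inside `subscribe`, so work starts without waiting for a callback.

```java
import java.util.ArrayList;
import java.util.List;

public class EagerSubscribeSketch {

    // Assumed: no onSubscribe, since subscribe() returns the Subscription directly.
    interface Subscriber<T> {
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void request(int n);
        void cancel();
    }

    interface Publisher<T> {
        // Initial demand travels with the subscribe call, saving one round trip.
        Subscription subscribe(Subscriber<T> s, int requested);
    }

    // Hypothetical cold Publisher emitting 0 until end (exclusive).
    static final class CountingPublisher implements Publisher<Integer> {
        private final int end;
        CountingPublisher(int end) { this.end = end; }

        @Override
        public Subscription subscribe(Subscriber<Integer> s, int requested) {
            Subscription sub = new Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(int n) {
                    for (int i = 0; i < n && next < end && !done; i++) {
                        s.onNext(next++);
                    }
                    if (next == end && !done) { done = true; s.onCompleted(); }
                }

                @Override
                public void cancel() { done = true; }
            };
            sub.request(requested);   // work starts immediately with the initial demand
            return sub;
        }
    }

    // Hypothetical Subscriber that just records what arrives.
    static final class ListSubscriber implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        Throwable error = null;
        @Override public void onNext(Integer t) { received.add(t); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        ListSubscriber s = new ListSubscriber();
        Subscription sub = new CountingPublisher(5).subscribe(s, 3);
        System.out.println(s.received);                               // [0, 1, 2]
        sub.request(10);
        System.out.println(s.received + " completed=" + s.completed); // [0, 1, 2, 3, 4] completed=true
    }
}
```

One consequence this sketch makes visible: elements for the initial demand can be delivered before `subscribe` returns, so during those first `onNext` calls the Subscriber does not yet hold the Subscription and cannot call `request` or `cancel`.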

rkuhn commented 10 years ago

@benjchristensen The Publisher you describe is quite limited in applicability: there are many kinds of streams which cannot fulfill this contract. This is the reason why I keep coming back to this.

To put it as precisely as I can: we need a type in the SPI that represents a stream of elements that cannot support multi-cast. If we don’t do this, then we in effect mandate multi-cast support for all Publishers representing concrete (i.e. not factory-copyable) stream sources; think of a TCP connection accepted by a server socket, for example.

benjchristensen commented 10 years ago

we need a type in the SPI that represents a stream of elements that cannot support multi-cast.

Are you referring to things such as reading a file? A RESTful request/response? Iterating a list starting at position 0? If so, each time subscribe is invoked it creates new resources. This is exactly what my example above did.

However, the exact same signature can be used on mouse events, stock prices, an infinite TCP stream firing events (like metrics). As long as the Publisher has >0 subscribers it keeps the stream alive and sends the events to each subscriber as per their requests, dropping/buffering as per its implementation.
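A sketch of that hot case, assuming the interfaces from earlier in the thread (redeclared here) and a hypothetical `DroppingHotPublisher` that the underlying source feeds through an `emit` method. Each Subscriber still gets its own Subscription and demand counter; a late Subscriber sees only events emitted after it subscribes, and an event arriving while a Subscriber has no outstanding demand is dropped for that Subscriber (dropping is one choice; buffering would satisfy the same contract). It is single-threaded for clarity; a real implementation would need synchronization.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HotPublisherSketch {

    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    // Hypothetical hot Publisher (think mouse events or stock ticks). Not thread-safe.
    static final class DroppingHotPublisher<T> implements Publisher<T> {
        // Outstanding demand per live Subscriber; insertion order keeps delivery deterministic.
        private final Map<Subscriber<T>, Long> demand = new LinkedHashMap<>();
        int dropped = 0;

        @Override
        public void subscribe(Subscriber<T> s) {
            demand.put(s, 0L);                   // no demand yet; sees only future events
            s.onSubscribe(new Subscription() {   // one Subscription per Subscriber
                @Override public void signalAdditionalDemand(int n) {
                    if (n > 0) demand.computeIfPresent(s, (k, d) -> d + n);
                }
                @Override public void cancel() { demand.remove(s); }
            });
        }

        // The underlying source is kept alive only while at least one Subscriber is attached.
        boolean isActive() { return !demand.isEmpty(); }

        // Called by the underlying source for every new event.
        void emit(T value) {
            for (Subscriber<T> s : new ArrayList<>(demand.keySet())) {
                Long d = demand.get(s);
                if (d == null) continue;         // cancelled during this round
                if (d > 0) {
                    demand.put(s, d - 1);
                    s.onNext(value);
                } else {
                    dropped++;                   // no demand: drop for this Subscriber
                }
            }
        }
    }

    // Hypothetical Subscriber that requests a fixed amount up front and records events.
    static final class Recorder implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private final int initialDemand;
        Subscription subscription;

        Recorder(int initialDemand) { this.initialDemand = initialDemand; }

        @Override public void onSubscribe(Subscription s) {
            subscription = s;
            s.signalAdditionalDemand(initialDemand);
        }
        @Override public void onNext(Integer t) { received.add(t); }
        @Override public void onError(Throwable t) { /* not used in this sketch */ }
        @Override public void onCompleted() { /* a hot stream may never complete */ }
    }

    public static void main(String[] args) {
        DroppingHotPublisher<Integer> hot = new DroppingHotPublisher<>();
        Recorder a = new Recorder(2);
        hot.subscribe(a);
        hot.emit(1);
        Recorder b = new Recorder(10);
        hot.subscribe(b);                        // B joins late and never sees 1
        hot.emit(2);
        hot.emit(3);                             // A has no demand left, so 3 is dropped for A
        System.out.println("A=" + a.received + " B=" + b.received + " dropped=" + hot.dropped);
        a.subscription.cancel();
        b.subscription.cancel();
        System.out.println("active=" + hot.isActive());  // active=false
    }
}
```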

The Publisher you describe is quite limited in applicability: there are many kinds of streams which cannot fulfill this contract.

I can't think of any that this can't accommodate, so can you please provide use cases?

rkuhn commented 10 years ago

You postulate that you only pass around that thing which knows how to connect to or create the real source, but that is far too limiting! Being able to perform a REST call cannot be presumed, and in the example I gave (consuming the incoming byte stream in a TCP server process) you have no choice: you cannot make a new connection.

rkuhn commented 10 years ago

In other words: using your proposed semantics, Reactive Streams are limited in applicability to those scenarios in which the stream source (i.e. Publisher) is actually stateless. This is a very strong and unnecessary restriction.

benjchristensen commented 10 years ago

you have no choice: you cannot make a new connection.

Yes I understand, and that's totally capable of being handled by the Publisher.subscribe mechanism.

So what is the concrete use case that does not work with this signature?

rkuhn commented 10 years ago

How can this be handled without being forced to support multi-cast?

benjchristensen commented 10 years ago

Reactive Streams are limited in applicability to those scenarios in which the stream source (i.e. Publisher) is actually stateless.

No, it's not. The Publisher itself is stateless because it's a factory, but it is just a facade: it can reference stateful things under the covers just fine.


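    static class MulticastHotPublisher implements Publisher<Integer> {

        // stateful stuff here, shared by every Subscription this facade hands out
        private static final java.util.Set<Subscriber<Integer>> activeSubscribers =
                java.util.concurrent.ConcurrentHashMap.newKeySet();

        @Override
        public void subscribe(final Subscriber<Integer> s) {
            activeSubscribers.add(s);
            // a new Subscription per Subscriber, each backed by the same shared state
            s.onSubscribe(new Subscription() {
                @Override
                public void signalAdditionalDemand(int n) {
                    // record n as demand for s; the shared source only sends to s while it has demand
                }

                @Override
                public void cancel() {
                    activeSubscribers.remove(s);
                }
            });
        }

    }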
benjchristensen commented 10 years ago

How can this be handled without being forced to support multi-cast?

I provided a fully functioning example above that does not do multicast with this signature. Do you agree that each subscribe in that example is unicast?

rkuhn commented 10 years ago

Given a TCP connection which was already accepted from a listen socket, how do you represent that as a Publisher without being forced to implement multi-cast?

DougLea commented 10 years ago

@benjchristensen Yes, about bypassing the initial round trip. Plus the added restriction that the initial subscribe call establishes the max buffer size; acks cannot expand it. Which is good for resource management.

benjchristensen commented 10 years ago

Roland and I spent the last hour talking on Skype about this instead of littering this issue with more back and forth :-)

We both agree that the Subscriber/Subscription contract needs to be (and is) very clearly specified. This is a good thing and we don't have disagreement here.

The disconnect is about Publisher. I like Publisher as a simple factory type and want it to have very loosely specified behavior that allows implementations to do whatever they wish (unicast, multicast, whatever) as long as they obey the Subscriber/Subscription contract.

Roland is concerned about a type existing in the spec that is so loosely specified, and would therefore rather not have it, keeping basically just Subscriber/Subscription, since the spec cannot clearly mandate what Publisher does. My counterargument is that without Publisher, interop and library implementations will be weakened, since they will all have to create custom factory (publisher) types. I also think it is clear what Publisher does: it produces data to a Subscriber according to its contract via Subscription.

My view is that having Publisher loosely specified is a good thing. If it is free to be implemented however it wants as long as it maintains the Subscriber/Subscription contract it can then model all possible use cases.

It is then up to the concrete types implementing Publisher to communicate what they do (such as read a file, multicast the NY Stock Exchange, submit a request/response to a REST API, etc).

As I see it, the contract that works for these 3 types is:

rkuhn commented 10 years ago

Thanks for the summary, Ben, and I agree with it. Having a Publisher type in the specification with the semantics you describe is useful (albeit ugly in the sense of being so vague). Since the question of whether to allow multiple subscriptions is so fundamental, I would still like to include a type that expresses a Publisher without the ability to handle more than one subscription. The contract of this type would be a strict semantic subset of Publisher, it would support exactly the same subscribe operation, but only once. If we do not include such a type, then every implementation will create something like it—at least for internal use.

This is my proposal:

trait SingleUsePublisher[T] {
  def subscribe(s: Subscriber[T]): Unit
}

trait Publisher[T] extends SingleUsePublisher[T]

A consumer of Publishers can then specify whether it needs single or multiple use, and a producer of Publishers can express that it is sometimes more efficient to provide a single-use Publisher (as is the case for representing the read-side of a live TCP connection). The other reason is that then we have a fully functional set of SingleUsePublisher/Subscription/Subscriber which has very tightly specified semantics, allowing code to stay away from the arbitrariness of Publisher if desired.

Creating this type in individual libraries is not possible, because they can only derive types and not inject supertypes, and having SingleUsePublisher extend Publisher would not express the right semantics.
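A Java sketch of the single-use idea, assuming a hypothetical `SingleUsePublisher` interface with the same `subscribe` operation and a hypothetical `ConnectionPublisher` standing in for the read side of an accepted TCP connection (an `Iterator` plays the connection). How a second Subscriber is rejected is not specified in the thread; here it is assumed to receive `onSubscribe` followed by `onError`, so it still sees the callback sequence the Subscriber contract describes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleUsePublisherSketch {

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    // Same subscribe operation, but the contract allows it to succeed only once.
    interface SingleUsePublisher<T> {
        void subscribe(Subscriber<T> s);
    }

    // Hypothetical wrapper over a source that cannot be re-created.
    static final class ConnectionPublisher<T> implements SingleUsePublisher<T> {
        private final AtomicBoolean used = new AtomicBoolean(false);
        private final Iterator<T> source;        // stands in for the live connection

        ConnectionPublisher(Iterator<T> source) { this.source = source; }

        @Override
        public void subscribe(Subscriber<T> s) {
            if (!used.compareAndSet(false, true)) {
                // Assumed rejection policy: deliver onSubscribe, then fail immediately.
                s.onSubscribe(new Subscription() {
                    @Override public void signalAdditionalDemand(int n) { }
                    @Override public void cancel() { }
                });
                s.onError(new IllegalStateException("only one Subscriber is supported"));
                return;
            }
            s.onSubscribe(new Subscription() {
                private boolean done = false;

                @Override public void signalAdditionalDemand(int n) {
                    for (int i = 0; i < n && !done && source.hasNext(); i++) {
                        s.onNext(source.next());
                    }
                    if (!done && !source.hasNext()) {
                        done = true;
                        s.onCompleted();
                    }
                }

                @Override public void cancel() { done = true; }
            });
        }
    }

    // Hypothetical Subscriber that requests everything up front and records the outcome.
    static final class Collector<T> implements Subscriber<T> {
        final List<T> received = new ArrayList<>();
        Throwable error;
        boolean completed;

        @Override public void onSubscribe(Subscription s) { s.signalAdditionalDemand(Integer.MAX_VALUE); }
        @Override public void onNext(T t) { received.add(t); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        ConnectionPublisher<String> conn = new ConnectionPublisher<>(Arrays.asList("a", "b").iterator());
        Collector<String> first = new Collector<>();
        Collector<String> second = new Collector<>();
        conn.subscribe(first);
        conn.subscribe(second);
        System.out.println(first.received + " completed=" + first.completed);  // [a, b] completed=true
        System.out.println("second error: " + second.error.getMessage());      // second error: only one Subscriber is supported
    }
}
```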

sirthias commented 10 years ago

@rkuhn With this proposal, would you require a Publisher to always support multiple subscriptions? Because, if not, then I still won't know if a given Publisher actually does accept multiple subscriptions. The only difference to SingleUsePublisher would be that a Publisher might support multiple subscriptions, while a SingleUsePublisher will never support multiple subscriptions.

Semantically a SingleUsePublisher would therefore still fit the Publisher contract, while a Publisher that does support multiple subscriptions would not fit the SingleUsePublisher contract, even though it forms a sub-type of the latter. Therefore, one could argue that SingleUsePublisher extends Publisher would be the better design, since it wouldn't violate the LSP (Liskov) while Publisher extends SingleUsePublisher does, no?

sirthias commented 10 years ago

Note that, even though with the SingleUsePublisher extends Publisher design an implementation could introduce SingleUsePublisher in its own scope, there might still be value in making it part of the API/SPI in order to have a common way to express no-multiple-subscriptions semantics across implementations.

benjchristensen commented 10 years ago

Thanks for the summary, Ben, and I agree with it.

I'm glad we agree on the first 3 types then: Publisher, Subscriber and Subscription.

However, I think it clutters and confuses things to add SingleUsePublisher and it is not needed.

then every implementation will create something like it—at least for internal use.

Not every implementation. RxJava always represents it as an Observable. The rx.Subscriber does not need to know if an rx.Observable is doing multicast or unicast.

Specifying "single use" vs "multi use" is not necessary for the interface, and a library like Akka can do it if it needs to, but it doesn't need to affect the standard.

A consumer of Publishers can then specify whether it needs single or multiple use

Why would a consumer ever need to specify this? For one thing, it's implicit in usage; secondly, a single consumer (a Subscriber) can only ever subscribe once.

very tightly specified semantics, allowing code to stay away from the arbitrariness of Publisher if desired

This is mixing a particular implementation requirement with the interface definition.

I am fully capable of implementing Publisher in such a way that it behaves well in both unicast and multicast use cases. The concrete type of my Publisher would communicate this, such as "FilePublisher" or "NYSEPublisher" or "MouseEventPublisher".

The code speaks, and it shows that the simplest interface design works very well in allowing the greatest flexibility, which is the whole point of an interop standard. The Subscriber implementations do not need to know if the Publisher is doing multicast or unicast (they don't care, since they can only ever subscribe once anyway), and a Publisher makes that decision without regard for what the Subscriber will do. It is driven by the underlying data stream.

In the case of Akka, the actors need to make a concrete decision about what to do if subscribed to multiple times. That has nothing to do with the reactive-stream interfaces, however; that is a concrete implementation. Thus, Akka could have SingleUseActorPublisher.

Is there a use case that can not be efficiently implemented with the 3 types and these contract rules?

If there is a use case, I'd like to see code, as we'll continue talking in circles otherwise. I'd appreciate the code in Java as well since that's what the standard is being defined in and has the broadest readability for everyone involved.