reactive-streams / reactive-streams-jvm

Reactive Streams Specification for the JVM
http://www.reactive-streams.org/
MIT No Attribution

Clarification request for Rule 2.7 #405

Closed: pavelrappo closed this issue 6 years ago

pavelrappo commented 7 years ago

A Subscriber MUST ensure that all calls on its Subscription take place from the same thread or provide for respective external synchronization.

Given rule 1.3, technically, which thread a Subscriber's method is called on is out of that Subscriber's control. As far as I understand, Subscription's methods are most often called synchronously from within the Subscriber's methods.

Thus, either all calls to Subscription should be properly synchronized all the time (but that's not what we probably want) or the rule should be rephrased.

(Sorry if it looks like nitpicking.)

akarnokd commented 7 years ago

Rule 3.3 mandates limited recursion. The external synchronization and limited recursion can be solved via the same constructs such as dedicated actors, atomic state machines and/or the queue-drain serialization approach.

Thus, either all calls to Subscription should be properly synchronized all the time

Depends on the platform. Think about JavaScript where there is only one thread and thus everything is a synchronous call.

As far as I understand most often Subscription's methods are called from Subscriber's methods synchronously.

Not necessarily. For example, an intermediate Publisher may prefetch data from its source even if the end consumer hasn't requested anything. This end-consumer can now request data which triggers the onNext emission by the Publisher. After a certain threshold, the Publisher may decide to prefetch even more data from its upstream, thus you have practically a request() call triggering more request() from upstream.
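To make the prefetch scenario concrete, here is a minimal sketch of the replenishment bookkeeping such an intermediate stage might keep. It assumes the stage requests `prefetch` items up front and, each time 75% of them have been passed downstream, requests that many more from upstream. The class name `PrefetchTracker` and the 75% threshold are illustrative assumptions, not anything the spec prescribes, and the JDK's `java.util.concurrent.Flow.Subscription` stands in for `org.reactivestreams.Subscription` (same methods) so the snippet needs no extra dependency.

```java
import java.util.concurrent.Flow;

// Hypothetical bookkeeping for an intermediate stage that prefetches from upstream.
// Not thread-safe by itself: onItemEmitted() is assumed to be called only from the
// stage's own serialized emission loop.
final class PrefetchTracker {
    private final Flow.Subscription upstream;
    private final int prefetch;
    private final int limit;   // replenish threshold, assumed here to be 75% of prefetch
    private int consumed;      // items passed downstream since the last replenish

    PrefetchTracker(Flow.Subscription upstream, int prefetch) {
        this.upstream = upstream;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);
    }

    // Called once on subscription, before the end consumer has requested anything.
    void start() {
        upstream.request(prefetch);
    }

    // Called each time a buffered item is handed downstream (i.e. in response to
    // downstream demand); every `limit` items it asks upstream for `limit` more.
    void onItemEmitted() {
        if (++consumed == limit) {
            consumed = 0;
            upstream.request(limit);
        }
    }
}
```

With `prefetch = 8`, the stage asks upstream for 8 items before the end consumer requests anything, then for 6 more once the sixth item has gone downstream; that second upstream `request()` is a consequence of downstream demand.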

viktorklang commented 7 years ago

@pavelrappo Re your comment on 1.3, see 3.1: https://github.com/reactive-streams/reactive-streams-jvm#3.1

pavelrappo commented 7 years ago

@viktorklang This "context" the rule is talking about is a bit hard to define. I'm sure this question has been asked many times before, so pardon me for asking it again:

What is this "context" the rule is talking about?

Is it invocations of Subscriber's methods? Is it any place (method, class, etc.) which the subscriber is aware of and permits calls to its Subscription from? (The thread the call is made in is obviously important, but is rather an orthogonal characteristic. Or is it not?)

pavelrappo commented 7 years ago

@akarnokd

Depends on the platform. Think about JavaScript where there is only one thread and thus everything is a synchronous call.

I think it's fair to say we are talking about the JVM and threads. Otherwise this spec might need to become more general than it is today.

Suppose I have a Publisher I don't know anything about, except that it produces the type of items my Subscriber consumes. My Subscriber works as follows. In onSubscribe it requests a single item. After the item has been delivered to onNext, the subscriber starts a timer, say, for half an hour. When the timer has ended, the subscriber requests one more item; rinse and repeat.

What should "external synchronization" look like in this case?

viktorklang commented 7 years ago

@pavelrappo The "tick" from the timer needs to be delivered to the Subscriber somehow; how is that signal interleaved with the processing of the onX signals?

pavelrappo commented 7 years ago

@viktorklang Not sure I understood your question. What I am asking though is how to provide "external synchronization" in this (schematic) case:


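    import java.util.concurrent.TimeUnit;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    public class MySubscriber<T> implements Subscriber<T> {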

        Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            // ...
            (subscription = s).request(1);
        }

        @Override
        public void onNext(T item) {
            // ...
            new Thread(() -> {
                try {
                    TimeUnit.MINUTES.sleep(30);
                } catch (InterruptedException e) {
                    subscription.cancel();
                    return;
                }
                subscription.request(1);
            }).start();
        }
        // ...
    }

viktorklang commented 7 years ago

@pavelrappo Your example would be problematic, since you're not safely publishing the Subscription between the thread which executes the onNext-signal and the thread which sleeps, and back.

Have you had a look at the examples? https://github.com/reactive-streams/reactive-streams-jvm/tree/master/examples/src/main/java/org/reactivestreams/example/unicast

akarnokd commented 7 years ago

Actually, Thread.start() acts as a barrier, so the store to the subscription field in onSubscribe happens-before the execution of the Runnable, which then uses that field.

The example shows that request() can be called from a different thread than onNext executes, thus the usual reentrancy-avoiding logic, such as setting a flag at the beginning of the request() implementation and clearing it at the end is not enough.

Depending on what the Publisher does, the thread-safe and reentrant-safe practical implementation could be:

In all three cases, the Subscriber can call Subscription.request() from any thread at any time and get the correct and thread-safe behavior from the Publisher/Subscription implementation.
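As a hedged illustration of the queue-drain serialization approach mentioned earlier in the thread, the sketch below shows a Subscription whose `request()` can be called from any thread, including reentrantly from inside `onNext`, while signals to the Subscriber stay serialized and recursion stays bounded. The class `RangeSubscription` and its integer-range source are hypothetical, it uses the JDK's `java.util.concurrent.Flow` interfaces in place of the `org.reactivestreams` ones, and it is a sketch rather than a TCK-verified implementation.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical Subscription that emits the integers [start, end) to one Subscriber.
// request() may be called from any thread, including reentrantly from onNext.
// Simplified: it does not drop its Subscriber reference after cancel (rule 3.13).
final class RangeSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> downstream;
    private final int end;
    private int index;                                      // only touched inside drain()
    private final AtomicLong requested = new AtomicLong();  // outstanding demand
    private final AtomicInteger wip = new AtomicInteger();  // pending drain passes
    private volatile boolean done;                          // cancelled or terminated
    private volatile Throwable badRequest;

    RangeSubscription(Flow.Subscriber<? super Integer> downstream, int start, int end) {
        this.downstream = downstream;
        this.index = start;
        this.end = end;
    }

    @Override
    public void request(long n) {
        if (n <= 0) {
            badRequest = new IllegalArgumentException("rule 3.9: request(n) needs n > 0");
        } else {
            // Add demand, capping at Long.MAX_VALUE ("effectively unbounded", rule 3.17).
            requested.getAndUpdate(r -> r + n < 0 ? Long.MAX_VALUE : r + n);
        }
        drain();
    }

    @Override
    public void cancel() {
        done = true;
    }

    // Queue-drain: only the caller that moves wip from 0 to 1 emits; any other caller
    // (another thread, or a reentrant call from onNext) just bumps wip and returns,
    // so signals are serialized and the recursion depth stays bounded.
    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        do {
            long r = requested.get();
            long emitted = 0;
            while (!done) {
                Throwable err = badRequest;
                if (err != null) {
                    done = true;
                    downstream.onError(err);
                    return;
                }
                if (index == end) {
                    done = true;
                    downstream.onComplete();
                    return;
                }
                if (emitted == r) {
                    break;
                }
                downstream.onNext(index++);
                emitted++;
            }
            final long delivered = emitted;
            requested.getAndUpdate(cur -> cur == Long.MAX_VALUE ? cur : cur - delivered);
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }
}
```

A Subscriber that calls `request(1)` from every `onNext` does not recurse here: the nested call only increments `wip`, and the outer drain loop picks up the new demand on its next pass.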

pavelrappo commented 7 years ago

@viktorklang 1) Thanks for the link; I will no doubt have a look at it. 2) I can't see how subscription is not visible to the created thread.

  1. Publisher guarantees happens-before between calling onSubscribe and onNext.
  2. As per the JMM: "An action that starts a thread synchronizes-with the first action in the thread it starts"; "If an action x synchronizes-with a following action y, then we also have hb(x, y)"; "If hb(x, y) and hb(y, z), then hb(x, z)"; and "If x and y are actions of the same thread and x comes before y in program order, then hb(x, y)".

Therefore, actions in onSubscribe happen-before any action in a newly created thread.

viktorklang commented 7 years ago

@pavelrappo Yes, the barrier to the new thread is fine, but there is no barrier back. (You cannot assume that the Publisher is synchronous.)

viktorklang commented 7 years ago

@akarnokd In your example, if I understand you correctly, assuming that there are no other, non-volatile fields involved, the addAndGet would provide the necessary barriers both to-and-from. BUT: publishing the Subscription still needs to be done safely, and concurrent signalling is still bad.

pavelrappo commented 7 years ago

@viktorklang I guess that's what my question was about. What external synchronization I should use to ensure Subscription's methods are called in a thread-safe manner?

viktorklang commented 7 years ago

@pavelrappo Depends on how you implement your Subscriber. Is it a synchronous Subscriber? Is it async? If it is async, how is its execution controlled? Can your timer be hooked into that execution?

I think it makes sense for you to have a look at the examples, and see if that helps a bit or not. :)
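One way to hook the timer into the Subscriber's own execution is to confine every interaction with the Subscription to a single-threaded scheduler. The sketch below assumes a `ScheduledExecutorService` and uses the JDK's `java.util.concurrent.Flow` interfaces (same methods as the `org.reactivestreams` ones); the class name `TimerPacedSubscriber` and its `handler` parameter are hypothetical. Signals are handed to the scheduler thread and the timer tick also runs there, so every `request()` comes from one thread, and the executor's memory-consistency guarantee (actions before submitting a task happen-before the task runs) provides an edge in each direction.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical Subscriber: request one item, handle it, wait `delay`, request the next.
// All Subscription calls run on the single `serial` thread (rule 2.7, no locks needed).
final class TimerPacedSubscriber<T> implements Flow.Subscriber<T> {
    private final ScheduledExecutorService serial = Executors.newSingleThreadScheduledExecutor();
    private final long delay;
    private final TimeUnit unit;
    private final Consumer<? super T> handler;
    private Flow.Subscription subscription; // confined to the `serial` thread

    TimerPacedSubscriber(long delay, TimeUnit unit, Consumer<? super T> handler) {
        this.delay = delay;
        this.unit = unit;
        this.handler = handler;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        serial.execute(() -> {
            subscription = s;
            subscription.request(1);
        });
    }

    @Override
    public void onNext(T item) {
        serial.execute(() -> {
            handler.accept(item);
            // The tick runs on `serial` as well: this is the "join" back to the Subscriber.
            serial.schedule(() -> subscription.request(1), delay, unit);
        });
    }

    @Override
    public void onError(Throwable t) {
        serial.execute(serial::shutdownNow); // drops any pending tick
    }

    @Override
    public void onComplete() {
        serial.execute(serial::shutdownNow); // drops any pending tick
    }
}
```

The 30-minute wait from the question becomes `new TimerPacedSubscriber<>(30, TimeUnit.MINUTES, handler)`. The trade-off is one dedicated thread per Subscriber; sharing an executor instead would need its own serialization, for example the queue-drain approach akarnokd mentions.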

viktorklang commented 6 years ago

Is everything addressed here?

viktorklang commented 6 years ago

@pavelrappo Ping

pavelrappo commented 6 years ago

@viktorklang yes, thank you

viktorklang commented 6 years ago

@pavelrappo perfect, thank you.

thekalinga commented 6 years ago

@akarnokd @viktorklang Does this rule exist to guarantee that the Publisher sees the state modifications that request(long)/cancel() might have made inside the Subscription object, which the Publisher needs in order to fulfil its obligations, such as

  1. sending as many more onNext signals as were requested, and
  2. eventually cancelling the subscription and thus releasing the Publisher's reference to the Subscriber?

If so, why can't the Subscription's request and cancel implementations provide these visibility guarantees themselves, so that the Subscriber is freed from this responsibility (as most users of the library have an extremely limited understanding of concurrency)?

viktorklang commented 6 years ago

@thekalinga Thanks for your question, I hope I can answer it!

If all Subscriber implementations need to employ coordination for all their operations, it introduces overhead for everyone. Reactive Streams is not an end-user API, and it is not expected that developers implement any of the interfaces without understanding the spec or verifying the implementation using the TCK. See this example to get an overview of which concerns need to be addressed in the normal case for an async Subscriber implementation.

Cheers, √

thekalinga commented 6 years ago

@viktorklang Thanks for your response. I've downloaded the source and will go over the implementation before I ask any (semi-)educated questions.

thekalinga commented 6 years ago

@viktorklang One thing comes to mind (posting it here so that I don't forget it):

Yes, the barrier to the new thread is fine, but there is no barrier back. (You cannot assume that the Publisher is synchronous.)

Can you please clarify what "barrier back" means? What does the Publisher "not see" if there is no external synchronization by the Subscriber when it invokes methods on the Subscription?

Is it the state changes that request and cancel might introduce inside the Subscription object that the Publisher passed down to the Subscriber?

If so, can the responsibility be pushed onto the Publisher (the creator of the Subscription) instead of the Subscriber? The Publisher knows the Subscription inside out, whereas the Subscriber is usually (and is supposed to be) clueless about its internals.

viktorklang commented 6 years ago

@thekalinga If you think about it in a fork+join manner, there is a fork (the new thread) but there is no join phase.
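The fork/join framing can be shown with plain threads. In this hypothetical example, `Thread.start()` is the fork: writes made before it are visible to the new thread. Nothing the new thread writes is guaranteed to be visible to the original thread until some edge leads back, here `Thread.join()`. In the timer example from earlier in this issue, the sleeping thread's `request(1)` plays the worker's role and the Publisher's emitting thread plays the reader's; unless the Subscription or the Subscriber establishes such an edge back, the Publisher may not observe that call's effects.

```java
// Hypothetical illustration of the "fork" and "join" happens-before edges.
final class ForkJoinVisibility {
    private int input;    // plain, non-volatile fields on purpose
    private int output;

    int run() throws InterruptedException {
        input = 42;
        Thread worker = new Thread(() -> {
            // Fork edge: start() synchronizes-with the worker's first action,
            // so the worker is guaranteed to read 42 here.
            output = input + 1;
        });
        worker.start();
        // Without the join below, reading `output` would be a data race:
        // this thread could observe 0 or 43.
        worker.join();
        // Join edge: all of the worker's actions happen-before join() returns.
        return output;
    }
}
```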

thekalinga commented 6 years ago

@viktorklang But the underlying reason is still not explained. What happens when you don't join (synchronize), and why is the join even required? That's what I'm trying to find out: the rule says we need to synchronise somehow, but why synchronise on the Subscriber side in the first place?

viktorklang commented 6 years ago

@thekalinga The Subscriber needs to do coordination if it runs asynchronously, and since concurrent signalling is forbidden, if it exposes the Subscription it needs to ensure that no concurrent signalling takes place.

thekalinga commented 6 years ago

Looks like I got the answer (not sure if it's correct). I'll post it here for others who might have the same question.

From the spec's explanation of 2.7: "The intent of this rule is to establish that external synchronization must be added if a Subscriber will be using a Subscription concurrently by two or more threads."

This implies that it's required because request and cancel introduce side effects (tracking how much more demand needs to be fulfilled) in either the Subscription or the Publisher.

My question was: "If the above is true, shouldn't it be the Publisher's responsibility to make sure that whatever Subscription implementation it sends to the Subscriber is thread-safe when request and cancel are invoked? Why should the Publisher expect its internal side effects to be guarded by the Subscriber?"

After thinking about it for a while, I see that most Subscribers invoke the Subscription methods request/cancel synchronously, on the same thread as onNext (the Publisher thread). So any thread-safety code the Publisher might add for Subscription method calls would degrade performance for all of these synchronous Subscribers, due to the cost of memory fences. (Question: shouldn't the JIT optimise this on its own? I.e. even if guards are in place, when it encounters an execution path at runtime that is guarded but never passes the Subscription object to any other thread [the synchronous invocation scenario], shouldn't it skip the unnecessary memory fence for that specific path?)

If that is the case (assuming the JIT does not optimise this), then every Subscriber will pay the memory-fence penalty for every request and cancel, whether synchronous or asynchronous. To avoid this penalty, the Publisher can push the memory-fence responsibility onto those Subscribers that want to invoke Subscription methods asynchronously (i.e. that also invoke them from non-Publisher threads). If these Subscribers cooperate with the Publisher in making sure that the Subscription's side effects are visible to the Publisher, then all synchronous Subscription invocations will be much faster.

Since the Publisher will obviously be handling its upstream onNexts on a separate thread in the asynchronous demand-passing case, which will most likely not be the same as the Subscriber's async thread (used to call request/cancel), the Publisher thread handling onNext may not see the side effects that request and cancel (triggered from the Subscriber's async thread) made inside the Subscription/Publisher, unless the Subscriber synchronises its Subscription method invocations. If the Subscriber doesn't cooperate, we will most likely see data races (and corruption).

If the Subscriber cooperates and ensures synchronisation, the Publisher can do its job of signalling correctly without any inconsistent state, while all synchronous Subscribers enjoy the performance benefit (no explicit synchronisation points inside the Subscription for request/cancel).

PS: Not sure if this is the whole explanation. Maybe I'm missing something, or I'm wrong.

thekalinga commented 6 years ago

Not sure whether it's correct. Can you please confirm?

thekalinga commented 6 years ago

The Subscriber needs to do coordination if it runs asynchronously, and since concurrent signalling is forbidden, if it exposes the Subscription it needs to ensure that no concurrent signalling takes place.

Why should request/cancel not be performed concurrently? What are we trying to achieve by enforcing this restriction?

Does my above explanation address these questions?

viktorklang commented 6 years ago

@thekalinga Let's take a step back—what problem are you looking to solve?

thekalinga commented 6 years ago

@viktorklang I'm trying to understand the reason this rule exists in the Reactive Streams spec, as I need to create training material for each of its rules.

PS: Thanks for your time

viktorklang commented 6 years ago

@thekalinga Ok. :) My hope would be that the Intent sections for these rules answer why they exist. For 2.7 specifically, it is about setting the appropriate expectations for Subscriber implementations. I've found a good exercise to be to imagine that the rule doesn't exist, and what would happen if code didn't take it into consideration.

thekalinga commented 6 years ago

@viktorklang Is the above explanation https://github.com/reactive-streams/reactive-streams-jvm/issues/405#issuecomment-414318349 on the mark, or did I go wrong somewhere?

thekalinga commented 6 years ago

The current explanation is too terse and does not give any underlying reasons. It just says "do this" and does not clearly explain why one should, especially when the whole point of forcing the Publisher to make serialised calls is to make sure that the Subscriber deals with as few concurrent data structures as possible (as they are very easy to get wrong).

An explanation of what happens if this rule is not followed would give people a clear justification for why it exists. Most of the rules of Reactive Streams are easy to follow and obvious to anyone with a basic understanding of Java threading, but rules like this one are not that obvious.

viktorklang commented 6 years ago

@thekalinga Perhaps the easiest answer is that the rule exists to prevent un-coordinated calls to request, and to make sure that the Subscription is safely published between threads.

@reactive-streams/contributors Anyone have a better phrased explanation?

thekalinga commented 6 years ago

@viktorklang Since a hot Publisher can have many Subscribers, a hot Publisher should guard its request/cancel appropriately and expect Subscription methods to be called from multiple threads, AFAIK. So I'm not sure whether what you gave is the full explanation.

viktorklang commented 6 years ago

@thekalinga Any Publisher can have many Subscribers, but they all get their own Subscription. It's not about whether the Subscriber will run on another thread; it is about whether the Subscriber uses the Subscription on many threads concurrently with each other, not concurrently with the Publisher.
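For a Subscriber that does use its Subscription from several of its own threads, rule 2.7's "external synchronization" option can be sketched as a small wrapper that the Subscriber owns. `SerializedSubscription` is a hypothetical name, and the JDK's `java.util.concurrent.Flow.Subscription` stands in for `org.reactivestreams.Subscription`. `synchronized` keeps two threads from being inside `request`/`cancel` at once and orders the calls (each unlock happens-before the next lock on the same monitor), and the `final` field safely publishes the wrapped Subscription to any thread that obtains the wrapper.

```java
import java.util.concurrent.Flow;

// Hypothetical wrapper owned by a multi-threaded Subscriber and shared with its worker threads.
final class SerializedSubscription implements Flow.Subscription {
    // `final` gives safe publication: any thread that sees this wrapper also sees `delegate`.
    private final Flow.Subscription delegate;
    private boolean cancelled; // guarded by `this`

    SerializedSubscription(Flow.Subscription delegate) {
        this.delegate = delegate;
    }

    // `synchronized`: no two threads are inside request/cancel at the same time,
    // and each call happens-before the next one.
    @Override
    public synchronized void request(long n) {
        if (!cancelled) {
            delegate.request(n);
        }
    }

    @Override
    public synchronized void cancel() {
        if (!cancelled) {
            cancelled = true;
            delegate.cancel();
        }
    }
}
```

One caveat of this design: the lock is held while calling into the Publisher, so a synchronous Publisher will run `onNext` under it. Java monitors are reentrant, so a nested `request` from that `onNext` still works, but a Subscriber whose `onNext` blocks waiting for one of its other threads to call `request` could deadlock.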

thekalinga commented 6 years ago

@viktorklang Yes, you are right that every Subscriber will have its own Subscription.

Even then, the side effects of each Subscription, whether inside the Subscription or inside the Publisher (if the Subscription is a simple pass-through), must be seen by the Publisher so it can track/cancel demand for each Subscriber. So it affects the Publisher indirectly.

It's not whether the Subscriber will run on another thread, it is about if the Subscriber uses the Subscription on many threads concurrently to each other not concurrently with the Publisher.

But I am not sure why the above needs to be done by the Subscriber. Asking the Subscriber to do this job breaks the Single Responsibility Principle and encapsulation (please note that any rule can be broken, but only with a good justification, say a performance impact). If the passed Subscription/Publisher encapsulates thread safety inside itself (which I believe it should), there is no need for the Subscriber to worry about whether the passed-in Subscription is thread-safe or not. It should be able to call the methods from anywhere it wants without any issue.

If the Subscription/Publisher pair has already taken care of thread-safety issues (in RxJava, the Subscription does take care of this aspect, as per @akarnokd), it's unnecessary to do one more round of thread-safety checks in the Subscriber, and even unreasonable to expect the Subscriber to blindly deal with thread safety because the Subscription might not have taken care of its own.

The thread-safety responsibility for Subscription methods is better encapsulated in the Subscription itself than pushed to the Subscriber.

I had a conversation with @akarnokd on Twitter. Adding it here as a reference. Can you please go over it?

https://twitter.com/1tontech/status/1031548972572110858

Thanks for your time

viktorklang commented 6 years ago

@thekalinga

But I am not sure why the above needs to be done by Subscriber.

I believe I have already explained why coordination needs to be done by the Subscriber if the Subscriber implementation has decided to use possibly different threads of execution at different, or possibly the same, time. I'm not sure how I can say it in a different way? :/

Asking Subscriber to do this job breaks Single responsibility principle & encapsulation

Absolutely not. IF the Subscriber decides to use multiple threads then of course the Subscriber is required to coordinate access to its resources. I hope we can agree here.

(Please note that any rule can be broken but only with a good justification like say performance impact)

???

If the passed Subscription/Publisher encapsulates thread safety inside itself (which I believe it should), there is no need for Subscriber to worry about whether the passed in Subscription is thread safe or not

It is a bit of a truism to say that coordination doesn't need to be done if coordination is already done.

I would hope that it is a bit of a non-issue, as don't you agree that if someone implements a multi-threaded Subscriber, they had better understand how to do so safely?

If the Subscription/Publisher pair has already taken care of thread safety issues (in RxJava Subscription does take care of this aspect as per @akarnokd), its unnecessary to do one more round of thread safety checks in Subscriber & even unreasonable for us to expect Subscriber to blindly deal with the thread safety because Subscription might not have taken care of its own thread safety

See above.

The thread safety responsibility of Subscription methods are better encapsulated in Subscription itself instead of pushing them to Subscriber

By that logic, should all collections in Java be thread safe?

I had a conversation with @akarnokd on twitter. Adding that here as a reference. Can you please go over it.

Better to ask @akarnokd to reply here

viktorklang commented 6 years ago

@thekalinga Please let me know if there's anything I can elaborate on which would be of help, I'm not sure I can add much else to this specific conversation. Some other @reactive-streams/contributors might have a different perspective than I. Thanks for raising the question!

thekalinga commented 6 years ago

@viktorklang Thanks for your response

By that logic, should all collections in Java be thread safe?

No, because the Subscriber is not initialising the Subscription and is clueless about how demand is managed inside the Publisher when it makes calls on the Subscription. Since the Publisher creates the Subscription, it knows what kind of implementation is being used and should be in a position to judge how to protect it from multi-threaded calls, and to do so efficiently.

For example, if I create an object, I am in a better position to handle thread safety for it efficiently than the users of my object. I assume the reason non-thread-safe collections exist in Java is performance, but whoever owns these objects should take responsibility for managing their thread safety rather than asking all their users to worry about it. So every Publisher operating in a multi-threaded environment should be aware that its methods can be invoked simultaneously from multiple threads, and should guard against that scenario.

Everything boils down to who owns the responsibility. Though there is a one-to-one mapping between Subscription and Subscriber, the implementation used for the Subscription is known only to the Publisher, as it creates it and passes it down, so the Publisher is in a better position to guard it than the Subscriber.

I would hope that it is a bit of a non-issue, as don't you agree that if someone implements a multi-threaded Subscriber, they had better understand how to do so safely?

Sure, they should. But don't you agree that in the multiple-Subscriber case, the hot Publisher must take care of thread safety even if the Subscriber has ensured thread safety before calling request, since simultaneous requests from multiple Subscriptions on different threads can still make the Publisher unsafe unless thread safety is ensured inside the Publisher? If so, we might as well NOT force the Subscriber to take care of multi-threading issues, as we would otherwise have double synchronisation: once at the Subscriber level and again at the Publisher level.

In the cold publisher case, forcing async Subscribers (those that call the Subscription asynchronously) to take care of thread safety will give some performance edge. This way, Subscribers that use the Subscription synchronously will not pay any synchronisation penalty, as they signal their demand on the Publisher thread itself.

If the Subscriber is forced to own the synchronisation, and the Publisher is not relieved of the synchronisation issue either (hot publisher case), why not push this responsibility to the Subscription/Publisher combination, which knows how to deal with concurrency in the most effective way because it knows the implementation details, whereas the Subscriber does not?

I only see the cold publisher case giving some performance benefit that justifies the Subscriber sharing this responsibility; otherwise the Publisher should own it (which is what I mentioned here as well: https://github.com/reactive-streams/reactive-streams-jvm/issues/405#issuecomment-414318349).

(Please note that any rule can be broken but only with a good justification like say performance impact) ???

I was saying that principles like the single responsibility principle can be broken if doing so gives a performance advantage.

Please let me know if I'm wrong in my analysis

Thanks

thekalinga commented 6 years ago

Updated my previous comment as I found it somewhat incorrect. Now it reflects my views correctly

DougLea commented 6 years ago

I'm a little confused by some parts of this exchange. But maybe this helps: There is a classic race condition on any variable representing demand: the subscriber increases, the publisher decreases. If, in some special cases, you can somehow arrange that this race does not occur, you can relax synchronization. But in general, you cannot do so while still conforming to spec.
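Doug Lea's point can be sketched with a shared demand counter. With a plain `long`, `demand += n` on the Subscriber's thread and `demand -= k` on the Publisher's thread are separate read-modify-write sequences, so one update can silently overwrite the other. The hypothetical `DemandCounter` below turns both sides into compare-and-set loops on an `AtomicLong`, and treats `Long.MAX_VALUE` as effectively unbounded (capping on overflow), which rule 3.17 permits. It is one possible way to resolve the race, not something the spec mandates.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demand counter shared by a Subscription (adds) and its Publisher (subtracts).
final class DemandCounter {
    private final AtomicLong demand = new AtomicLong();

    // Subscriber side, i.e. request(n). Returns the demand before the addition.
    long add(long n) {
        for (;;) {
            long current = demand.get();
            if (current == Long.MAX_VALUE) {
                return current;                // already unbounded
            }
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE;         // overflow: cap (rule 3.17)
            }
            if (demand.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    // Publisher side: record that `emitted` items were delivered. Returns the remaining demand.
    long produced(long emitted) {
        for (;;) {
            long current = demand.get();
            if (current == Long.MAX_VALUE) {
                return current;                // unbounded demand is never decremented
            }
            long next = current - emitted;
            if (next < 0) {
                throw new IllegalStateException("emitted more than requested");
            }
            if (demand.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    long get() {
        return demand.get();
    }
}
```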

viktorklang commented 6 years ago

@thekalinga Thanks for updating the comment, I hope I'm answering your questions as it is likely that this response will be my last in this conversation as I am not sure I have anything more to add here, I'm afraid. :(

By that logic, should all collections in Java be thread safe?

No because Subscriber is not initialising Subscription & is clueless about how demand is managed inside Publisher when he makes calls on Subscription.

Imagine the following scenario:

    void onSubscribe(java.util.List list) {
        // Would you assume that `list` is thread safe here?
    }

Since Publisher is creating Subscription, he knows what kind of implementation is being used for Subscription & should be in a position to make a judgement as to how to protect from multi threaded calls and also do it efficiently

Since a Subscription represents a 1-to-1 relationship, it does not need to assume that it is going to be invoked in parallel by its Subscriber without sufficient coordination. This is a classic situation of Single-Producer-Single-Consumer vs Multiple-Producer-Single-Consumer. These different modes have different performance (and scalability) implications.

For eg., If I create an object, I am in a better position to handle thread safety efficiently than the users of my object when the user makes a call on any of my object. I assume the reason of non thread safe collections in java is performance, but who ever owns them should take care of the responsibility of managing the thread safety rather than asking all its users to worry about it. So all Publishers operating in multi threaded environment should be aware that his methods can be invoked simultaneously from multiple threads so should guard against that scenario

If your position is (and I don't know that it is) that all classes should be thread-safe, then I'm certain that this is not the place for that conversation, and I hope you agree with me there.

Everything is boiling down to who owns the responsibility. Tho there is one to one mapping between Subscription & Subscriber, the implementation used for Subscription is known only by Publisher as he creates it and passes down, so is in a better position to guard than Subscriber

What I know is that nothing is in a better position to deal with the thread-safety of a Subscriber which chooses to implement its own functionality using multithreading than that very Subscriber.
I would hope that it is a bit of a non-issue, as don't you agree that if someone implements a multi-threaded Subscriber, they had better understand how to do so safely?

Sure, he is. But do u agree that in multiple subscriber case (hot source), Publisher must take care of thread safety even if Subscriber has ensured thread safety before he called request, as simultaneous requests from multiple Subscriptions from different threads can still make Publisher unsafe unless thread safety is ensured inside Publisher? If so, we might aswell NOT force Subscriber to take care of multi threaded issues, as we will have double synchronisation, one at Subscriber & another at Publisher level aswell

I'm not sure that we understand each other, so let me try to clarify my perspective: We are talking about the implementation of Subscriber. In the case of synchronous subscribers, a hot Publisher *still* has to manage coordination (thread safety and semantics) for possibly many Subscribers, since they could be subscribed from different threads. What 2.7 is about is when the implementation of Subscriber decides it's going to use multi-threading *internally* (or call out to some Executor or otherwise). So what the rule says is that if you are going to implement a Subscriber which uses multi-threading, you need to ensure safe publication of the Subscription and to ensure that there's a happens-before relationship between request-invocations.

In case of cold publisher case, it will give some performance edge if async subscribers (those that call Subsciption asynchronously) is forced to take care of multil threading. This way those subscribers that are using Subscription synchronously will not pay any synchronisation penalty.
If Subscriber is forced to own the synchronisation & Publisher too is not relieved of synchronisation issue (hot publisher case), why not push this responsibility to Subscription/Publisher combination, who knows how to deal concurrency in the most effective way because they know the implementation details, where as Subscriber is not

Many Subscribers are actually also Publishers, since few streams are a single Publisher-Subscriber pair.

I only see that the cold publisher case gives some performance benefits to justify Subscriber owning this responsibility, else Publisher should own this responsibility (which is what I mentioned here aswell #405 (comment))

To my knowledge at this point: changing the spec to require that all Subscriptions are thread-safe under all usage patterns would be a breaking change, and breaking the compatibility of the spec would require extraordinary reasons. I'm sure you agree that the burden of evidence has to be on the person who proposes such a breaking change.

Cheers, √

thekalinga commented 6 years ago

@DougLea @viktorklang

Thanks for your effort in helping me understand this. I tried my level best to explain what I'm trying to ask in this reply

To make things concrete, I wrote the following mock code. In this example we have multiple Publishers and Subscribers (please note that this is pseudo code and is not spec-compliant; it is only written to demonstrate the point I'm trying to make).

As you can see, Safe27NonCompliantSubscriber violates 2.7, yet is safe to use with SafePublisher. Conversely, Unsafe27CompliantSubscriber complies with 2.7, yet is an unsafe Subscriber.

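import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// (mostly) works correctly with any subscriber, even when demand is signalled simultaneously
// by multiple subscribers (i.e. both as a hot and a cold publisher). Also a well-encapsulated
// `Subscription`/`Publisher` pair that manages its internal state without any external help from `Subscriber`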
class SafePublisher<T> implements Publisher<T> {
  AtomicLong unfilledDemand = new AtomicLong();
  AtomicBoolean done = new AtomicBoolean();

  @Override
  public void subscribe(Subscriber<? super T> subscriber) {
    subscriber.onSubscribe(new SafeSubscription());
  }

  class SafeSubscription implements Subscription {
    @Override
    public void request(long n) {
      for (;;) {
        long prevDemand = unfilledDemand.get();
        if (unfilledDemand.compareAndSet(prevDemand, prevDemand + n)) { // add the requested amount n, not 1
          return;
        }
      }
    }

    @Override
    public void cancel() {
      done.set(true);
    }
  }

  void fulfillDemandSomehow() {
    // fulfills demand
  }
}

// rarely works correctly
class UnsafePublisher<T> implements Publisher<T> {
  long unfilledDemand;

  @Override
  public void subscribe(Subscriber<? super T> subscriber) {
    subscriber.onSubscribe(new UnsafeSubscription());
  }

  class UnsafeSubscription implements Subscription {
    @Override
    public void request(long n) {
      unfilledDemand += n;
    }

    @Override
    public void cancel() {
      unfilledDemand = 0;
    }
  }

  void fulfillDemandSomehow() {
    // fulfills an unpredictable amount of demand
  }
}

// 2.7-compliant, safe subscriber (I hope it's compliant :) )
class Safe27CompliantSubscriber<T> implements Subscriber<T> {
  Subscription s;
  AtomicInteger itemCount = new AtomicInteger();

  @Override
  public void onSubscribe(Subscription s) {
    this.s = s;
    new Thread(() -> {
      itemCount.addAndGet(1); // happens-before contract
      s.request(1);
    }).start();
  }

  @Override
  public void onNext(T t) {
    new Thread(() -> {
      int total = itemCount.addAndGet(1); // happens-before contract
      if (total < 10) {
        s.request(1);
      }
    }).start();
  }

  @Override
  public void onComplete() { }

  @Override
  public void onError(Throwable t) { }
}

// 2.7-compliant, but unsafe subscriber
class Unsafe27CompliantSubscriber<T> implements Subscriber<T> {
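  PrivateState state = new PrivateState();

  // Mutates shared state without any synchronization: unsafe if called from several threads at once
  void doUnsafeSideEffectsToPrivateState(T t) {
    state.updates++;
  }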

  @Override
  public void onSubscribe(Subscription s) {
    // 2.7-compliant, as we only make calls on `Subscription` from within the `Publisher`'s thread
    s.request(Long.MAX_VALUE);
  }

  @Override
  public void onNext(T t) {
    new Thread(() -> {
      // No call to `Subscription` from a non-`Publisher` thread, yet concurrent modifications
      // to `state` must still be guarded for this to work correctly. If not guarded properly, bad things will happen
      doUnsafeSideEffectsToPrivateState(t);
    }).start();
  }

  @Override
  public void onComplete() { }

  @Override
  public void onError(Throwable t) { }
}

// Not 2.7-compliant, but a safe subscriber: it takes care of its internal state (its actions are side-effect free)
class Safe27NonCompliantSubscriber<T> implements Subscriber<T> {
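  Subscription s;

  // Side-effect free: touches no shared state, so concurrent calls are harmless
  void doSideEffectFreeAction(T t) {
    // e.g. a pure computation on t whose result is discarded
  }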

  @Override
  public void onSubscribe(Subscription s) {
    this.s = s;
    new Thread(() -> {
      s.request(1);
    }).start();
  }

  @Override
  public void onNext(T t) {
    new Thread(() -> {
      doSideEffectFreeAction(t);
      s.request(1);
    }).start();
  }

  @Override
  public void onComplete() { }

  @Override
  public void onError(Throwable t) { }
}

// Not 2.7-compliant, and an unsafe subscriber
class UnSafe27NonCompliantSubscriber<T> implements Subscriber<T> {
  Subscription s;
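  PrivateState state = new PrivateState();

  // Mutates shared state without any synchronization: unsafe if called from several threads at once
  void doUnsafeSideEffectsToPrivateState(T t) {
    state.updates++;
  }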

  @Override
  public void onSubscribe(Subscription s) {
    this.s = s;
    new Thread(() -> {
      s.request(1);
    }).start();
  }

  @Override
  public void onNext(T t) {
    new Thread(() -> {
      // Concurrent modifications to `state` must be guarded for this to work correctly. If not guarded properly, bad things will happen
      doUnsafeSideEffectsToPrivateState(t);
      s.request(1);
    }).start();
  }

  @Override
  public void onComplete() { }

  @Override
  public void onError(Throwable t) { }
}

class Entrypoint {
  public static void main(String[] args) {
    SafePublisher<Integer> safePublisher = new SafePublisher<Integer>();
    safePublisher.subscribe(new Safe27CompliantSubscriber<>());
    safePublisher.subscribe(new Unsafe27CompliantSubscriber<>());
    safePublisher.subscribe(new Safe27NonCompliantSubscriber<>());
    safePublisher.subscribe(new UnSafe27NonCompliantSubscriber<>());

    UnsafePublisher<Integer> unsafePublisher = new UnsafePublisher<Integer>();
    unsafePublisher.subscribe(new Safe27CompliantSubscriber<>());
    unsafePublisher.subscribe(new Unsafe27CompliantSubscriber<>());
    unsafePublisher.subscribe(new Safe27NonCompliantSubscriber<>());
    unsafePublisher.subscribe(new UnSafe27NonCompliantSubscriber<>());
  }
}
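A minimal sketch of why `UnsafePublisher` "rarely works correctly" while `SafePublisher` does, assuming nothing beyond the JDK (the `DemandRaceDemo` class and its `run` method are hypothetical, not part of the code above): several threads perform the same "add to demand" step concurrently, once on a plain `long` field and once on an `AtomicLong`. Only the atomic total is guaranteed to be exact.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo: the same demand bookkeeping on a plain `long`
// (as in UnsafePublisher) and on an AtomicLong (as in SafePublisher).
final class DemandRaceDemo {
  static long plainDemand;                                 // racy: read-modify-write is not atomic
  static final AtomicLong atomicDemand = new AtomicLong(); // thread-safe

  // Starts `threads` threads; each adds 1 to both counters `perThread` times.
  // Returns {plainTotal, atomicTotal}.
  static long[] run(int threads, int perThread) {
    plainDemand = 0;
    atomicDemand.set(0);
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread t = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          plainDemand += 1;          // updates can be lost under contention
          atomicDemand.addAndGet(1); // updates are never lost
        }
      });
      workers.add(t);
      t.start();
    }
    for (Thread t : workers) {
      try {
        t.join(); // also makes the workers' writes visible to this thread
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return new long[] {plainDemand, atomicDemand.get()};
  }
}
```

For example, `DemandRaceDemo.run(4, 100_000)` always reports 400,000 for the atomic counter, while the plain counter can come out lower; whether it does on a given run depends on timing, which is what makes this kind of bug intermittent.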

For all the above examples, which of the Publisher/Subscription combinations is desirable, i.e. which of them encapsulates its state well? I believe it is SafePublisher.

This is what the spec says

2.7 Rule: A Subscriber MUST ensure that all calls on its Subscription take place from the same thread or provide for respective external synchronization.

Explanation: The intent of this rule is to establish that external synchronization must be added if a Subscriber will be using a Subscription concurrently by two or more threads.
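One way to read "respective external synchronization" is a wrapper that funnels every call through a lock. The sketch below assumes the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams `Subscription`/`Subscriber` method signatures; `SerializedSubscription`, `CountingSubscription` and `SerializedSubscriptionDemo` are hypothetical names. Even though `CountingSubscription` uses plain fields, calls made from several threads through the wrapper never overlap, and each one happens-before the next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical wrapper providing the "external synchronization" of rule 2.7:
// every call on the wrapped Subscription runs under one lock, so calls never
// overlap and each one happens-before the next (monitor release -> acquire).
final class SerializedSubscription implements Flow.Subscription {
  private final Flow.Subscription delegate;
  private final Object lock = new Object();

  SerializedSubscription(Flow.Subscription delegate) {
    this.delegate = delegate;
  }

  @Override
  public void request(long n) {
    synchronized (lock) {
      delegate.request(n);
    }
  }

  @Override
  public void cancel() {
    synchronized (lock) {
      delegate.cancel();
    }
  }
}

// Hypothetical, deliberately non-thread-safe Subscription (plain fields).
final class CountingSubscription implements Flow.Subscription {
  long requested;
  boolean cancelled;

  @Override public void request(long n) { requested += n; }
  @Override public void cancel() { cancelled = true; }
}

final class SerializedSubscriptionDemo {
  // `threads` threads each call request(1) `perThread` times through the wrapper.
  static long totalRequested(int threads, int perThread) {
    CountingSubscription raw = new CountingSubscription();
    Flow.Subscription safe = new SerializedSubscription(raw);
    List<Thread> callers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread t = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          safe.request(1);
        }
      });
      callers.add(t);
      t.start();
    }
    for (Thread t : callers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return raw.requested; // safe to read: join() orders the callers' writes before this
  }
}
```

Holding a lock while calling into the Publisher is the simplest option, but it blocks other callers for as long as a synchronous `request()` keeps emitting; the queue-drain approach mentioned earlier in this thread avoids that.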

My question has been: "How does it matter from which thread the Subscriber invokes the Subscription methods, so long as the Publisher has ensured thread safety of its own internal state by making request & cancel thread-safe?"

If a Subscriber has not taken care of happens-before relationships, that should not affect the Publisher's contracts in any way; it should only affect the correctness of the Subscriber. The Subscriber should take care of its own internal state management, and so should the Publisher when it comes to the Subscription.

But the rule states that this Subscriber synchronization has something to do with the Subscription, and I am not sure that it does, or should (unless, of course, there is a performance cost involved if the Publisher does this), as the Subscription can be well encapsulated by the Publisher (I hope the above mock code demonstrates this).

@viktorklang

you need to ensure safe publication of the Subscription and to ensure that there's a happens-before relationship between request-invocations

I believe the happens-before relationship inside the Subscriber is for making sure that the Subscriber's state stays intact from its own point of view between multiple calls from the Publisher, and has nothing to do with the happens-before relationship inside the Subscription/Publisher or their safety. Is that correct?

If so, what does "safe publication of the Subscription" mean here (and in the context of rule 2.7)? If not, I'm not sure why it has anything to do with Publisher/Subscription safety.
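In Java-memory-model terms, "safe publication" means that a reference written by one thread is read by another in a way that guarantees the reader also sees the object's state as of the write (via a `volatile` field, a lock, a `final` field, or a thread-safe handoff). A minimal sketch, assuming the JDK `Flow` interfaces and a hypothetical `HandoffSubscriber`: the Subscription received in `onSubscribe` is stored in a `volatile` field and later read by a different thread. In the tiny demo, `Thread.start()` would already provide the ordering; the `volatile` matters when the reading thread already exists, as with a pool. Publication alone does not make overlapping `request()` calls safe; those still need serializing.

```java
import java.util.concurrent.Flow;

// Hypothetical Subscriber that calls request() from a worker thread rather than
// from the thread that delivered onSubscribe.
final class HandoffSubscriber<T> implements Flow.Subscriber<T> {
  private volatile Flow.Subscription subscription;

  @Override
  public void onSubscribe(Flow.Subscription s) {
    subscription = s; // volatile write: publishes `s` and everything written before it
  }

  // Meant to be called from some other thread, e.g. a worker pool.
  void requestFromWorker(long n) {
    Flow.Subscription s = subscription; // volatile read pairs with the write above
    if (s != null) {
      s.request(n);
    }
  }

  @Override public void onNext(T item) { }
  @Override public void onError(Throwable t) { }
  @Override public void onComplete() { }

  // Hypothetical demo: subscribe on this thread, request from a new thread.
  static long demo(long n) {
    long[] seen = new long[1];
    HandoffSubscriber<String> sub = new HandoffSubscriber<>();
    sub.onSubscribe(new Flow.Subscription() {
      @Override public void request(long k) { seen[0] += k; }
      @Override public void cancel() { }
    });
    Thread worker = new Thread(() -> sub.requestFromWorker(n));
    worker.start();
    try {
      worker.join(); // makes the worker's write to seen[0] visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen[0];
  }
}
```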

I'm sure I'm missing something obvious (maybe my lack of a clear understanding of multithreading), or I'm not seeing what's right in front of my eyes. I'd appreciate it if anyone can clarify.

It would also help if anyone could give a concrete example of how this rule helps, as I believe I have provided counterexamples to it.

DougLea commented 6 years ago

I agree that "safely concurrent" and "obeys all rules" are NOT synonymous. And that it is possible to follow 2.7 and still construct an unsafe component. Do you have any rewording in mind that would help here?

thekalinga commented 6 years ago

@DougLea Do you agree with this statement:

"Subscriber synchronisation is required only for ensuring Subscriber state safety between multiple signals from Publisher & has nothing to do with Subscription/Publisher state. Also, the thread from which Subscription methods are invoked should not affect the correctness of Publisher & Subscription"

If this can be agreed upon, then the changes to the definition can be worked out.

Please let me know whether that's your understanding too, or whether I'm wrong anywhere.

NOTE: This is not a new definition/explanation, only my understanding in concise form.

DougLea commented 6 years ago

To rephrase: I agree that subscriber synchronization is not necessarily required to maintain protocol state correctness, but may be necessary to maintain subscriber state correctness.

viktorklang commented 6 years ago

I believe that it is perfectly valid according to the spec to create a Subscription which tracks a delivered long and a demanded long.

Such a Subscription would work perfectly fine as long as there is an external guarantee that request happens-before the next request.
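A minimal sketch of the kind of Subscription described above, assuming the JDK `Flow` interfaces; `RangeSubscription` and `OneByOneSubscriber` are hypothetical. It tracks demanded and delivered counts in plain `long` fields with no internal synchronization, so it is correct only as long as each `request()` happens-before the next, which is what rule 2.7 asks the Subscriber to guarantee. Here the Subscriber only calls `request()` from `onSubscribe`/`onNext`, so every call comes from the signalling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, intentionally NOT thread-safe Subscription emitting [start, end).
final class RangeSubscription implements Flow.Subscription {
  private final Flow.Subscriber<? super Integer> downstream;
  private final int end;
  private int next;
  private long demanded;     // total requested so far, saturating at Long.MAX_VALUE
  private long delivered;    // total onNext signals so far
  private boolean emitting;  // re-entrancy guard: bounds recursion (rule 3.3)
  private boolean cancelled;
  private boolean done;

  RangeSubscription(Flow.Subscriber<? super Integer> downstream, int start, int end) {
    this.downstream = downstream;
    this.next = start;
    this.end = end;
  }

  @Override
  public void request(long n) {
    // Rule 3.9 (n <= 0) handling omitted for brevity.
    if (cancelled || done) {
      return;
    }
    demanded = (demanded + n < 0) ? Long.MAX_VALUE : demanded + n;
    if (emitting) {
      return; // re-entrant call from onNext: the loop below picks up the new demand
    }
    emitting = true;
    while (!cancelled && delivered < demanded && next < end) {
      delivered++;
      downstream.onNext(next++);
    }
    if (!cancelled && next >= end) {
      done = true;
      downstream.onComplete();
    }
    emitting = false;
  }

  @Override
  public void cancel() {
    cancelled = true;
  }
}

// Hypothetical Subscriber asking for one item at a time from inside onNext,
// i.e. always on the signalling thread, so its request() calls never overlap.
final class OneByOneSubscriber implements Flow.Subscriber<Integer> {
  final List<Integer> received = new ArrayList<>();
  boolean completed;
  private Flow.Subscription subscription;

  @Override
  public void onSubscribe(Flow.Subscription s) {
    subscription = s;
    s.request(1);
  }

  @Override
  public void onNext(Integer item) {
    received.add(item);
    subscription.request(1);
  }

  @Override public void onError(Throwable t) { }

  @Override
  public void onComplete() {
    completed = true;
  }

  // Runs the whole exchange synchronously on the calling thread.
  static OneByOneSubscriber runRange(int start, int end) {
    OneByOneSubscriber sub = new OneByOneSubscriber();
    sub.onSubscribe(new RangeSubscription(sub, start, end));
    return sub;
  }
}
```

The `emitting` flag turns the re-entrant `request(1)` issued inside `onNext` into another loop iteration rather than a nested emission, which keeps recursion bounded; it is only safe because all calls come from one thread.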

And to reiterate, suddenly mandating that all Subscriptions be thread-safe in all interaction patterns would not be forwards-compatible (new Subscribers calling old Publishers).

thekalinga commented 6 years ago

Thank you both for your responses

@DougLea I concur with your statement

@viktorklang

Such a Subscription would work perfectly fine as long as there is an external guarantee that request happens-before the next request.

The tricky part is this section of your statement: "external guarantee that request happens-before the next request".

As a contract, do you agree that an unsafe Subscriber should not break the functionality of the Publisher/Subscription? Especially when we know in advance that end users like me (Subscribers) struggle to correctly maintain their own state, why should any upstream component (Publishers & Processors) expect the Subscriber to give guarantees for upstream state (indirectly, via the Subscription) in addition to the Subscriber's own state?

Unless the Subscriber needs to do this for some performance gain upstream (which, again, has not yet been demonstrated), as of now I don't see any serious justification for 2.7 in its current form:

A Subscriber MUST ensure that all calls on its Subscription take place from the same thread or provide for respective external synchronization.

Other than backward compatibility with old Publishers, do you, as a matter of principle, disagree with @DougLea's statement?

Can you please clarify?

viktorklang commented 6 years ago

@thekalinga

Reactive Streams is akin to an SPI, and end users are not expected to implement the interfaces themselves (unless they know they can adhere to the spec and verify the implementation with the TCK); instead they should be using implementations such as, but not limited to, Akka Streams, Reactor, RxJava etc. The benefit comes from being able to combine such implementations to get correct async, non-blocking, back-pressured streams.

As a contract, do you agree that an unsafe Subscriber should not break the functionality of Publisher/Subscription? Especially when we know in advance that end users like me (Subscribers) struggle to correctly maintain their own state, why should any upstream component (Publishers & Processors) expect Subscriber to take of the guarantees for upstream state as well in addition to Subscribers own state?

So, to be clear, we are talking about the situation where someone decides to create a multithreaded Subscriber. Conceptually, the Subscription is a Single-Producer-Single-Consumer item; if you decide to use it differently, then it is up to you to ensure correctness.
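A minimal sketch of a multithreaded Subscriber that still treats the Subscription as single-consumer, assuming the JDK `Flow` interfaces with `SubmissionPublisher` as the upstream; `PooledSubscriber` and its parallelism of 4 are hypothetical choices. Items are processed on a worker pool, but every `request()` is handed to one dedicated single-thread executor, so the Subscription only ever sees calls from that one thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PooledSubscriber<T> implements Flow.Subscriber<T> {
  private static final int PARALLELISM = 4;
  private final ExecutorService workers = Executors.newFixedThreadPool(PARALLELISM);
  private final ExecutorService signaller = Executors.newSingleThreadExecutor();
  private final AtomicInteger processed = new AtomicInteger();
  private final CountDownLatch terminated = new CountDownLatch(1);
  private volatile Flow.Subscription subscription;

  @Override
  public void onSubscribe(Flow.Subscription s) {
    subscription = s;
    signaller.execute(() -> s.request(PARALLELISM)); // up to 4 items in flight
  }

  @Override
  public void onNext(T item) {
    workers.execute(() -> {
      processed.incrementAndGet(); // stand-in for real work on `item`
      // Hand the follow-up request to the single signalling thread instead of
      // calling it here, where several workers could overlap.
      signaller.execute(() -> subscription.request(1));
    });
  }

  @Override
  public void onError(Throwable t) {
    terminated.countDown();
  }

  @Override
  public void onComplete() {
    terminated.countDown();
  }

  // Waits for the terminal signal, lets in-flight work finish, returns the item count.
  int awaitProcessed() throws InterruptedException {
    terminated.await();
    workers.shutdown();
    workers.awaitTermination(10, TimeUnit.SECONDS);
    signaller.shutdown();
    signaller.awaitTermination(10, TimeUnit.SECONDS);
    return processed.get();
  }

  // Hypothetical demo using the JDK's SubmissionPublisher as the upstream.
  static int demo(int items) {
    PooledSubscriber<Integer> sub = new PooledSubscriber<>();
    try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
      pub.subscribe(sub);
      for (int i = 0; i < items; i++) {
        pub.submit(i); // blocks while this subscriber's buffer is full
      }
    } // close(): onComplete follows the already-submitted items
    try {
      return sub.awaitProcessed();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

The design trades an extra executor hop per `request()` for never needing the Subscription itself to tolerate concurrent callers.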

Whether or not a Subscriber should be able to "break" a Subscription by not adhering to the spec: well, the spec is there for a reason, and there are plenty of cases where a faulty, or even Byzantine, Subscriber implementation can cause all kinds of problems. (And the Subscription becomes part of the Subscriber's state, so the Subscriber needs to deal with memory visibility for its state.)

I'm afraid that unless anything new (substantial) is added to this conversation, I have nothing else to add.

Cheers, √

thekalinga commented 6 years ago

@viktorklang

I agree with you on the SPI & implementation parts. Sure, end users like me rarely implement our own Publishers/Processors, but we need to implement Subscribers for any non-trivial activity.

Even granting that we rarely implement Subscribers, I don't think that justifies 2.7 as it's currently framed, making the Subscriber take care of Subscription state. It clearly breaks the Single Responsibility Principle.

A Subscription is the Publisher's agent, given to the Subscriber to talk back to the Publisher. So the responsibility for ensuring state consistency should lie with the implementors of the Subscription, not with the Subscriber.

Can someone else please provide a justification for 2.7 in its current form? I don't see any as of now (other than the fact that some Publishers might gain a small performance advantage if the Subscriber cooperates this way; but for a non-cooperating Subscriber there are no guarantees, which is a problem IMO).
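For comparison, a minimal sketch of the alternative argued for above, where the Subscription itself absorbs concurrent `request()` calls, using the queue-drain (work-in-progress counter) technique akarnokd mentioned earlier in the thread. It assumes the JDK `Flow` interfaces; `ConcurrentRangeSubscription` and `CollectingSubscriber` are hypothetical, and rule 3.9 handling is left out. The atomics touched on every call illustrate the per-call overhead such a Subscription carries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical Subscription emitting [start, end) that tolerates request() and
// cancel() from any thread. The caller that moves `wip` away from 0 becomes the
// only emitter; other callers just record demand and leave.
final class ConcurrentRangeSubscription implements Flow.Subscription {
  private final Flow.Subscriber<? super Integer> downstream;
  private final int end;
  private final AtomicLong requested = new AtomicLong();
  private final AtomicInteger wip = new AtomicInteger();
  private volatile boolean cancelled;
  private int next;     // only touched by the current emitter (visibility via `wip`)
  private boolean done; // likewise

  ConcurrentRangeSubscription(Flow.Subscriber<? super Integer> downstream, int start, int end) {
    this.downstream = downstream;
    this.next = start;
    this.end = end;
  }

  @Override
  public void request(long n) {
    requested.getAndUpdate(r -> r + n < 0 ? Long.MAX_VALUE : r + n); // saturating add
    drain();
  }

  @Override
  public void cancel() {
    cancelled = true;
  }

  private void drain() {
    if (wip.getAndIncrement() != 0) {
      return; // another caller is emitting and will observe the new demand
    }
    int missed = 1;
    do {
      long r = requested.get();
      long emitted = 0;
      while (emitted != r && next < end && !cancelled) {
        downstream.onNext(next++);
        emitted++;
      }
      if (!done && !cancelled && next >= end) {
        done = true;
        downstream.onComplete();
      }
      if (emitted != 0 && r != Long.MAX_VALUE) {
        requested.addAndGet(-emitted);
      }
      missed = wip.addAndGet(-missed);
    } while (missed != 0);
  }
}

// Hypothetical Subscriber collecting items; onNext is never invoked concurrently.
final class CollectingSubscriber implements Flow.Subscriber<Integer> {
  final List<Integer> items = new ArrayList<>();
  boolean completed;

  @Override public void onSubscribe(Flow.Subscription s) { }
  @Override public void onNext(Integer item) { items.add(item); }
  @Override public void onError(Throwable t) { }
  @Override public void onComplete() { completed = true; }

  // `threads` threads each call request(1) `perThread` times, concurrently.
  static CollectingSubscriber hammer(int end, int threads, int perThread) {
    CollectingSubscriber sub = new CollectingSubscriber();
    Flow.Subscription subscription = new ConcurrentRangeSubscription(sub, 0, end);
    sub.onSubscribe(subscription);
    Thread[] callers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      callers[i] = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          subscription.request(1);
        }
      });
      callers[i].start();
    }
    for (Thread t : callers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return sub;
  }
}
```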