reactive-streams / reactive-streams-jvm

Reactive Streams Specification for the JVM
http://www.reactive-streams.org/
MIT No Attribution

Happens-before relationship between `Publisher#subscribe()` and `Subscriber.onSubscribe()`? #486

Open NiteshKant opened 4 years ago

NiteshKant commented 4 years ago

Considering a simple example:

public class UnsafeSubscriber implements Subscriber<String> {
    private boolean duplicateOnSubscribe = false;

    @Override
    public void onSubscribe(final Subscription s) {
        if (duplicateOnSubscribe) {
            throw new IllegalStateException("Duplicate onSubscribe() calls.");
        }
        duplicateOnSubscribe = true;
    }

    @Override
    public void onNext(final String s) {

    }

    @Override
    public void onError(final Throwable t) {

    }

    @Override
    public void onComplete() {

    }
}

If an UnsafeSubscriber instance is created in a different thread than the one that invokes onSubscribe() (true for an asynchronous Publisher), then according to the Java Memory Model, this statement inside onSubscribe():

if (duplicateOnSubscribe) {

is guaranteed to evaluate to false if and only if the instance is published safely between these threads. None of the rules in the specification establishes a happens-before relationship between Publisher#subscribe() and Subscriber#onSubscribe(). So, the usage above can be categorized as unsafe. In a more convoluted form, the assignment:

private boolean duplicateOnSubscribe = false;

can be interleaved with

duplicateOnSubscribe = true;

such that duplicateOnSubscribe is set to false later.

Has this been considered before or am I missing something?

viktorklang commented 4 years ago

@NiteshKant Isn't this addressed by https://github.com/reactive-streams/reactive-streams-jvm#1.3 ?

NiteshKant commented 4 years ago

I do not think so. Take the below code as an example:

public class UnsafePublisher implements Publisher<String> {
    @Override
    public void subscribe(final Subscriber<? super String> subscriber) {
        // Assume unsafePublish does an unsafe publish of the Subscriber instance
        unsafePublish(subscriber);
    }

    private void unsafePublish(final Subscriber<? super String> subscriber) {
        // Assume we are here in a different thread to which subscriber instance was published unsafely.
        subscriber.onSubscribe(new Subscription() {
            private boolean done;
            @Override
            public void request(final long n) {
                if (!done) {
                    done = true;
                    subscriber.onNext("foo");
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

UnsafePublisher follows rule 1.3, i.e. it makes sure that all Subscriber methods are invoked serially. There is a happens-before relationship between onSubscribe() and onNext() due to rule 2.11, which says that the receipt of onSubscribe() happens-before the processing of onSubscribe().

However, due to the unsafe publishing of the Subscriber instance, there is no happens-before between subscribe() and onSubscribe(), which means duplicateOnSubscribe in the original code is still racy.
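For contrast, here is a minimal sketch of a handoff that does give the Subscriber the visibility it needs. It is an illustration, not something taken from the thread: `SafeHandoffPublisher` and `SafeHandoffDemo` are hypothetical names, and `java.util.concurrent.Flow` (JDK 9+) is used as a stand-in for the `org.reactivestreams` interfaces so that the example compiles without dependencies. It relies on the documented memory consistency property of `java.util.concurrent`: actions in a thread prior to submitting a `Runnable` to an `Executor` happen-before its execution begins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical publisher: hands the Subscriber to another thread through an Executor.
final class SafeHandoffPublisher implements Flow.Publisher<String> {
    private final Executor executor;

    SafeHandoffPublisher(Executor executor) { this.executor = executor; }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        if (subscriber == null) throw new NullPointerException("subscriber");
        // Submitting the task is the safe-publication point: everything the caller
        // wrote to the Subscriber before subscribe() is visible inside onSubscribe().
        executor.execute(() -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done; // sketch only: assumes request/cancel are called serially
            @Override public void request(long n) {
                // Handling of n <= 0 is omitted to keep the sketch short.
                if (!done) { done = true; subscriber.onNext("foo"); subscriber.onComplete(); }
            }
            @Override public void cancel() { done = true; }
        }));
    }
}

final class SafeHandoffDemo {
    static List<String> run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> signals = new ArrayList<>(); // plain, unsynchronized Subscriber state
        CountDownLatch finished = new CountDownLatch(1);
        new SafeHandoffPublisher(executor).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); s.request(1); }
            @Override public void onNext(String item) { signals.add("onNext:" + item); }
            @Override public void onError(Throwable t) { signals.add("onError"); finished.countDown(); }
            @Override public void onComplete() { signals.add("onComplete"); finished.countDown(); }
        });
        try {
            finished.await(5, TimeUnit.SECONDS); // countDown() happens-before await() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return signals;
    }
}
```

The Subscriber in the demo keeps its state in a plain `ArrayList`, much like the non-volatile flag in UnsafeSubscriber. It is safe here only because both handoffs (into the executor and back through the latch) are synchronization points.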

viktorklang commented 4 years ago

@NiteshKant Wouldn't https://github.com/reactive-streams/reactive-streams-jvm#1.9 prevent onSubscribe from being executed on some other thread, as it would not be executed under subscribe?

Scottmitch commented 4 years ago

https://github.com/reactive-streams/reactive-streams-jvm#1.9 is focused on the ordering of methods invoked on the Subscriber (e.g. onSubscribe must be first). The specification uses the term serial to require a "happens-before" relationship, and it isn't clear that 1.9 provides this (or if some other combination of rules is meant to imply this relationship). What I would expect is something like:

<1.12, or existing rule(s)?>: A Publisher MUST establish a serial relationship between subscribe and its first interaction with the Subscriber (e.g. the onSubscribe method per 1.9).

Rationale: This rule is intended to clarify that any local non-final state initialized for use in the Subscriber before the Publisher.subscribe call will be visible before the Publisher interacts with the Subscriber.

NiteshKant commented 4 years ago

@NiteshKant Wouldn't https://github.com/reactive-streams/reactive-streams-jvm#1.9 prevent onSubscribe from being executed on some other thread, as it would not be executed under subscribe?

I don’t think 1.9 prohibits onSubscribe() from being called from a different thread, as long as the order of method calls to the Subscriber is as required by the spec. In my example I am just demonstrating how an unsafe publication of the Subscriber can follow the spec and still lead to unexpected behavior in the Subscriber.

The intent here is to see whether this aspect should be covered by the spec.

rkuhn commented 4 years ago

So far the creation of protocol entities as well as their internal structure and communication is not mentioned in the spec. It is reasonable to assume that a Subscriber does not need to be thread-safe given the provisions in §1.9, the same as objects delivered to onNext are reasonably expected to be normal (unsynchronized) POJOs. In this light I’d find it reckless to use unsafe publication in the implementation of a Publisher, and if I’m not mistaken this would be outside the JMM in any case. So for me the proposed new rule can already be derived from the existing ones.

@NiteshKant are you aware of any implementations that would be affected by such a new rule?


Side note: §1.9 is indeed ambiguous on whether onSubscribe must happen before subscribe returns — it is worded as though it should (though not crystal clear), but the intent clarification detracts from this provision.

NiteshKant commented 4 years ago

@rkuhn I would agree that it would be reckless to use unsafe publication of a subscriber instance inside a Publisher implementation, but isn't one of the goals of a specification to ensure that implementors are not reckless? 🙂

The objects delivered to onNext don't have to worry about safe publication (if processed asynchronously from within onNext) as Rule 2.11 prohibits unsafe publication.

are you aware of any implementations that would be affected by such a new rule?

I do not know of any implementation that would be negatively affected but I can point you to this change in ServiceTalk that will be positively impacted with regards to clarity in memory visibility.

Side note: §1.9 is indeed ambiguous on whether onSubscribe must happen before subscribe returns 

Actually I thought it is intentional for the spec to allow for delayed onSubscribe() signal after subscribe() returns. It certainly is useful to decouple sources that perform blocking work inside subscribe() from the caller of subscribe().

viktorklang commented 4 years ago

@rkuhn @NiteshKant You could also argue that since the spec doesn't mandate that creation of any of Publisher, Subscriber, Subscription, and Processor is safely published, the Publisher cannot assume that it can unsafely publish a Subscriber.

NiteshKant commented 4 years ago

@rkuhn @NiteshKant You could also argue that since the spec doesn't mandate that creation of any of Publisher, Subscriber, Subscription, and Processor is safely published, the Publisher cannot assume that it can unsafely publish a Subscriber.

Yes agreed! The question here is whether we should make this explicit in the spec that a Publisher should safely publish the Subscriber such that implementations of Subscribers (like the ServiceTalk one I referenced above) can assume memory visibility guarantees.

viktorklang commented 4 years ago

@NiteshKant I guess we could clarify the intent of 1.9?

NiteshKant commented 4 years ago

Yes that sounds like something that will be beneficial. Should I take a stab at the clarification?

viktorklang commented 4 years ago

@NiteshKant Yes, please do! :)

devsr commented 4 years ago

I was just about to post a question regarding rule 1.9, but I saw this issue is very related.

Is Publisher.subscribe required to call onSubscribe before returning or can onSubscribe be called asynchronously so long as it happens before other signals?

The rule currently says

Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber and MUST return normally...

The current language seems to demand a synchronous callback only if Publisher.subscribe intends to emit signals synchronously. The only firm requirement seems to be that onSubscribe happens before any other signal. The javadoc on Publisher.subscribe also does not appear to demand a synchronous callback from implementations.

NiteshKant commented 4 years ago

@devsr that is my understanding of the rule too: onSubscribe() must be called before any other signal to the Subscriber, and not necessarily before Publisher#subscribe() returns. In fact, having such an expectation is excessively restrictive because it prevents offloading the subscribe() call to a different thread. Such offloading may be required to decouple callers from a potentially blocking call inside subscribe().
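The two readings of rule 1.9 discussed here can be told apart with a small probe. The following is a sketch under stated assumptions: `SubscribeTimingProbe` and `publisherOn` are hypothetical helpers, `java.util.concurrent.Flow` stands in for the `org.reactivestreams` interfaces, and the "asynchronous" case is simulated deterministically with an executor that merely queues tasks rather than with a real thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

final class SubscribeTimingProbe {
    /** Returns true if onSubscribe had been invoked by the time subscribe() returned. */
    static <T> boolean onSubscribeBeforeReturn(Flow.Publisher<T> publisher) {
        AtomicBoolean subscribed = new AtomicBoolean();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { subscribed.set(true); }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return subscribed.get();
    }

    /** Hypothetical publisher that calls onSubscribe on whatever Executor it is given. */
    static Flow.Publisher<String> publisherOn(Executor executor) {
        return subscriber -> executor.execute(() -> subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        }));
    }

    /** Same-thread executor: onSubscribe runs inside subscribe(). */
    static boolean synchronousCase() {
        return onSubscribeBeforeReturn(publisherOn(Runnable::run));
    }

    /** Queueing executor: onSubscribe is deferred until after subscribe() has returned. */
    static boolean deferredCase() {
        Queue<Runnable> pending = new ArrayDeque<>();
        boolean result = onSubscribeBeforeReturn(publisherOn(pending::add));
        pending.forEach(Runnable::run); // the offloaded onSubscribe only runs now
        return result;
    }
}
```

Both publishers deliver onSubscribe before any other signal, so both satisfy the ordering part of the rule. Only the first would satisfy a reading that requires onSubscribe to complete before subscribe() returns.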

viktorklang commented 4 years ago

@NiteshKant I'm interested in understanding the offload use-case, could you point to some code or otherwise exemplify?

rkuhn commented 4 years ago

Coming back to this makes me think: should we perhaps add an overarching clarification that conformance to this spec does not imply race freedom as defined by the JMM? To my mind such a guarantee was never among our goals, and to me it is unnatural to assume that the spec of a communication scheme that is independent of the concrete message transport guarantees low-level properties like safe publication. If someone chooses to use unsafe publication, they live outside many specifications, including the JMM and Reactive Streams.

In spec parlance: “Using unsafe publication for any communication between Publisher, Subscriber, and Subscription leads to undefined behavior under this specification.”

My guess is that if every specification in the Java ecosystem were checked against this measure, then we’d see this disclaimer on all of them.

viktorklang commented 4 years ago

@rkuhn I'm inclined to agree with your assessment. Relying on unsafe publication to yield safe operational semantics would be an awkward position to take.

NiteshKant commented 4 years ago

@rkuhn @viktorklang the unsafe publication bit may be tripping y'all up more than I intended. The suggestion here is in a similar spirit to rule 2.11:

A Subscriber MUST make sure that all calls on its signal methods happen-before the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic.

Would adding such an overarching clarification make this rule void?

@viktorklang

I'm interested in understanding the offload use-case, could you point to some code or otherwise exemplify?

I should have been more explicit about this from the start, sorry about that. Let me start with a simple example, and I can point to concrete features from some reactive libraries if the use case is not persuasive enough.

Let's assume a generic library provides a feature to create a Publisher from a Callable. Callable#call() can block, so the intent is to decouple the caller of Publisher.subscribe() from the potentially blocking code inside Callable#call(). One way to do it is:

public class OffloadingPublisher<T> implements Publisher<T> {

    private final Executor executor;
    private final Callable<T> producer;

    public OffloadingPublisher(final Executor executor, final Callable<T> producer) {
        this.executor = executor;
        this.producer = producer;
    }

    @Override
    public void subscribe(final Subscriber<? super T> s) {
        executor.execute(new Runnable() {
            private T result; // Ignoring multi-threaded access for simplicity
            @Override
            public void run() {
                s.onSubscribe(new Subscription() {
                    @Override
                    public void request(final long n) {
                        // atomically check and deliver result if not delivered
                    }

                    @Override
                    public void cancel() {

                    }
                });
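                try {
                    // Callable#call() may block and declares a checked Exception
                    result = producer.call();
                    // atomically check and deliver result if already requested and not delivered
                } catch (Exception e) {
                    s.onError(e); // sketch only: delivery must still be serialized with other signals
                }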
            }
        });
    }
}

One could argue that instead of offloading onSubscribe to the executor one can call onSubscribe() first and offload only result delivery. However, since there is no provision for cancellation in this case, that is arguably less useful.

viktorklang commented 4 years ago

@NiteshKant Thanks for clarifying, Nitesh!

Would your problem be solved by instead moving the Thread-offloading to be initialized from within the Subscription instead? Based on the use-case you may not want to trigger it until request(n) has been called? This means that your Publisher can now instantiate the new Subscription, passing the Callable into it, then invoke onSubscribe(subscription) and then return?

For the case where you want to immediately (eagerly) get the value out of the callable, you can after calling onSubscribe(subscription) kick off the computation by invoking your own init-method on your subscriber implementation.
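The lazy variant described above might look roughly like the following. This is a sketch of one possible reading of the suggestion, not code from any library: `LazyCallablePublisher`, `CallableSubscription`, and `LazyCallableDemo` are hypothetical names, `java.util.concurrent.Flow` stands in for the `org.reactivestreams` interfaces, and the handling of non-positive `request(n)` values and of a null result is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical publisher: onSubscribe is synchronous, the Callable runs on first request(n).
final class LazyCallablePublisher<T> implements Flow.Publisher<T> {
    private final Executor executor;
    private final Callable<T> producer;

    LazyCallablePublisher(Executor executor, Callable<T> producer) {
        this.executor = executor;
        this.producer = producer;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        if (s == null) throw new NullPointerException("subscriber");
        s.onSubscribe(new CallableSubscription<T>(s, executor, producer)); // before returning
    }

    private static final class CallableSubscription<T> implements Flow.Subscription {
        private static final int IDLE = 0, STARTED = 1, CANCELLED = 2;
        private final AtomicInteger state = new AtomicInteger(IDLE);
        private final Flow.Subscriber<? super T> subscriber;
        private final Executor executor;
        private final Callable<T> producer;

        CallableSubscription(Flow.Subscriber<? super T> subscriber, Executor executor, Callable<T> producer) {
            this.subscriber = subscriber;
            this.executor = executor;
            this.producer = producer;
        }

        @Override
        public void request(long n) {
            // Only the first positive request starts the (possibly blocking) call.
            if (n > 0 && state.compareAndSet(IDLE, STARTED)) {
                executor.execute(this::produce);
            }
        }

        @Override
        public void cancel() { state.set(CANCELLED); }

        private void produce() {
            T value;
            try {
                value = producer.call(); // assumed to return a non-null value
            } catch (Exception e) {
                if (state.get() != CANCELLED) subscriber.onError(e);
                return;
            }
            if (state.get() != CANCELLED) subscriber.onNext(value);
            if (state.get() != CANCELLED) subscriber.onComplete();
        }
    }
}

final class LazyCallableDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        // Same-thread executor keeps the demo deterministic; a real one would use a pool.
        Flow.Publisher<Integer> publisher =
                new LazyCallablePublisher<Integer>(Runnable::run, () -> { events.add("call"); return 42; });
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); held[0] = s; }
            @Override public void onNext(Integer item) { events.add("onNext:" + item); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        events.add("subscribe returned");
        held[0].request(1);
        return events;
    }
}
```

With this shape the Subscriber holds a Subscription as soon as subscribe() returns, so it can cancel before the Callable is ever started, while the blocking call still runs on the Executor.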

NiteshKant commented 4 years ago

@viktorklang Most definitely that's another way of implementing this Publisher.

The point I am making here is that having the spec enforce that onSubscribe() MUST be called before returning from subscribe() has arguable advantages and arguably makes code more complex in certain other scenarios. I am just questioning the value of doing so as opposed to the current state where this is not explicitly enforced.

viktorklang commented 4 years ago

@NiteshKant The main benefit of the rule is to make sure of one thing: When subscribe()-returns there is a signal going to the Subscriber

The reason for this being important is that you don't want to risk having code where associations get lost and there is no way of even starting to figure that out. More often than not, the caller is in control of the Subscriber, but the Publisher implementation is provided by a third party, so it would be tricky to debug by using a different Subscriber.

NiteshKant commented 4 years ago

@viktorklang I think there is some disconnect here.

The main benefit of the rule is to make sure of one thing: When subscribe()-returns there is a signal going to the Subscriber

From your comment it looks like you are saying that the rule already ensures that onSubscribe() is called before subscribe() returns but I don’t see that defined in the rule and I am advocating not making that change. Am I misunderstanding you here?

OlegDokuka commented 4 years ago

@NiteshKant

I guess the intent here is to clarify that there should be no possibility of offloading onSubscribe to another thread. The goal of this rule is to say that the subscribe method MUST call the onSubscribe method, so the downstream Subscriber will have the Subscription once subscribe() returns. This is needed to make sure that the Subscriber can always cancel the Subscription and that there is no nondeterminism related to thread scheduling.

Imagine a case where you offload the onSubscribe method to another thread and, in the meantime, the Subscriber that called subscribe() figures out that the subscription it has made is no longer needed. The problem now is that the task is scheduled, but there is no way to cancel it.

Thus, I will clarify the @viktorklang statement.

When subscribe()-returns there is a signal going to the Subscriber

When subscribe() returns, the Publisher must guarantee that the onSubscribe method has been called (happens-before the return) and that the Subscriber has received a Subscription, so it can decide what to do with that Subscription even within the subscribe method call stack.

viktorklang commented 4 years ago

The real question is what callers expect to have happened when subscribe(s) has returned.

In either case, whether we want onSubscribe to be invoked synchronously during subscribe or not, it should be clarified in §1.9.

OlegDokuka commented 4 years ago

My expectation has been that onSubscribe has been called, and the Subscriber has already received a Subscription, by the time subscribe(s) returns. And all of that MUST be synchronous. Otherwise, it may lead to unexpected scenarios (basically a Pandora's box where the task is started but nobody can cancel it), and believe me, I have already shot myself in the foot doing that asynchronously.

viktorklang commented 4 years ago

@OlegDokuka For the sync case, I believe the rule could be clarified using something like:

Publisher.subscribe MUST call onSubscribe on the provided Subscriber, and do so prior to any other signals to that Subscriber, and MUST return normally, except when the provided Subscriber is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).

OlegDokuka commented 4 years ago

Sounds good to me. I guess we can enforce it by saying "synchronously" explicitly (we have that in the vocabulary, so it should be fine for those who may end up thinking that offloading onSubscribe to an event loop is legal as well). E.g.:

Publisher.subscribe MUST call onSubscribe synchronously on the provided Subscriber, and do so prior to any other signals to that Subscriber, and MUST return normally, except when the provided Subscriber is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).
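To make the proposed wording concrete, here is a minimal sketch of a Publisher that would satisfy it as I read it. `RejectingPublisher` and `RejectingPublisherDemo` are hypothetical names, and `java.util.concurrent.Flow` stands in for the `org.reactivestreams` interfaces. The publisher throws `NullPointerException` for a null Subscriber, calls onSubscribe synchronously before anything else, rejects the Subscriber only through onError, and returns normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher that always rejects its Subscriber, following the proposed wording.
final class RejectingPublisher implements Flow.Publisher<String> {
    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        // The only case in which subscribe() may throw to the caller.
        if (subscriber == null) throw new NullPointerException("subscriber must not be null");
        // onSubscribe is called synchronously and before any other signal.
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { } // nothing to deliver in this sketch
            @Override public void cancel() { }
        });
        // Failure is signalled through onError, after onSubscribe, and subscribe() returns normally.
        subscriber.onError(new IllegalStateException("subscriber rejected"));
    }
}

final class RejectingPublisherDemo {
    static List<String> signals() {
        List<String> seen = new ArrayList<>();
        new RejectingPublisher().subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { seen.add("onSubscribe"); }
            @Override public void onNext(String item) { seen.add("onNext"); }
            @Override public void onError(Throwable t) { seen.add("onError"); }
            @Override public void onComplete() { seen.add("onComplete"); }
        });
        return seen;
    }

    static boolean throwsOnNull() {
        try {
            new RejectingPublisher().subscribe(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }
}
```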