Closed rkuhn closed 7 years ago
This is still the same catch-22: the purpose you quoted is treated as absolute, and any argument for deviating from it is voided by appeal to that very purpose.
This is crucial: it is about efficient exchange across an asynchronous boundary, also across different libraries.
RxJava 2 is the proof this efficient exchange can be way more efficient with the rule relaxations I proposed.
If you were to use a Sink.publisher and send a request(0) to the materialization result, you would see a spec-compliant rejection. This is all that matters.
I'd say this is a convenience that allows you to sidestep/break RS internally.
I have not followed the discussions that led up to the current design, so don’t take this as a dismissal of it—there are probably very good reasons for this choice.
RxJava 2 was implemented in the spirit of ReactiveX that guided RxJava 1. When the TCK checks were added at the very end, we were faced with failures of the §1.3 and §3.9 related tests. @smaldini at the time confirmed he ran into similar failures, probably for the same reasons.
I think we subconsciously expected resistance against changes to RS and sidestepped the problems by introducing conformance wrappers to make the tests pass. Later on, RxJava 2 turned that workaround into an operator to help with interoperability.
I'm pretty sure I can come up with a solution to keep the RS compliance badge and not lose performance in RxJava 2 while maintaining our own binary compatibility.
From #339:
And as long as a user stays within the world of a single RS implementation (i.e. a vendor-homogeneous stream setup) the RS spec imposes no restrictions whatsoever.
I'd agree with this, except RxJava 2 and Reactor 2-3 are the only libraries that expose each stage as a Publisher. Correct me if I'm wrong, but every other, DSL-style implementation (i.e., not one-time publishers) like Akka-Stream, Swave, etc. doesn't expose such a Publisher at each step but has method(s) to gain access to a Publisher.
I'd like to second @rkuhn by pointing out that this appears to be the very root of the discussion in #339 as well as quite a few prior discussions.
Over the whole course of the RS effort it was very clear that the goal of the RS protocol is the definition of an interop standard, i.e. an external interface specification. As we all know external interfaces differ quite a bit from internal ones. Among other things they require more validation, more documentation and quite a bit more stability. That is the reason that Akka Stream (and swave) have chosen to rely on other interfaces internally, which are completely under our own control, have less overhead and can be evolved independently from the external ones.
Like @rkuhn I'm not sure why exactly you decided to rely on RS also for internal interfaces, but it appears to me as if you are trying to mitigate the consequences of this design decision by influencing the RS protocol in a way that makes it more suited to your internal use case.
My impression is that the resistance you meet against such "evolution" here stems from the feeling that such changes would indeed weaken the RS protocol in fulfilling its sole and actual purpose, which is providing an interop standard.
From my perspective one could argue that the RS protocol has already suffered from too many such concessions, e.g. by allowing synchronous execution across boundaries. Akka Stream and swave, for example, intentionally do not make use of this option, because, in our view, this couples two implementations too tightly and doesn't really fit the external interface character.
From reading your prior words in #339 I even get the impression that you have already come to the same conclusion that internal and external interfaces differ. If you simply rename your "weak RS" to "internal Interface", "strong RS" to "RS" and ".strict()" to ".toPublisher()" you should be in the same spot as Akka Stream (and swave).
I'd say this is a convenience that allows you to sidestep/break RS internally.
This is the crux of the matter: whatever anyone does inside their Publisher or Subscriber is completely up to them, the standard must not care. The only thing this standard can and should enforce is the externally visible behavior of these interfaces.
There is no sidestepping let alone breaking of RS compliance in Akka, not anywhere! As @drewhk has pointed out at https://github.com/reactive-streams/reactive-streams-jvm/pull/339#issuecomment-280659907 every single implementation of these interfaces is tested against the TCK. What we do within our Actors, however, is not and cannot be governed by the standard. Reactive Streams is not a standard for how to implement stream processing engines; it defines how to plug these engines together.
From @akarnokd’s last paragraph above I conclude that we are in agreement, but I think it would benefit all involved parties—current and future—to explicitly acknowledge this point.
There is no sidestepping let alone breaking of RS compliance in Akka, not anywhere!
Excuse my navigation skill limits, but a link showing an internal Akka-Stream component implemented via RS would be great; more specifically, pointing me to the class that ensures §3.9 internally would be much appreciated.
If §1.3 and §3.9 weren't so harsh, we could all have more efficient implementations and one less reason to split between an internal and an external implementation. I also invite you to see what happens to Akka-Stream performance if the serialization required to fulfill §3.9 were lifted.
if you are trying to mitigate the consequences of this design decision by influencing the RS protocol in a way that makes it more suited to your internal use case
The spec was already heavily influenced by the design decision of Akka-Stream (or its variant at the time) to rely on separate Actors for each computation stage, justifying that choice by mandating "asynchronous boundaries". One of the great insights of Rx.NET was that in a fluent reactive API/design, asynchrony can be a parameter of the flow, applied on demand, instead of a requirement. Adding asynchrony is easy; removing it is hard (see the Actor-fusion difficulties).
The second heavy influencing act was the attempt to make the RS component(s) completely single-use, similar to how Future is single use because it represents a running or an already completed computation. Calling get() twice won't restart the underlying task. Java Stream is also single use, whereas Iterable can almost always be assumed reusable.
In RS terms, this meant there was only Processor, each computation stage was a Processor, and I don't think the "blueprint system" that's possible today would have been. Having only processors requires reassembling the stages manually, as if you had to start over from Source whenever multiple consumption was needed.
Correct me if I'm wrong, but there is no retry in Akka-Stream or in Swave, right? If a source fails, the Publisher representing that source can't be resubscribed by a new Subscriber and get a chance at a successful completion. Akka Streams' Source sounds like a blueprint of a computation, but in order to rerun it, I have to rematerialize it, which requires context to be dragged along. Resubscribing in case of a failure is a primary resilience feature that falls out naturally from the RS design. If I'm right here, that means having an "inner" RS (or other architecture) and the outer "RS" prevents retry/resilience over the interop boundary, and if Akka-Stream is the source, I can't even have it retry within the Akka-Stream API.
Sorry, @akarnokd, it seems impossible for me to get through to you. You are still painting Reactive Streams as an implementation tool for stream processing engines, which it is not. I’ll stop here.
PS: see here for how Akka Streams implements request and cancel, which also makes it very clear that there is no efficiency to be gained for an asynchronous abstraction—which is the sole purpose of Reactive Streams.
Can't disprove a definition.
If it is incorrect to interpret Reactive-Streams as anything other than asynchronous interop, then it looks like @smaldini and I have discovered an extension to Reactive-Streams that welcomes synchronous interoperation as well as a more efficient interop possibility (sync or async). Interop protocols get new versions all the time in order to fix, expand or enhance the previous version(s), sometimes at the cost of breaking compatibility (see HTTP/2).
Okay, we’re making progress. Yes, it certainly looks like you have found a very efficient way of implementing synchronous streaming pipeline pieces, and different implementations may want to interoperate at that level. I’d see this not as an incompatible extension of RS but as a different standard with a different purpose. As soon as you want to pass data to a network socket or another thread (e.g. in .observeOn) RS semantics will come in very handy!
I still think that the problem with §1.3 is a matter of wording. How about something like:
For each Subscription, actions across each Subscriber method (onSubscribe, onNext, onComplete, and onError) must be totally (sequentially) ordered.
This constrains the only property that matters without saying how it is obtained. For example, for all we care, maybe all the actions are linked as continuations. A more precise rule requires defining actions "transactionally" with a specified beginning and end. Just saying "action" seems good enough.
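For illustration, here is a hedged sketch (class and field names hypothetical, not from any of the libraries discussed) of one way such a total ordering of Subscriber actions can be obtained without saying anything about which thread runs them: a lock-free queue-drain "serializer", where only the caller that moves an atomic counter from 0 to 1 drains the pending actions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: actions become totally (sequentially) ordered because
// exactly one thread at a time executes the drain loop; all other callers
// merely enqueue their action and leave.
class SerializedSignals {
    final Queue<Object> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    final List<Object> delivered = new ArrayList<>(); // stand-in for the downstream Subscriber

    void signal(Object action) { // onSubscribe/onNext/onComplete/onError
        queue.offer(action);
        if (wip.getAndIncrement() == 0) {
            do {
                delivered.add(queue.poll()); // executed by one thread at a time
            } while (wip.decrementAndGet() != 0);
        }
    }
}
```

However the actions arrive, each is delivered only after the previous one has fully finished, which is all the proposed wording asks for; whether this is achieved by a queue-drain, a lock, or linked continuations is left to the implementation.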
@DougLea Perhaps it would be better to discuss the spec rule clarifications separately from this high-level issue regarding overall intention of the entire standard. If not in #339 then we can also open a new issue for this particular point.
@DougLea
§1.3 requires one CAS per subscriber most of the time, and later on a volatile read of the Subscription field instead of a plain one. It adds overhead, but the longer the stream, the less it matters.
The big performance hog is §3.9, which imposes per-item synchronization. I've updated RxJava 2 to be strict by default and relaxed if subscribed to with a relaxed Subscriber, and the following comparison shows the costs depending on the length of the stream and the CPU cycles consumed when processing an item:
i5 6440HQ, Windows 10 x64, Java 8u121; operations per second (larger is better). Source code
It shows that if the business logic takes more than 100 CPU cycles per item, the 2 CAS per item (external mode) pose reasonably low overhead. Dropping the 2 CAS at less than 100 CPU cycles per item can yield up to 5x less overhead. The benchmarks are synchronous, which I think indicates they represent an upper bound on the achievable throughput; any additional async boundary only decreases this bound.
My suspicion is that pure async implementations likely schedule each individual item as a task in some thread pool (fork-join) and thus already start from a 100-cycle overhead due to crossing the async boundary. Even if an item doesn't result in a task being submitted to a thread pool, indicating available items still requires an atomic increment at the entry point and one at the consumption point (see our observeOn).
Since the Flow API already requires interop with RS on its own, you are in the unique position to reword §1.3 in the JDK as you suggested and to drop the MUST from §3.9, i.e., so that a non-positive request doesn't require one to break a flow with onError, cancellation and per-item synchronization. Then, once my interop PR actually builds with gradle/travis, the Flow-to-RS conversion can include the same strengthening I do in RxJava 2 and leave RS-to-Flow with no additional overhead.
@akarnokd Thanks for the data. I had considered subscriber-side buffering of demand but didn't do it in the jdk9 SubmissionPublisher because it didn't seem worth the complexity. But your results imply that something along these lines might help. I'm not all the way sold that §3.9 needs to be weakened to do this, though. Decreasing demand only when finished with a run of tasks could still be done.
I'll try to demonstrate what's going on via some pseudocode. This is roughly what SubmissionPublisher, and likely Akka-Stream, is doing:
Queue<Message> queue = ...
AtomicInteger state = ...
Subscriber<T> subscriber = ...
ExecutorService asyncBoundary = ...
long requested;

void request(long n) {
    queue.offer(new RequestMessage(n));
    if (state.getAndIncrement() == 0) {
        asyncBoundary.submit(this);
    }
}

void onNext(T t) {
    queue.offer(new ItemMessage(t));
    if (state.getAndIncrement() == 0) {
        asyncBoundary.submit(this);
    }
}

void run() {
    do {
        Message m = queue.poll();
        if (m instanceof RequestMessage) {
            long r = ((RequestMessage) m).amount;
            if (r <= 0) {
                subscriber.onError(new IllegalArgumentException("§3.9 ..."));
                return;
            }
            requested += r;
        } else if (m instanceof ItemMessage) {
            subscriber.onNext(((ItemMessage) m).item);
            requested--;
        }
        // other messages (cancel, complete, error) ...
    } while (state.decrementAndGet() != 0);
}
Here, run is guaranteed to execute on a single thread at a time, thus calling onError due to the bad amount is serialized with respect to onNext. However, the setup costs 2 atomic increments per item of onNext.
Now let's see the strict-mode serialization in RxJava:
AtomicInteger wip = ...
Subscriber<T> subscriber = ...
Subscription upstream = ...
Throwable error = ...

void request(long n) {
    if (n <= 0) {
        upstream.cancel();
        error = new IllegalArgumentException("§3.9 ...");
        if (wip.getAndIncrement() == 0) {
            subscriber.onError(error);
        }
    } else {
        upstream.request(n);
    }
}

void onNext(T item) {
    if (wip.getAndIncrement() == 0) {
        subscriber.onNext(item);
        if (wip.decrementAndGet() != 0) {
            subscriber.onError(error);
        }
    }
}
In this case, the async boundary is probabilistically present between request and onNext, as they could be executing at the same time but on different threads (for example, because the request comes from behind a second async boundary). As you can see, this also adds 2 atomic increments per item, leveling the costs between the two approaches. However, if §3.9 did not require calling onError, there would be no need for the 2 atomic increments per item at all.
@akarnokd Could you also similarly show your non-strict mode?
My previous comment was that some of the atomic updates of demand can be reduced by pre-decrementing before loops (with some messy compensation).
But your suggestion seems to only apply to cases in which request() calls are always within onNext()? Which would be an OK thing to optimize for if you somehow knew how to detect/ensure. But if you could, then you'd also know how to return to strict mode when bad requests occur?
Sure,
Subscriber<T> subscriber = ...
Subscription upstream = ...
void request(long n) {
    if (n <= 0) {
        // "raise this error condition in a fashion that is adequate for the runtime environment"
        Thread t = Thread.currentThread();
        t.getUncaughtExceptionHandler().uncaughtException(t, new IllegalArgumentException("§3.9 ..."));
    } else {
        upstream.request(n);
    }
}

void onNext(T item) {
    subscriber.onNext(item);
}
But your suggestion seems to only apply to cases in which request() calls are always within onNext()?
If I knew request() were always called from within onNext (except the first, which always comes from onSubscribe), then I'd know there is no need for synchronization. In practice request() may come from any thread, because RxJava is not a framework.
The underlying issue is that, by default, Java threads are not thread pools themselves, and thus you can't just say "get me the current pool" (a hypothetical Thread.currentExecutor()) and submit yourself a message, or make sure others send your processing stage's requests/data through that executor.
In RS terms, a flow may be emitting from one of the fork-join threads but be consumed and requested from the "main" thread.
Akka is a framework which ensures messages to one Actor come from the same thread or are otherwise sequential with respect to activations of the same Actor, as far as I understand.
But if you could, then you'd also know how to return to strict mode when bad requests occur?
On an async boundary like observeOn, §3.9 would be relatively cheap, since the operator pretty much looks like my first code example: I could just set a volatile boolean badRequest from request() and have the drain loop check it and call onError(IAE). However, §3.9 mandates this for all of my operators, regardless of whether they are async boundaries of their own or not.
Bad requests may occur concurrently while the sender is onNext-ing. Detecting that there is an ongoing concurrent onNext requires atomics.
Since there is no JVM magic to remove atomics and replace them with something similar to biased locking or lock elision, what's left is to live with the atomics or use synchronized, which probabilistically defeats the "non-blocking" intention of RS.
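To make that cheap async-boundary case concrete, here is a hedged sketch (names hypothetical; demand accounting and Executor submission omitted for brevity): request() only performs a plain volatile write, and the already-serialized drain loop, which pays one atomic per wakeup anyway, reports the rule 3.9 violation in signal order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical async-boundary stage: a bad request is recorded with a single
// volatile write and detected inside the drain loop, so no extra per-item
// atomic is needed just to honor rule 3.9.
class BoundaryStage<T> {
    final Queue<T> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    final List<String> downstream = new ArrayList<>(); // stand-in for the Subscriber
    volatile boolean badRequest;

    void request(long n) {
        if (n <= 0) {
            badRequest = true; // no CAS on the hot path
        }
        trySchedule();
    }

    void onNext(T item) {
        queue.offer(item);
        trySchedule();
    }

    void trySchedule() {
        if (wip.getAndIncrement() == 0) {
            drain(); // a real stage would submit this to an Executor
        }
    }

    void drain() {
        do {
            if (badRequest) { // amortized check, serialized with onNext
                queue.clear();
                downstream.add("onError:rule-3.9");
                return; // wip stays non-zero: the stage is terminated
            }
            T item = queue.poll();
            if (item != null) {
                downstream.add("onNext:" + item);
            }
        } while (wip.decrementAndGet() != 0);
    }
}
```

The point being illustrated: on a stage that already has a queue-drain loop, the rule 3.9 check rides along for free; the cost argument above is about stages that have no such loop to begin with.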
Two notes:
The spec does not say when onError must be issued. It could be delayed. (Considering that its only value is to alert programmers of bugs, there's no pressing QoS concern.)
You don't have to always avoid atomics; just doing them less frequently usually washes out the impact. That's why requests > 1 work well, and why batch-decrementing should improve performance.
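A hedged sketch of that batch-decrementing idea (names hypothetical, not RxJava's or SubmissionPublisher's actual code): the drain loop reads the outstanding demand once, emits a whole run of items against it, and reconciles the counter with a single atomic update at the end instead of one decrement per item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical emitter: one volatile read of demand per run and one atomic
// add per run, instead of one atomic decrement per emitted item.
class BatchingEmitter<T> {
    final Queue<T> queue = new ConcurrentLinkedQueue<>();
    final AtomicLong requested = new AtomicLong();
    final List<T> downstream = new ArrayList<>(); // stand-in for subscriber.onNext

    void request(long n) {
        requested.addAndGet(n); // a real stage would also (re)schedule the drain
    }

    void drain() {
        long r = requested.get(); // read once for the whole run
        long emitted = 0;
        while (emitted != r) {
            T item = queue.poll();
            if (item == null) {
                break; // ran out of items before running out of demand
            }
            downstream.add(item);
            emitted++;
        }
        if (emitted != 0) {
            requested.addAndGet(-emitted); // single atomic per run
        }
    }
}
```

Concurrent request() calls that arrive mid-run only make the counter larger, so emitting against a stale read of r never over-produces; the next run simply picks up the extra demand.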
@DougLea
I've implemented a MulticastPublisher similar to SubmissionPublisher that honors §3.9 here and incorporates your suggestions (though, without my usual RxJava tools, I'm not 100% sure I covered all aspects of the spec).
When I benchmark it manually (I haven't investigated how to get JMH to do it), I get:
// Java 9 Flow
MulticastPublisher 36,482 ops/s
SubmissionPublisher.offer 20,584 ops/s
SubmissionPublisher 18,063 ops/s
// RxJava 2.0.6
PublishProcessor 56,195 ops/s
PublishProcessor+observeOn 23,279 ops/s
PublishProcessor-relaxed 131,500 ops/s
PublishProcessor-relaxed+observeOn 30,891 ops/s
i7 4770K, Windows 7 x64, Java 9b157
Not sure why SubmissionPublisher is that much slower, as I don't fully understand its internals.
This seems to show that buffering atomics can improve throughput under at least some scenarios. (And that we should do more of this in SubmissionPublisher.) But I'm not sure how it relates to the §3.9 question, because I don't understand the connection to your RxJava comparisons. Although I assume that the "relaxed" part shows that eliminating all atomics in purely same-thread operations could have a big impact if we had some better way to reliably detect it automatically or allow users to explicitly choose it. (This is reminiscent of ForkJoinTasks supporting tryUnfork and direct calls to compute() so users can optimize strictly local operation when they know they can benefit from doing so.)
Since there's a potential propagation delay between requesting and delivering, a valid implementation (of §3.9) would be to flip a switch and amortize checking it and propagating the onError—or am I missing something?
This just shows there is no additional per-item penalty for signalling the bad request (§3.9 detection) on the emitter thread, because it can be hidden behind the existing checks, and the whole setup already has a large enough overhead to begin with.
As @viktorklang says, the reporting of a non-positive request (btw, SubmissionPublisher ignores 0 in the request() method at L1317) is amortized by the fact that each item delivery across the async boundary involves at least one atomic increment.
@akarnokd
The spec was already heavily influenced by the design decision by Akka-Stream (or its variant at the time) relying on separate Actors on each computation stage and thus justifying it via mandating "asynchronous boundaries".
Please don't invent history. The above is patently false.
You are right, I wasn't on the committee, and thus I can only rely on my observations, my own experience in the reactive programming field, and stories I've heard from others. I should have started with "In my opinion".
Since I can't read Scala properly, tend to be mistaken about its properties, and apparently can't benchmark it to show its strengths, I'll refrain from expressing what, how and why Akka-Stream does things.
@akarnokd This is not about opinions, it is about facts—the standard is for interop. Why else would there need to be a standard at all?!
«Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.»
«It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.»
The whole initiative started when I met Erik Meijer in Lausanne: we stood in front of a whiteboard and started discussing how to implement observeOn with a bounded buffer.
@viktorklang
What bothers me immensely is the word "asynchronous" that lingers inexcusably on the site you quote regularly, etched into granite it seems to me. RxJava 2 is the proof that it is possible to implement dataflows synchronously; this falls out from the orthogonality of the "Scheduler"s in the ReactiveX design and the way co-routines help implement non-blocking backpressure via various trampolining algorithms.
I'd be in joy if I could now claim that by my recognition and design, RxJava 2 is able to extend the RS standard with a synchronous mode while backed by the same 4 interfaces, 7 methods and relaxation of several of the existing RS rules and largely the same operator internals.
@rkuhn
The technology has come a long way since then, I assume, and you can easily see now how simple it is to implement observeOn or any other signal trampolining (with or without a thread hop).
Now back to the discussion on internal/external usage: I've changed things around so RxJava 2.0.7 will detect our own custom Subscribers and work in relaxed mode, with little to no penalty so far. Everybody else taking a Flowable (which implements org.reactivestreams.Publisher) will find a source conforming to the RS specification when calling subscribe(org.reactivestreams.Subscriber) on it, proven for ~100 (lossless) operators via the standard RS TCK.
@akarnokd Nobody here has, to my knowledge, ever claimed that it would even be a hard thing to support synchronous data flows. In fact, as it stands, RS incidentally already does this. Also, I'd even go so far as to say that you can most probably get to, or within reasonable distance of, the performance your relaxed (spec-breaking) changes propose, but without breaking anything at all.
Again (and for the last time):
«Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.»
I was there from nearly the beginning, meeting at the Twitter offices and similarly discussing things like bounded observeOn with Jonas at a conference, so I want to back up what the Akka folks are saying from the RxJava side of things, which I was leading at the time Reactive Streams was created.
The spec was already heavily influenced by the design decision by Akka-Stream (or its variant at the time) relying on separate Actors on each computation stage and thus justifying it via mandating "asynchronous boundaries".
This is false. The Akka team was excellent in choosing to collaborate and compromise, including on topics they definitely didn't agree with, such as allowing synchronous invocation for optimizations, since RxJava wanted to continue supporting that.
The second heavy influencing act was the attempt to make the RS component(s) completely single use only, similar to how Future is single use because it represents a running or an already completed computation. Calling get() twice won't restart the underlying task. Java Stream is also single use whereas Iterable can be assumed reusable almost always.
This is false. RxJava was there in the discussions from the beginning and ensured that a Publisher could support being subscribed to multiple times, with either "hot" or "cold", "stateful" or "stateless" implementations of the Publisher.
If it is incorrect to interpret Reactive-Streams anything other than asynchronous interop then looks like I and @smaldini have discovered an extension to Reactive-Streams that welcomes synchronous interoperation as well as more efficient interop possibility (sync or async). Interop protocols get new versions all the time in order to fix, expand or enhance the previous version(s), sometimes at the cost of breaking compatibility (see HTTP/2).
This is false. @smaldini and I were involved in the Reactive Streams discussions from the beginning and collaborated with @rkuhn, @viktorklang and others, which resulted in rules such as §3.2 and §3.3 permitting synchronous emission for use cases where asynchrony was not needed. This was directly influenced by how RxJava has pipelines of operators that synchronously invoke each other.
I'd be in joy if I could now claim that by my recognition and design ...
@akarnokd It's hard for me to decide how to respond to a statement like this, but I feel it needs to be addressed. Every person in this discussion thread is very intelligent and accomplished. Some, such as @DougLea, have literally "written the book" on concurrency, and his concurrency libraries are used by nearly the entire world via the Java servers powering the internet. @viktorklang and @rkuhn have architected and led the development of Akka and several other libraries used by massive numbers of people and powering very large systems.
None of them are overtly seeking personal recognition or self-aggrandizement, particularly via open source collaborations, and certainly not as an argument for something to happen. Nor are they leaning on their credentials or past experience in order to make their arguments.
Yet your argument is that you want "joy" from being recognized for your contribution and design? You even marked the word "my" in bold while arguing that you need to be recognized.
Please stop seeking personal aggrandizement and polluting community collaboration with egotistical pursuits. Stick to objective use cases, statements of fact, and API design questions driven by data. And recognize that there is far more than microbenchmarks driving the design and implementation of these libraries, particularly when interop and long-term support are key goals.
Again (and for the last time): «Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.»
Exactly. If we are doing synchronous streams then they are either Iterable, and we don't need the request(n) semantics, or they are callstack-blocking, and again we don't need request(n) semantics.
Ben, thank you for declaring your stance on these topics.
I just wanted to recap that I agreed with @viktorklang that my suggestions were indeed behaviorally backwards-incompatible and that the particular issue could be revisited later. On the subject of this issue: after considering the consequences and feedback, the incompatibility shortcoming has been addressed in RxJava 2. This issue can now be closed as resolved.
Doug, I'm sorry if I overwhelmed you with my opinions. It was never my intention to question or disrespect your experience in the field of Java concurrency, and I value your insights greatly. I have the bad habit of arguing with more and more examples if I sense I wasn't convincing enough, not recognizing when to stop.
For the rest of the participants, I thank you for showing me the "lay of the land".
Thanks @reactive-streams/contributors and especially @rkuhn @viktorklang @akarnokd for the lengthy but informative discussion. For one thing, I am in favor of debating and challenging; that will keep this vision ahead of the tech curve in its uniqueness and usefulness. @viktorklang and @rkuhn pointed out well the aspect that binds us: the specification reward, a standard for a very opinionated space in much need of one. Each time I mention the diversity of the portfolio backing this spec, people are wow'ing, and we pulled that off outside of any regular process, with nothing but practical work and passion. I was totally sold when @viktorklang reached out to us, and I enjoyed alpha-testing the TCK @ktoso kept improving. I am even more sold today. To me, we (Reactor) did have a shortcoming that has now been accidentally clarified via this discussion (duplicate subscribe), and we're going to fix it in a next release. Interestingly, looking at our past decisions regarding the spec, I sided strongly with arguments from two different visions that I think greatly contributed to the spec in general. There is a strong influence from the Rx and Akka teams, and so much good thought process, that my position was usually to just test, confirm, or eventually give feedback from a different perspective at the time, as we too suffered from backpressure in fixed-size queues. And that's what I like the most in the spec: the coming together of different visions to shape the master plan, the great one, the magnificent, RS.
I still think there is, as usual, room for improvement, our eternal doom in programming probably. @akarnokd has again well documented his findings and passionately argued for them, at times with a divisive tone that he admitted was overmuch. But his arguments stand, and it's a chance for all of us and the spec to show activity, now and tomorrow, by starting to iterate on the next items for a 1.1 or 2.0. I don't think we need to cross 2.0 yet, but a 1.1 with breaking-change polish would be an awesome plan that we could separately evaluate in a more focused series of shorter discussions.
[IMPORTANT EDIT]: does anyone play Overwatch?
I still think there is room as usual for improvements,
Would you like to restart the discussion around the 3-4 points? Near the beginning of the original thread I expressed that at least some of the desired changes seem doable without breaking backwards compatibility. Perhaps an individual GitHub issue for each of the items?
Can we close this one?
Yes.
@akarnokd
In https://github.com/reactive-streams/reactive-streams-jvm/pull/339#issuecomment-280720388 you argue that Akka Streams is not RS compliant in the same way that RxJava 2 and Reactor 3 are, based on the evidence that fusion makes internal execution synchronous, which you deem ruled out by definition. There are some misunderstandings here, please allow me to elaborate.
What is the purpose of Reactive Streams?
As stated—prominently—the purpose is non-blocking back-pressure for passing data elements across an asynchronous boundary. It is meant as an interoperability standard for plugging together different reactive streaming libraries.
This is crucial: it is about efficient exchange across an asynchronous boundary, also across different libraries.
Design of Akka Streams
Akka Streams chooses to use a lifted representation. The DSL that is exposed to users describes a processing graph, in which the Reactive Streams interfaces do not feature except in Source and Sink adapters, because the DSL does not produce reactive streams—it only describes how they should look after materialization.
After materialization a running stream (graph) is represented as actors. Actors communicate exclusively by asynchronous means and therefore employ the RS standard. Every interaction on this level is fully RS compliant—even though technically this would not be required since no user can access the internal connections.
If you were to use a Sink.publisher and send a request(0) to the materialization result, you would see a spec-compliant rejection. This is all that matters.
The downsides of choosing a direct implementation
If you choose to use Publisher as the end-user interface—which we explicitly warn against due to the difficulty and overhead of verifying the TCK-compliance of all Subscribers—then you are bound to the rules of Reactive Streams. These rules are crafted for asynchronous boundaries because that is their purpose. It is not surprising that the rules demand some qualities that lead to suboptimal performance for a fully synchronous implementation. You would be able to cut corners, and not in the bad sense, since you know that within your processing network some interactions are synchronous.
Amending the RS spec to cater to this synchronous use-case is counter-productive since it weakens the original purpose.
To put it bluntly: this discussion would be moot if Flowable were not to implement Publisher and only returned a Publisher from .strict() (which would then have a better name). I have not followed the discussions that led up to the current design, so don’t take this as a dismissal of it—there are probably very good reasons for this choice. But this choice does have downsides as discussed above.