akarnokd closed this issue 8 years ago.
I personally am not a fan of Flow or Flowable. The Observable type is part of the Reactive Extensions brand and domain knowledge, so I don't think it's good to move away from that name.
Similar with Subject. I think that can implement Processor but doesn't need to stop being a Subject.
The RxJava 1.x Subscription does indeed need to become Disposable.
The various Java 8 functional interfaces are a tricky question. I don't like how confusing they are. We named ours so that they sort correctly in docs; they are more descriptive, and we also support arities beyond 3.
I think we should first explore whether the Func* and Action* interfaces can just be refactored to implement those interfaces (with apply instead of call), for example. I'm not convinced of the value of using the java.util.function interfaces, particularly when we're targeting Java 8, where lambdas mean it should be rare to concretely implement one of them; it is just the shape of the lambda. We could move Action to Consumer but retain Consumer0, Consumer1, Consumer2, etc. Same with Func.
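A minimal sketch of that refactoring, assuming Func1 simply extends java.util.function.Function so that apply takes the place of call. Func3 stands in for an arity the JDK does not cover, and FuncDemo is a hypothetical helper; none of these are actual RxJava declarations.

```java
import java.util.function.Function;

// Hypothetical: Func1 declares no abstract method of its own, so its single
// abstract method is Function.apply and any lambda of that shape fits.
@FunctionalInterface
interface Func1<T, R> extends Function<T, R> { }

// The JDK stops at two arguments (BiFunction), so higher arities would still
// need library-defined types; apply is kept here for naming consistency.
@FunctionalInterface
interface Func3<T1, T2, T3, R> {
    R apply(T1 t1, T2 t2, T3 t3);
}

final class FuncDemo {
    // Accepts a plain JDK Function: a Func1 can be passed without any wrapper.
    static <T, R> R applyJdk(Function<T, R> f, T value) {
        return f.apply(value);
    }
}
```

Because Func1 is a Function, FuncDemo.applyJdk(length, "abc") works with a Func1 argument and allocates no adapter object.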
"Similar with Subject. I think that can implement Processor but doesn't need to stop being a Subject." I had to introduce ProcessorEx just to add our extra methods, so I guess Subject stays.
Java 8 functional interfaces indicate the purpose, not just the shape, but it is a matter of taste. If lambdas are used, it doesn't really matter, except when the user already has some Runnable r = () -> { ... } instance and still has to convert it to Action0. The JIT will probably inline the call properly, but if the call site ends up being megamorphic, we'll still get an extra wrapper object. Defining Action0 on top of Runnable this way:
interface Action0 extends Runnable {
    default void run() { call(); }
    void call();
}
This helps if one wants to send an Action0 to an executor, but not the other way around.
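A small sketch of that asymmetry, reusing the Action0 declaration above (repeated so the block compiles on its own); Action0Demo is a hypothetical helper. An Action0 goes straight into ExecutorService.execute, while an existing Runnable still has to be adapted with a method reference, which creates a new object.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Action0 as sketched in the discussion: Runnable.run delegates to call().
@FunctionalInterface
interface Action0 extends Runnable {
    void call();

    @Override
    default void run() {
        call();
    }
}

final class Action0Demo {
    static int runOnExecutor() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Action0 action = counter::incrementAndGet;
            executor.execute(action);            // Action0 -> Runnable: no wrapper needed

            Runnable existing = () -> counter.addAndGet(10);
            Action0 adapted = existing::run;     // Runnable -> Action0: needs a new object
            executor.execute(adapted);
        } finally {
            executor.shutdown();
        }
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }
}
```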
Still, there could be naming conflicts between Func0/Action0 and the higher-arity functions when compared to JDK types: Supplier.get vs Function.apply, and Runnable.run vs Consumer.accept. By itself, Callable already has a method name conflict with Supplier (call vs get).
For discoverability, we may define Func0 on top of Supplier and so on, but the method naming will be inconsistent across arities.
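To make the naming inconsistency concrete, here is a hypothetical layering of Func0 on Supplier and Action1 on Consumer, plus a class implementing both Callable and Supplier. The names are illustrative only; the point is that the single abstract method becomes get, apply, run or accept depending on arity, and that Callable and Supplier require two methods with one meaning.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical: RxJava-style names layered on JDK types.
// The inherited method is get() here ...
@FunctionalInterface
interface Func0<R> extends Supplier<R> { }

// ... but accept() here, so names differ across arities.
@FunctionalInterface
interface Action1<T> extends Consumer<T> { }

// A value source usable as both Callable and Supplier has to implement
// two differently named methods that mean the same thing.
final class ConstantSource<T> implements Callable<T>, Supplier<T> {
    private final T value;

    ConstantSource(T value) {
        this.value = value;
    }

    @Override
    public T call() {
        return value;
    }

    @Override
    public T get() {
        return call();
    }
}
```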
One thing I'd like to emphasize is the use of Predicate instead of Func1<T, Boolean>. I've read somewhere that the latter's boxing/unboxing is not always optimized away by HotSpot, and we get a reference equality check everywhere instead of just a CPU flag test.
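The difference in shape can be seen in a sketch like the following (FilterDemo is a hypothetical helper). It only shows where the boxed Boolean appears; it does not demonstrate whether HotSpot eliminates that boxing, which is the claim above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

final class FilterDemo {
    // Function<T, Boolean> returns a boxed Boolean that the caller must unbox
    // (a lambda returning null would throw NullPointerException here).
    static <T> List<T> filterBoxed(List<T> items, Function<? super T, Boolean> f) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (f.apply(item)) {      // implicit Boolean.booleanValue()
                out.add(item);
            }
        }
        return out;
    }

    // Predicate<T> returns a primitive boolean: nothing to box or unbox.
    static <T> List<T> filterPrimitive(List<T> items, Predicate<? super T> p) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (p.test(item)) {
                out.add(item);
            }
        }
        return out;
    }
}
```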
It is hard to decide: an independent set of FuncX, PredicateX and ActionX gives us more control, but RxJava 2.0 can be accused of having its own functional types instead of standard Java ones. Having such interfaces name their single method call would certainly help with stateful implementations (as it does in RxJava 1.0).
I'm thinking about this right now ... wondering if Flowable is the right thing as opposed to Observable.
I still think this:
"The Observable type is part of the Reactive Extensions brand and domain knowledge, so I don't think it's good to move away from that name."
However, what I'm wondering is whether we should have both Observable and Flowable. If we had both, Flowable would implement the Reactive Streams Publisher and Observable would not.
-> Observable -> no backpressure -> hot data sources
-> Flowable -> backpressure -> cold data sources, or hot ones converted to Flowable with a flow control strategy
Is this something we should consider and pursue?
The name conflict is a smaller issue compared to having to re-learn the entry class name and scrap all the historical knowledge. I don't believe there will be many who use both versions in the same project, and those who switch have to revisit every call site due to the package/API changes anyway.
Let's stick to Observable.
What about different types to represent hot and cold?
-> Observable -> no backpressure -> hot data sources -> Flowable -> backpressure -> cold data sources, or hot ones converted to Flowable with flow control strategy
That is what I have been proposing for a while, and what I am pursuing with RxMobile, since it only needs to deal with semi-realtime hot streams that have no intrinsic support for backpressure.
Semantically, conceptually, and scenario-wise, Observable and Flowable are very different. In the current RxJava the lines have gotten blurred. This is a perfect place where having two types makes the distinction crystal clear: either you are dealing with one kind of stream and need to take care of certain constraints, or you are dealing with a whole other kind of stream and have different concerns.
It is not either Flowable or Observable, but Observable and also Flowable.
Like any collection, both types support the same kinds of operators, like map, flatMap, filter, ... That is where you reuse your knowledge.
KISS FTW
Been thinking more about this. I conceptually like the idea of them being separate, and Observable not supporting backpressure, but I don't know how to solve the flatMap issue without an unbounded buffer, and I really don't think we should have an unbounded buffer unless a developer asks for it.
In Rx.Net, merge/flatMap doesn't have an unbounded buffer, because it blocks emissions when contention occurs by synchronizing (at least as of a couple years ago).
In RxJava, where we're targeting environments with event loops like Netty, merge can't be implemented using synchronization. To be non-blocking, merge must allow asynchronous enqueuing of items when contention occurs. This means either (a) unbounded buffers, or (b) bounded buffers and backpressure.
@headinthebox I know that you argue that unbounded buffers are not always bad, and I agree that they aren't always bad. I think they're just fine when a developer opts in to it. I do however think they are bad when hidden, and don't feel comfortable reverting the decisions of 1.x to allow hidden unbounded buffers again.
So, "KISS FTW" sounds nice, but it doesn't address the real issue of not-simple-to-debug silent latency and memory bloat of unbounded buffers. I have experienced this with RxJava 0.x pre-backpressure in production systems.
So, if we do have an Observable without backpressure, how do we address merge/flatMap such that we don't have an unbounded buffer?
Let me state upfront that I do not understand why Netty excludes using synchronization, but that is my total ignorance on how Netty works.
However, I think that you can (roughly) achieve the same effect as a non-blocking merge by doing an observeOn with the trampoline scheduler afterwards (if you squint at the queue-draining code, you will see it looks very much like a trampoline).
If that is true, then the use of a buffer becomes explicit due to the observeOn, and it is obvious in your code that you have to be careful, no worse than when you use onBackpressureBuffer.
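For reference, the queue-drain loop mentioned above can be sketched as follows. SerializedEmitter is a hypothetical name, not RxJava's internal class, and the sketch assumes a single downstream consumer. Concurrent callers never block: the thread that wins the wip counter drains, and the others only enqueue. The price is that the queue is unbounded, which is exactly the hidden buffer under discussion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Non-blocking serialization of concurrent onNext calls ("queue drain").
final class SerializedEmitter<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>(); // unbounded
    private final AtomicInteger wip = new AtomicInteger();
    private final Consumer<? super T> downstream;

    SerializedEmitter(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value);
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining and will pick up our item
        }
        int missed = 1;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                downstream.accept(v); // never called concurrently
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    // Two producer threads emit concurrently into a non-thread-safe list.
    static List<Integer> demo(int perThread) {
        List<Integer> sink = new ArrayList<>();
        SerializedEmitter<Integer> emitter = new SerializedEmitter<>(sink::add);
        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                emitter.onNext(i);
            }
        };
        Thread a = new Thread(producer);
        Thread b = new Thread(producer);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }
}
```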
(BTW with async code there is no such thing as "simple-to-debug" ;-)
"I do not understand why Netty excludes using synchronization"
It's not just Netty. Anything using event loops, including Schedulers.computation(), Aeron, Couchbase, most if not all memcached clients I'm aware of, Akka, and virtually anything else using NIO.
Synchronization of 2 event loops together would result in 1 thread being blocked. That is an absolute no for RxJava to cause.
"observeOn"
I focused on merge/flatMap, but observeOn and zip are the other 2 major operators that have unbounded buffers and are problems. merge could indeed be implemented like observeOn, but that again is an unbounded buffer without backpressure.
(BTW with async code there is no such thing as "simple-to-debug" ;-)
There are gradients of "simplicity"; it's not black-or-white. I'd rather encapsulate as much complexity in the implementation as possible so operating it is simpler. I'd also rather require a little more thought at the time of creating an Observable than when consuming it or during system operation.
I think the biggest mistake in RxJava v1 after adding backpressure was continuing to let the raw Observable.create be the primary way of creating an Observable. It gave all the power tools without guiding towards the common "hot" and "cold" usage scenarios.
Unless we don't offer flatMap/merge/zip/observeOn operators (which is unlikely), I am not comfortable having an Observable type with unbounded buffers hidden within operators. Thus, it leads me back to having a single type (either Observable or Flowable), and a createHot method (or something with a similar name) that requires a choice of what to do if the consumer is too slow. By default it is to fail, but other strategies can be drop, sample, throttle, buffer (bounded or unbounded), etc.
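A single-threaded sketch of what such a createHot-style source might do with its required strategy. OverflowStrategy and HotEmitter are hypothetical names (only FAIL, DROP and an unbounded BUFFER are modeled), and IllegalStateException stands in for whatever error type RxJava would actually signal.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

// Hypothetical strategies a createHot-style factory might require.
enum OverflowStrategy { FAIL, DROP, BUFFER }

// Single-threaded sketch: a hot source tracks downstream demand and applies
// the chosen strategy when an event arrives that nobody has requested.
final class HotEmitter<T> {
    private final OverflowStrategy strategy;
    private final Consumer<? super T> downstream;
    private final ArrayDeque<T> buffer = new ArrayDeque<>(); // BUFFER only; unbounded here
    private long requested;

    HotEmitter(OverflowStrategy strategy, Consumer<? super T> downstream) {
        this.strategy = strategy;
        this.downstream = downstream;
    }

    // Called by the consumer: it can take n more items.
    void request(long n) {
        long r = requested + n;
        requested = r < 0 ? Long.MAX_VALUE : r; // cap on overflow
        drain();
    }

    // Called by the hot producer whenever an event happens.
    void onNext(T value) {
        if (requested > 0 && buffer.isEmpty()) {
            requested--;
            downstream.accept(value);
            return;
        }
        switch (strategy) {
            case FAIL:
                throw new IllegalStateException("consumer too slow: no outstanding request");
            case DROP:
                return; // discard the event
            case BUFFER:
                buffer.add(value);
                drain();
                return;
        }
    }

    private void drain() {
        while (requested > 0 && !buffer.isEmpty()) {
            requested--;
            downstream.accept(buffer.poll());
        }
    }
}
```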
cc @stevegury and @tmontgomery who I consulted with on this topic in case either of them want to weigh in.
I've opened a new issue to move this conversation about bounded/unbounded: https://github.com/ReactiveX/RxJava/issues/3213
If this is so important for you, then we should only have Flowable and always do backpressure, and not call it Observable anymore.
[I remain of the opinion that it is absolutely OK that it is the developer's responsibility to make sure their app runs under the required memory & time constraints, and that given we are in a Turing-complete world, unbounded buffers are no different than any other unbounded use of memory or time. But obviously I am alone in this viewpoint, so will happily surrender to the majority vote.]
@headinthebox Is this perspective driven by theoretical purity about Observable being "purely reactive", or is there a functional or usability concern with the RxJava v1 Observable type that drives this?
That seems to be a purely theoretical and subjective argument then, not one based on functionality. Is there functionality or behavior that causes problems?
From a design standpoint, I feel that separating out the backpressure/hot stream concept is a huge point. One of the most confusing things to get your head around in Rx is backpressure, since the distinction between backpressured and hot streams is very blurred in both the docs and the actual implementation.
From a technical perspective, I can see it being a burden to maintain possibly two concurrent sets of operators but I'm approaching this purely from a design standpoint.
It was mentioned to me that it may help to restate what I'm seeking here, as it could be interpreted as a theoretical and polarized argument. Good point, so I'll try. It is somewhat long.
I am open to having 2 types in RxJava to represent bounded/unbounded, hot/cold, Observable/Flowable.
However, I do want solid reasons for the decisions, as we spent over a year debating many proposals in the 0.x phase before landing on an Observable with reactive-pull backpressure and choosing to adopt it as the single type. The community, and @headinthebox, were involved in that design and decision.
I and others then spent many months of debate formalizing the pattern in Reactive Streams, demonstrating across many companies and projects that an async stream type with reactive-pull backpressure is valid and wanted. Those types are now proposed for inclusion in JDK 9 (j.u.c.Flow), demonstrating fairly broad agreement.
In the time since, while using the reactive-pull Observable type, I have not found a streaming use case it does not work well for, including "hot" streams. In fact, I find the backpressure signal to be great on "hot" streams, as it forces me to consider how to behave when production is faster than consumption, and I have the tools to apply a strategy.
That said, I also recognize that in many environments, an Observable with unbounded buffers works just fine when only working with small finite streams of data, particularly request/response environments (which can use the Single type).
I prefer not making users choose between two virtually identical types (Observable and Flowable) unless there are important functional, usability or performance benefits to doing so.
If we do have two types, here is how I would envision them co-existing:
Creation is as expected, emit however one wishes:
Observable.create(s -> {
    s.onNext(t);
});
Cold generation, though, should be discouraged, and should return a Flowable. For example:
Observable.from(iterable) -> Flowable
That case should just return a Flowable, as that is what it is. Or we just don't have from or just overloads, which would make more sense, since Observable is intended to be "hot"; otherwise the type doesn't help communicate the distinction.
The merge/flatMap and observeOn cases would be unbounded, with their associated dangers.
I suggest that zip(Observable, Observable) not exist, but allow observable.zipWith(Iterable) or observable.zipWith(Flowable).
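Why zipWith(Iterable) is safe while zip(Observable, Observable) is not: the Iterable side can be pulled one element per pushed item, so nothing ever has to be buffered. A hypothetical single-threaded sketch (ZipWithIterable is an illustrative name, not a proposed class):

```java
import java.util.Iterator;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Each pushed item pulls exactly one element from the iterator: no buffer.
final class ZipWithIterable<T, U, R> {
    private final Iterator<? extends U> other;
    private final BiFunction<? super T, ? super U, ? extends R> zipper;
    private final Consumer<? super R> downstream;
    private boolean done;

    ZipWithIterable(Iterable<? extends U> other,
                    BiFunction<? super T, ? super U, ? extends R> zipper,
                    Consumer<? super R> downstream) {
        this.other = other.iterator();
        this.zipper = zipper;
        this.downstream = downstream;
        this.done = !this.other.hasNext(); // an empty Iterable completes immediately
    }

    // Called by the push source for each item.
    void onNext(T value) {
        if (done) {
            return; // completed; a real operator would also dispose the upstream
        }
        downstream.accept(zipper.apply(value, other.next()));
        if (!other.hasNext()) {
            done = true; // complete eagerly after the last pair
        }
    }

    boolean isDone() {
        return done;
    }
}
```

zip(Observable, Observable) has no such option: neither side can be asked to wait, so items from the faster side must be queued.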
An Observable could become a Flowable with a backpressure strategy: observable.toFlowable(Strategy.DROP).
The Flowable type could be used for both "hot" and "cold", but "hot" Flowable instances would always have a strategy for dealing with a backpressure signal: drop, buffer, whatever.
Flowable.createHot(s -> {
    s.onNext(...);
}, Strategy.FAIL);
// cold sync generator
Flowable.createSync(...);
// cold async generator
Flowable.createAsync(...);
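What a cold synchronous generator such as the createSync line above has to honor can be sketched with the JDK 9 java.util.concurrent.Flow interfaces, which mirror Reactive Streams. IterablePublisher and CollectingSubscriber are hypothetical names, not the proposed createSync signature, and the sketch is single-threaded only: items are produced only as far as request(n) allows, and a re-entrant request from onNext simply adds to the outstanding demand.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Cold, synchronous source: nothing is produced until it is requested.
final class IterablePublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;

    IterablePublisher(Iterable<T> source) { this.source = source; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new IteratorSubscription<>(subscriber, source.iterator()));
    }

    private static final class IteratorSubscription<E> implements Flow.Subscription {
        private final Flow.Subscriber<? super E> subscriber;
        private final Iterator<E> iterator;
        private long requested;
        private boolean emitting;
        private boolean terminated;

        IteratorSubscription(Flow.Subscriber<? super E> subscriber, Iterator<E> iterator) {
            this.subscriber = subscriber;
            this.iterator = iterator;
        }

        @Override
        public void request(long n) {
            if (terminated) return;
            if (n <= 0) { // non-positive requests are an error in Reactive Streams
                terminated = true;
                subscriber.onError(new IllegalArgumentException("request must be positive"));
                return;
            }
            long r = requested + n;
            requested = r < 0 ? Long.MAX_VALUE : r; // cap on overflow
            if (emitting) return; // re-entrant call from onNext: the running loop sees the new demand
            emitting = true;
            while (!terminated) {
                if (!iterator.hasNext()) {
                    terminated = true;
                    subscriber.onComplete();
                } else if (requested == 0) {
                    break;
                } else {
                    requested--;
                    subscriber.onNext(iterator.next());
                }
            }
            emitting = false;
        }

        @Override
        public void cancel() { terminated = true; }
    }
}

// Keeps the subscription so the caller can request more later.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> items = new ArrayList<>();
    Flow.Subscription subscription;
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }
}
```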
How would these two types be used when producing public APIs in libraries? I think it will be more confusing than today, where there is just one type, unless it is very clear that Observable represents "hot" data. If we are clear about that distinction, then it could work well.
/**
 * Observable signals a "hot" stream where you must account for flow control yourself or risk unbounded latency and memory growth.
 */
Observable<T> getStuff();
/**
 * Flowable signals a "cold" or "hot" stream that will adapt its flow control, or emit an error if it overwhelms your consumption.
 */
Flowable<T> getStuff2();
What functional, performance or usability items warrant making people choose between 2 very similar types, when Flowable can do all of the behavior?
Is the "confusion" of backpressure in v1 Observable just because we did a poor job of API design on Observable.create? Or is it more fundamental?
I believe it is just a usability issue of Observable.create that makes people have to think about backpressure too much. I think that issue should effectively be hidden from almost everyone.
I think it's possible to have a single type that is easy to use for all of these use cases ("hot", "cold", backpressured, not-backpressured), and it's purely an implementation detail of the operators.
I think it would be far more confusing to choose when to expose an Observable vs a Flowable than to just have a single type with proper "creation methods" that enable simply creating the correct "hot" or "cold" behavior.
Then again, perhaps it is worth communicating via a type that something is "hot": https://github.com/ReactiveX/RxJava/issues/2785
Community ... please provide your insight.
Do we have the capacity to maintain 4 versions at the same time (1.x, 2.x backpressure, 2.x unbounded, Single)?
I wouldn't use the name Flowable unless it implements j.u.c.Flow.Publisher, and I'd stick to Observable for the 2.x backpressure version.
We can likely support Java 9 with 2.x without giving up Java 8 compatibility by using multi-release JAR files (http://openjdk.java.net/jeps/238). I intend to implement Flow and Publisher (if the versioned JAR feature works).
The capacity to maintain multiple variants is a very good question, and valid for decision making. Particularly when all use cases are addressed by 1 implementation.
Though, most operators would behave the same on both. We could probably make most of this just a public API division with most of the implementation sharing. If it makes sense to have two types.
Here is an attempt at separating out behavior between Observable and Flowable:
Observable: async push without backpressure
Flowable: async pull, and push/pull with backpressure via batch requesting
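A minimal sketch of the push-only side, assuming hypothetical Observer and Disposable interfaces modeled loosely on this proposal (not taken from a released API). The consumer can stop the stream but cannot slow it down; RangeSource, also hypothetical, pushes as fast as it can.

```java
// Hypothetical push-only contract: dispose() stops the stream,
// but there is no way to ask the producer to slow down.
interface Disposable {
    void dispose();
    boolean isDisposed();
}

interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

final class BooleanDisposable implements Disposable {
    private volatile boolean disposed;

    @Override public void dispose() { disposed = true; }
    @Override public boolean isDisposed() { return disposed; }
}

final class RangeSource {
    // Pushes start .. start+count-1 without waiting for demand; only disposal stops it.
    static void subscribe(int start, int count, Observer<? super Integer> observer) {
        BooleanDisposable d = new BooleanDisposable();
        observer.onSubscribe(d);
        for (int i = start; i < start + count && !d.isDisposed(); i++) {
            observer.onNext(i);
        }
        if (!d.isDisposed()) {
            observer.onComplete();
        }
    }
}
```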
Below I separate out the operators (such as temporal flow control) and creators/generators as applicable to each type.
I'd appreciate feedback on this direction.
This is for creating a true "push" Observable without backpressure
The subscribe behavior should take an Observer without backpressure.
A key question is whether Observable should implement the Publisher interface for seamless interop between Flowable and Observable. To be true to the types, Observable should not implement Publisher, and we would need to call obs.toFlowable(strategy) when interacting with Flowables.
subscribe(Subscriber<? super T>) // if we implement this, then we need a default OnBackpressureStrategy that would throw MissingBackpressureException
This would exist to convert to a Flowable
The following should all either not exist on Observable, or return Flowable instead
These would have unbounded buffering-
I don't think zip should exist on Observable, as it is dangerous, and honestly it should be a Flowable.
we could however have something like this
these would go away
these should return a Single instead of Observable
Most of the other operators make sense to leave as they are, but I've stopped itemizing them for brevity. Perhaps we can continue using Publisher as the type so Observable and Flowable can compose together (but we need to answer what that means, as I presented above).
This is for creating a "push-pull" Flowable with backpressure
We should also have generators for the common (and hard to implement) cases (these are being designed in 1.x right now)
the other expected factory methods for creation include-
The subscribe behavior should be the Reactive Streams & j.u.c.Flow Subscriber with backpressure, but we can also have the Observer overload for the default and common behavior of request(Long.MAX_VALUE).
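To contrast batch requesting with the request(Long.MAX_VALUE) default, here is a sketch against java.util.concurrent.Flow (Java 9+). BatchingSubscriber is a hypothetical name; SubmissionPublisher is used only because it is a ready-made JDK publisher with a bounded per-subscriber buffer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Requests items in batches; batchSize == Long.MAX_VALUE degenerates into the
// unbounded behavior that an Observer-style overload would use.
final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
    private final long batchSize;
    private final List<T> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private long remainingInBatch;

    BatchingSubscriber(long batchSize) { this.batchSize = batchSize; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        remainingInBatch = batchSize;
        s.request(batchSize);
    }

    @Override public void onNext(T item) {
        received.add(item);
        if (batchSize != Long.MAX_VALUE && --remainingInBatch == 0) {
            remainingInBatch = batchSize;
            subscription.request(batchSize); // next batch only once this one is consumed
        }
    }

    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }

    List<T> awaitAll() {
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    // Publishes 0..count-1; SubmissionPublisher's per-subscriber buffer is
    // bounded, so submit() blocks if the subscriber falls too far behind.
    static List<Integer> run(int count, long batchSize) {
        BatchingSubscriber<Integer> subscriber = new BatchingSubscriber<>(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete after the submitted items
        return subscriber.awaitAll();
    }
}
```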
The following should all either not exist on Observable, or return Flowable instead
These would have bounded buffering-
these should return a Single instead of Flowable
I question whether any of the following should exist on Flowable, since they are temporal in nature, which is better suited to Observable. Generally, if time is involved, then Flowable-style flow control does not work, except as a signal to choose a strategy, which would work when composing via the Observable.toFlowable(OnBackpressureStrategy) choice.
Most of the other operators make sense to leave as they are, but I've stopped itemizing them for brevity.
Additionally, I think Single<T> has been a good addition, so it should continue to exist.
I'd estimate that 75% of the operators require 3 implementations, as backpressure awareness is subtle in many operators and a guaranteed single value by itself allows different optimizations. I guess we still want high performance for Flowable, Observable and Single, right? The fact that there isn't one Subscriber type for all of them drags in individual XXXOperator<R, T> implementations (interface FlowableOperator, SingleOperator, ObservableOperator).
My question is: will these be prerequisites to a release, or can we publish a Flowable supporting RS backpressure and do the rest in 2.1?
I'd estimate that 75% of the operators require 3 implementations, as backpressure awareness is subtle in many operators and a guaranteed single value by itself allows different optimizations. I guess we still want high performance for Flowable, Observable and Single, right?
Yes, we will want high performance for each. But internal implementations of operators are definitely something that can evolve and change over time after we release 2.0. The Flowable operators are the ones that can be leveraged across all 3 by injecting unbounded buffering and using Long.MAX_VALUE. It won't achieve the possible performance gains for Observable and Single, but those can come as we have time. Additionally, the perf and implementation differences will be most significant in merge, observeOn, and a handful of others. The rest are mostly synchronous, without buffers, and will have little if any gain from the change beyond the MAX_VALUE fast paths we already have (such as filter, map, take).
To get the public APIs right with Observable, Flowable, and Single, I suggest we use the Flowable operator implementations for them all and focus on API, correct behavior, composition, and stability. Then as we go we will end up with custom operators as needed (merge will happen sooner for example).
My question is: will these be prerequisites to a release, or can we publish a Flowable supporting RS backpressure and do the rest in 2.1?
Getting the relationship between the types right is very important and will affect the public API of each, so I view this as an essential thing to get done before 2.0 is released, since we can't change the APIs of the Flowable type in 2.1, and as my examples above show, it will definitely cause Flowable to change.
I think the naming has come to a reasonable standing.
Since we are aiming at Java 9 and j.u.c.Flow as the base API, we have the opportunity to use Java-native functional interfaces instead of our Func and Action interfaces, plus rename other components. I propose the following changes:
Observable -> Flowable
XXXSubject -> XXXProcessor
XXXSubscription -> XXXDisposable
Func0 -> j.u.f.Supplier
Func1 -> j.u.f.Function
Func1<T, Boolean> -> j.u.f.Predicate
Func2 -> j.u.f.BiFunction
Func3..FuncN -> rx2.functions.TriFunction or rx2.functions.Function3, etc.
Action0 -> Runnable
Action1 -> j.u.f.Consumer
Action2 -> j.u.f.BiConsumer
Action3..ActionN -> rx2.functions.TriConsumer or rx2.functions.Consumer3, etc.
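Under this mapping, typical call sites would look roughly like the sketch below. Function3 is a hypothetical stand-in for the rx2.functions type named in the list, and MappingDemo is illustrative only.

```java
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical rx2.functions-style interface for an arity the JDK lacks.
@FunctionalInterface
interface Function3<T1, T2, T3, R> {
    R apply(T1 t1, T2 t2, T3 t3);
}

final class MappingDemo {
    static String describe() {
        Supplier<Integer> func0 = () -> 2;                              // was Func0
        Function<Integer, Integer> func1 = x -> x * 10;                 // was Func1
        Predicate<Integer> isEven = x -> x % 2 == 0;                    // was Func1<T, Boolean>
        BiFunction<Integer, Integer, Integer> func2 = Integer::sum;     // was Func2
        Function3<Integer, Integer, Integer, Integer> func3 =
            (a, b, c) -> a * b * c;                                     // was Func3

        int v = func1.apply(func0.get());  // 20
        int w = func2.apply(v, 1);         // 21
        return isEven.test(w) + ":" + func3.apply(w, 2, 3);
    }
}
```

MappingDemo.describe() returns "false:126".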