ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0
47.89k stars 7.6k forks source link

2.0 Design: Bounded Buffers #3213

Closed benjchristensen closed 8 years ago

benjchristensen commented 9 years ago

A discussion about bounded and unbounded buffers evolved out of a naming discussion in https://github.com/ReactiveX/RxJava/issues/2787#issuecomment-133470735 I want to make it a top-level discussion as it is important.

The question is whether we should support a type in v2 that does not support backpressure and has unbounded buffers in merge/flatMap/observeOn/zip. Effectively this would mean having a type like RxJava 0.19 and earlier, before backpressure was added.

If we did have both types, we would have something like Observable (without backpressure) and Flowable (with backpressure).

My perspective is that RxJava should never automatically use an unbounded buffer. It should only happen when a user opts-in, such as through use of onBackpressureBuffer or toList. Use of observeOn, merge/flatMap, and zip should not permit unbounded buffer growth, as that is not the use case any of those operators is asking for.

In a production application, unbounded buffer growth equates to unbounded latency and memory growth. Additionally, unbounded buffers are an illusion anyways, as everything is always bounded at some point, in time or space.

The argument for unbounded buffers typically is something like "mouse events can't be backpressured". However, this is not quite correct. Even mouse events already have flow control being applied to them (backpressure is just one form of flow control) by the system through sampling resolution.

For a "hot" data source, such as mouse events, or system events (such as occur in our server-side apps as side effects of processing user traffic), we can't "stop" the flow of events, but buffers must be bounded, and we must apply a strategy for how to deal with overflow.

The "backpressure signal", if unhandled should cause a failure, not result in unbounded latency and memory growth. The "backpressure signal" can in turn be used to trigger strategies such as sampling, dropping, throttling, debouncing, and-the-like.

Thus, my proposal is that RxJava v2 stick to the decisions made in v1 to provide a single stream type, Observable, that supports backpressure, and not use unbounded buffers unless they are explicitly asked for.

The "complexity" concerns of modeling "hot" sources (those I sometimes hear called "truly reactive") should be solved by a better Observable.create API that allows simple onNext behavior with a choice of strategy for handling the backpressure signal.

benjchristensen commented 9 years ago

@ReactiveX/rxjava-committers please weigh in on this

akarnokd commented 9 years ago

I think it would be quite confusing to have two fluent base classes and having the Observable not use backpressure after 1.x.

I've been implementing operators such as merge/zip with a bufferSize parameter that let's it tune the internal buffers on a per-chain basis. It is possible to add an extra parameter unbounded which instructs the operator to use an unbounded SpscLinkedArrayQueue with island size of bufferSize.

stealthcode commented 9 years ago

If we decide to not have any unbounded buffers then I like the idea of having 2 types that fluently convert between each other. Otherwise it's not worth the confusion of having 2 types. Also a developer should be able to make an intelligent decision about what the buffer size should be for almost all context. For instance, an observable that consumes off a hot source and uses observeOn(Schedulers.computation()) should be bounded by the size of the computation scheduler thread pool.

An observable stream that windows once per some time interval could also be used to estimate an approximate necessary buffer size. For instance if the developer estimates that a windowed observable should be consumed and unsubscribed before the window emits another one then the developer might safely say that their bounds are 1.

Also in both context they should be able to say what they want to do in the case that the buffer capacity is exceeded. Similar to onBackpressureBuffer or onBackpressureDrop the bounds of Observable<Observable<T>> needs to have some concept of behavior.

IFF it's decided that we will not allow unbounded buffers then I propose the following:

The implementation and maintenance of having two different wrapper types (something like BoundedObservable and HotObservable) would be relatively trivial since they would delegate to the same underlying operators and native observable semantics (lift, subscribe, x, compose, etc). However the users would have to learn that they are working with a different type in different contexts. I think that this is a net positive in the sense that performance and efficiency concerns will be first class in the minds of the users. Also this would be useful for interpreting the code and understanding the intention.

akarnokd commented 8 years ago

I think all of the discussion points are now resolved: