ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

Conflate Operator #4856

Closed mrudangit closed 6 years ago

mrudangit commented 7 years ago

RxJava should have a conflate operator. I see we have sample, debounce, etc., but they are not actually equivalent to conflate.

A conflate operator should behave like sample if updates arrive faster than the given interval. But if updates stop and a new update comes after a long period, it should be pushed immediately instead of waiting for the sampling period.

JakeWharton commented 7 years ago

Is the behavior you're after equivalent to zip(stream, interval())? I want such an operator too, and I was about to file an issue tomorrow. It has finally cleared my bar of three separate use cases for wanting it to be first-party.


mrudangit commented 7 years ago

Not sure how you would achieve that with zip.

Sample at the timeout interval if the source is emitting too fast, but if the source is slower than the timeout interval, emit right away.

O.O.O.O.O...............O
--->-->-->-->-->-->-->-->
----X-------X-------------X
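To make the difference concrete, here is a minimal plain-Java sketch (no RxJava involved) that models fixed-period sampling, i.e. the sample/throttleLast behavior, in virtual time. PeriodicSampleModel and its method are hypothetical names; the sketch assumes ticks fire at period, 2*period, and so on, that an item arriving exactly on a tick belongs to that tick, and it does not model completion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical virtual-time model of fixed-period sampling (sample/throttleLast).
// Assumptions: item times are sorted and non-negative, ticks fire at
// period, 2*period, ..., an item arriving exactly on a tick belongs to that
// tick, and source completion is not modeled.
final class PeriodicSampleModel {

    // Returns "value@time" for every tick that saw at least one new item.
    static List<String> sample(long[] times, String[] values, long period) {
        List<String> out = new ArrayList<>();
        int i = 0;
        for (long tick = period; i < times.length; tick += period) {
            String latest = null;
            // Collect every item up to and including this tick; keep only the latest.
            while (i < times.length && times[i] <= tick) {
                latest = values[i];
                i++;
            }
            if (latest != null) {
                out.add(latest + "@" + tick);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A fast burst (a, b, c), then a single item d after a long quiet gap.
        long[] times = {1, 3, 5, 25};
        String[] values = {"a", "b", "c", "d"};
        System.out.println(sample(times, values, 10)); // [c@10, d@30]
    }
}
```

With a period of 10, the burst a, b, c collapses to c at time 10, which is the intended rate limiting. But d, arriving at 25 after a long quiet gap, is held until the tick at 30; an isolated item can wait almost a full period, which is the lag the requested conflate operator should avoid.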

abersnaze commented 7 years ago

Is throttleFirst closer to what you are looking for?

JakeWharton commented 7 years ago

throttleFirst drops events. I believe the requested operator shouldn't. It's more like "at most once every X interval."


mrudangit commented 7 years ago

throttleLast/sample is what I want if the source is publishing too fast, but if an item was published slower than the timeout interval, publish it right away instead of waiting for the sample timeout.

I took a stab at this:

https://github.com/mrudangit/HelloRxJava/blob/master/src/main/java/com/solutionarchitects/Conflation.java

abersnaze commented 7 years ago

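How about this? [image: https://cloud.githubusercontent.com/assets/406038/20335040/e0c40332-ab73-11e6-9238-17bd9ff8cbbc.png]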

JakeWharton commented 7 years ago

For my case, I would need green before the blues. If that's not conflate, I'll file a separate issue.


abersnaze commented 7 years ago

I was being lazy about the timing of the end of yellow's timeout and cyan's arrival. I was mostly trying to highlight that this is throttleFirstAndLastButNotLastIfItIsTheSameAsTheFirst.

mrudangit commented 7 years ago

Red should be output at the end of the first period, green at the end of the second period, and dark blue at the end of the third period, not at the start of each period as shown. How do I generate a diagram? I could probably explain it better that way.

abersnaze commented 7 years ago

Was I misinterpreting this statement?

publish the item right away don't wait for the sample timeout

That seems to conflict with

Red should output at the end of first period

The diagram source file is an OmniGraffle document in the wiki repo https://github.com/ReactiveX/RxJava.wiki.git; the file is images/rx-operators.graffle.

mrudangit commented 7 years ago

[image: marble diagram of the requested behavior]

akarnokd commented 7 years ago

If I understand correctly, you need a throttleFirst, but instead of dropping the incoming values during the period, you'd want to keep the latest and emit it immediately once the period ends, starting a new period. What should happen if there is an active period, a latest element is waiting, and the main source completes?

mrudangit commented 7 years ago

Emit the last one emitted before onComplete. Yes, I want to throttle first, but if there has been inactivity longer than the throttle period, emit right away when the source emits, because we have already waited more than the throttle period.

So in my diagram, yellow is emitted right away even though it falls in the middle of a throttle period, because it waited one full throttle period before.
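The rules above can be written down as a small state machine. The following is a minimal virtual-time sketch in plain Java, not an RxJava operator; ConflateModel is a hypothetical name, and two details are assumptions rather than anything settled here: an item arriving exactly when a period ends still counts toward that period, and on completion a held item is emitted immediately instead of at the end of its period.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical virtual-time model of the conflate rules: an item that arrives
// while no throttle period is running is emitted at once and starts a period;
// items inside a period replace each other; when the period ends, the held
// item (if any) is emitted and a new period starts, otherwise we go idle.
// Assumptions: times are sorted, an item arriving exactly at a period end
// still belongs to that period, and a held item is flushed on completion.
final class ConflateModel {
    private final long period;
    private final List<String> out = new ArrayList<>();
    private boolean active;   // true while a throttle period is running
    private long periodEnd;   // end time of the running period
    private String pending;   // latest item received during the running period

    private ConflateModel(long period) {
        this.period = period;
    }

    // Processes period ends before t (and also at t when inclusive is true).
    private void firePeriodEnds(long t, boolean inclusive) {
        while (active && (periodEnd < t || (inclusive && periodEnd == t))) {
            if (pending != null) {
                out.add(pending + "@" + periodEnd); // emit the held item at period end
                pending = null;
                periodEnd += period;                // and immediately start a new period
            } else {
                active = false;                     // nothing held: go idle
            }
        }
    }

    private void onItem(long t, String value) {
        firePeriodEnds(t, false);
        if (active) {
            pending = value;                        // inside a period: keep only the latest
        } else {
            out.add(value + "@" + t);               // idle for a full period: emit right away
            active = true;
            periodEnd = t + period;
        }
    }

    private void onComplete(long t) {
        firePeriodEnds(t, true);
        if (pending != null) {
            out.add(pending + "@" + t);             // assumption: flush the held item on completion
            pending = null;
        }
    }

    // Returns "value@time" for every emission; completeAt must not precede the last item.
    static List<String> run(long[] times, String[] values, long period, long completeAt) {
        ConflateModel m = new ConflateModel(period);
        for (int i = 0; i < times.length; i++) {
            m.onItem(times[i], values[i]);
        }
        m.onComplete(completeAt);
        return m.out;
    }

    public static void main(String[] args) {
        // Burst a, b, c; then d after a quiet gap longer than the period.
        System.out.println(run(new long[] {0, 2, 4, 25},
                new String[] {"a", "b", "c", "d"}, 10, 30)); // [a@0, c@10, d@25]
    }
}
```

For the burst a, b, c, the first item passes immediately, c is emitted when the first period ends (b is dropped), and d passes immediately because more than a full period has elapsed. Because a new period is started only when something is emitted, no timer runs while the source is idle.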

akarnokd commented 7 years ago

I think this operator should go into rxjava-extras/RxJava2Extensions as it seems to address the needs of a small set of developers only and possibly introduce confusion with throttleFirst. I'm sure @davidmoten and I will accept such operator if contributed.

mrudangit commented 7 years ago

Yes, makes sense. Happy to contribute; please share any guidelines or advice.

akarnokd commented 7 years ago

Just clone the respective repo, create a new branch, pick a package (hu.akarnokd.rxjava2.operators for RxJava2Extensions, com.github.davidmoten.rx.internal.operators for rxjava-extras), write the operator, create the pull request, and we will review it.

mrudangit commented 7 years ago

I cloned the repo and created a local branch 'conflateOperator'.

When I try to push, I get an error:

remote: Permission to akarnokd/RxJava2Extensions.git denied to mrudangit. fatal: unable to access 'https://github.com/akarnokd/RxJava2Extensions.git/': The requested URL returned error: 403

akarnokd commented 7 years ago

Push to your own repository.

mrudangit commented 7 years ago

pushed to https://github.com/mrudangit/RxJava2Conflate

akarnokd commented 7 years ago

Now if you go to my repo, you should see the create PR option.

valeriyo commented 7 years ago

@mrudangit, in your latest drawing, shouldn't green be emitted (immediately), and then dark-blue, and light-blue, and then purple (delayed)?

mrudangit commented 7 years ago

Yes, it can be looked at that way too; I drew it that way to convey intent. But yes, the first one should be output immediately and the rest delayed as per the interval.

akarnokd commented 7 years ago

Did you mean this operator & behavior: ObservableConflate.java?

mrudangit commented 7 years ago

Nice. Going by the tests, it looks like it does what it should. I will take it for a spin. Is it for RxJava 2?

akarnokd commented 7 years ago

RxJava 1.x

valeriyo commented 7 years ago

I've been looking for a non-lagging rate-limiting operator like the one described here for a while now, and couldn't find it. So, after some collaboration with our in-house experts (including @JakeWharton and @loganj), we came up with a compose-transformer (tested with RxJava 1.2.3):

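import static rx.Observable.concat;

import java.util.concurrent.TimeUnit;
import rx.Observable.Transformer;
import rx.Scheduler;

public static <T> Transformer<T, T> adaptiveSample(long time, TimeUnit unit, Scheduler scheduler) {
  return source -> source
      .publish(shared -> concat(
          shared.take(1),                          // first item passes through immediately
          shared.sample(time, unit, scheduler))    // then at most one item per period
          .repeatWhen(a -> shared.debounce(time, unit, scheduler))); // intended: start over after `time` of quiet
}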

This must be the most natural rate-limiting operator, because at any given moment of time it's as close to the source observable as possible, given the restriction of the rate-limiting time period. For example (time period == 4 chars):

source: -1-2-3-45------6-7-8-
output: -1---3---5-----6---8-

Notice how it emits immediately after a period of quiet, and then continues to sample while source is emitting at high rate. Thus, there is no unnecessary lag, as it "adapts" the sampling to the source, hence the name.
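The timing in this example can be checked with a small virtual-time model. The plain-Java sketch below models the behavior described here (the first item after a quiet spell passes immediately, later items are sampled once per period, and a full period without items re-arms the immediate pass), not the RxJava operator chain itself. AdaptiveSampleModel is a hypothetical name; each marble character is one time unit, and it assumes an item arriving exactly on a tick belongs to that tick.

```java
import java.util.Arrays;

// Hypothetical virtual-time model of the "adaptive sample" timing.
// Each marble character is one time unit; '-' means no item.
// Assumptions: an item arriving exactly on a tick belongs to that tick,
// and source completion is not modeled.
final class AdaptiveSampleModel {
    private final long period;
    private final char[] out;                    // output marble, one char per time unit
    private boolean sampling;                    // false: next item passes immediately
    private long nextTick;                       // next sample tick while sampling
    private char pending;                        // latest item since the previous tick, 0 if none
    private long quietDeadline = Long.MAX_VALUE; // when the "debounce" restart fires

    private AdaptiveSampleModel(long period, int length) {
        this.period = period;
        this.out = new char[length];
        Arrays.fill(out, '-');
    }

    private void emit(long t, char value) {
        if (t < out.length) {
            out[(int) t] = value;                // emissions past the marble's end are not drawn
        }
    }

    // Fires timers that are due strictly before time t.
    private void advanceTo(long t) {
        while (sampling) {
            long next = Math.min(nextTick, quietDeadline);
            if (next >= t) {
                return;
            }
            if (nextTick <= quietDeadline) {
                if (pending != 0) {              // sample tick: emit the latest item, if any
                    emit(nextTick, pending);
                    pending = 0;
                }
                nextTick += period;
            } else {
                sampling = false;                // quiet for a full period: restart
                quietDeadline = Long.MAX_VALUE;
            }
        }
    }

    private void onItem(long t, char value) {
        advanceTo(t);
        if (!sampling) {
            emit(t, value);                      // first item after quiet: no delay
            sampling = true;
            nextTick = t + period;               // sample ticks are anchored to this item
        } else {
            pending = value;                     // busy: hold the latest for the next tick
        }
        quietDeadline = t + period;              // every item pushes the restart back
    }

    static String run(String sourceMarble, long period) {
        AdaptiveSampleModel m = new AdaptiveSampleModel(period, sourceMarble.length());
        for (int t = 0; t < sourceMarble.length(); t++) {
            char c = sourceMarble.charAt(t);
            if (c != '-') {
                m.onItem(t, c);
            }
        }
        m.advanceTo(Long.MAX_VALUE);             // let the remaining timers fire
        return new String(m.out);
    }

    public static void main(String[] args) {
        System.out.println(run("-1-2-3-45------6-7-8-", 4)); // -1---3---5-----6---8-
    }
}
```

Running it on the source marble from the example prints the same output marble. Anchoring the ticks to the first item of each busy spell is meant to mirror concat(take(1), sample(...)) being subscribed again after each quiet period.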

It could be used to rate-limit UI updates of an "unread message count", or to limit the frequency of metadata refresh requests to a server. Basically, it's what most people expect when they start looking at sample, throttle, throttleFirst, and similar operators, which unfortunately do not work too well in real-world scenarios.

@mrudangit - could you try it and see if it works for you?

mrudangit commented 7 years ago

@valeriyo Looks good in testing. I noticed one thing: if I have more than one subscriber, they don't get the same values; some get the one published before, etc. With conflation done on the publishing side, all subscribers should get the same conflated output.

@akarnokd I am not able to compile it with Java 8 / IntelliJ.

akarnokd commented 7 years ago

@mrudangit what is the compiler's error message?

akarnokd commented 7 years ago

I see. Oddly, Eclipse didn't complain. I have updated the gist.

mrudangit commented 7 years ago

I get this error:

Error:(45, 17) java: name clash: call(rx.Subscriber<? super T>) in com.xxx.. overrides a method whose erasure is the same as another method, yet neither overrides the other
  first method: call(T) in rx.functions.Func1
  second method: call(T) in rx.functions.Action1

akarnokd commented 7 years ago

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

JakeWharton commented 6 years ago

I built my 3rd instance of this for a project (next time I'm stealing David's gist!). It seems to come up a lot when building UIs. You want to be notified immediately and then no more frequently than X, unless more than X has passed, in which case the next notification should be immediate. This is especially true as we have a lot of UI sources that emit synchronously upon subscribe and then connect to some asynchronously updating source.

Sometimes I just compromise and merge take(1) with throttleLast, despite it slowing down the async data.

I'm all for keeping RxJava small. There are plenty of one-off custom or composite operators I've had to build that I wouldn't want in the main lib. This one seems like it could make the cut for general applicability. I was able to find 2 other people in 10 minutes who had written versions of this (aside from the two others in this thread and me).

I'm not sure how we would determine whether or not something like this makes the cut. For me, it can be thought of as a variant of existing operators rather than something wildly new, it can't be created as a composite operator with acceptably low overhead (Valeriy's solution works, but it allocates quite a bit), and the use case seems non-rare (based on a small sample, no pun intended).

Would love to hear what others think.

valeriyo commented 6 years ago

Hi @JakeWharton,

Thanks for bringing this issue back to life :)

In my opinion, none of the built-in throttling operators are usable "as is" for many common tasks (including updating UI):

The middle two operators also maintain a "ticking" timer, which would wake up and re-schedule each and every interval, even if there is nothing to emit... hence, wasteful with bursty streams of events.

For these reasons, my opinion is that it would be way better to have one rate-limiting operator that works (no latency, not lossy, no starvation, no unnecessary ticking timer) than to have 4+ flawed ones ;)

Heck, it took me weeks, if not months to 1) realize that none of the built-in operators do what I want, 2) formulate the desired behavior, 3) search for solution online, give up, then consult, implement, and test - it shouldn't be so difficult!

By the way, here is the revised version, without repeatWhen (per #5414):

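import static rx.Observable.concat;

import java.util.concurrent.TimeUnit;
import rx.Observable.Transformer;
import rx.Scheduler;

public static <T> Transformer<T, T> adaptiveSample(long time, TimeUnit unit, Scheduler scheduler) {
  return source -> source
      .publish(shared -> shared
          .debounce(time, unit, scheduler)              // signals once the source is quiet for `time`
          .map(a -> 0)                                  // only the signal matters, not the value
          .startWith(0)                                 // also open the first window on subscription
          .switchMap(a -> concat(                       // each signal replaces the current window:
              shared.take(1),                           // the next item passes immediately,
              shared.sample(time, unit, scheduler))));  // then sample while the source stays busy
}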

Regarding naming, I'm not sure that "conflate" is a good name... it's short, but it doesn't convey the meaning well enough. Maybe "rateLimit" or "naturalSample"... something more easily understandable?

Thanks for reading.

akarnokd commented 6 years ago

This is a somewhat old issue, and I can't remember the exact pattern expected here. I guess we can add this to RxJava. For discoverability, I'd name it throttleAndSample so it appears alongside the other throttleX operators.