ReactiveX / RxJavaReactiveStreams

Adapter between RxJava and ReactiveStreams
Apache License 2.0

Examples with Interop #5

Open benjchristensen opened 10 years ago

benjchristensen commented 10 years ago

We need some examples for the README to demonstrate usage.

Perhaps we can add unit test examples with dependencies on other libraries to demonstrate interop?

benjchristensen commented 10 years ago

/cc @smaldini

rkuhn commented 10 years ago

I’m currently looking into trying out some interop with Akka Streams, are artifacts published somewhere?

ldaley commented 10 years ago

You’ll have to hold off. It’s not spec compliant yet. Once it is I’ll get some artefacts published.

rkuhn commented 10 years ago

Bummer; I guess I’ll look into ratpack and Reactor then (want to show something at conferences next week).

ldaley commented 10 years ago

I’m actively working on this right now. Should have it working in the next day or two.

rkuhn commented 10 years ago

Cool, would you please ping this issue when I should try it out? Thanks!

ldaley commented 10 years ago

What I've just pushed should be compliant and I think is good enough for an initial release. There are probably bugs and there are certainly performance problems, but I think it can go out.

@benjchristensen how do we get a binary out?

benjchristensen commented 10 years ago

I'll push the buttons (since we haven't yet got the fully self-serve Travis-based release process working like RxScala has).

benjchristensen commented 10 years ago

0.2.0 is released on BinTray (https://bintray.com/reactivex/RxJava/RxJavaReactiveStreams/0.2.0/view) and is making its way to Maven Central.

@alkemist Thank you for your work on this. Would you mind providing a section on the README or in the Wiki with basic usage examples?

smaldini commented 10 years ago

Looking forward to it for my next demos :D

benjchristensen commented 10 years ago

It can be seen on Maven Central now: http://repo1.maven.org/maven2/io/reactivex/rxjava-reactive-streams/0.2.0/

rkuhn commented 10 years ago

Thanks a lot, @benjchristensen and @alkemist!

ldaley commented 10 years ago

Pushed one example of interop with Ratpack: https://github.com/ReactiveX/RxJavaReactiveStreams/blob/0.x/examples/ratpack/src/test/java/rx/reactivestreams/example/ratpack/RatpackExamples.java#L46-46

Leaving this ticket open for more examples and some stuff in the README.

benjchristensen commented 10 years ago

Cool, nice to see that code, that's very helpful.

rkuhn commented 10 years ago

@alkemist I’m trying to extract the sample code you linked to (since I cannot figure out how to run the tests you point to). The problem I am facing is that I cannot figure out how to publish ratpack 0.9.10-SNAPSHOT locally, what is the magic incantation? I tried adding the maven-publish plugin but that does not do anything when I say ./gradlew publishToMavenLocal (and ratpack-rx:publishToMavenLocal does not exist).

smaldini commented 10 years ago

@rkuhn can you use this repo: maven { url "http://oss.jfrog.org/repo" }? I can't remember for sure, but I think this is where all the ratpack snapshots go. BTW if you can share your example, I want to complete it with Reactor, as I keep pitching: Reactor for the backend access/data layer, Akka Streams to get into an Actor system and scale out, RxJava to bridge with some metrics and especially Hystrix right now, Ratpack to bridge with the HTTP client (WS/SSE) :dancer:

rkuhn commented 10 years ago

Got it working, thanks! You can find my sample project here. @benjchristensen this might be interesting for you as well.

smaldini commented 10 years ago

@rkuhn Having a hard time configuring the sample build into an IDEA project :(

benjchristensen commented 10 years ago

Thanks @rkuhn ... that code looks like a good example for my talks next week as well. Do you mind if I use it (and possibly tweak/enhance it)?

rkuhn commented 10 years ago

@benjchristensen By all means: use it!

@smaldini That is the reason why I created an sbt project, I cannot figure out this gradle thing ;-) You should be able to just add a gradle build if you know better how that works (I don’t use IDEA).

rkuhn commented 10 years ago

@smaldini I added Reactor to my sample: Java and Scala. It works!

smaldini commented 10 years ago

Beautiful

smaldini commented 10 years ago

I am trying to convert the project to our own sample suite as I am having issues with both. Do you know where I can find 0.9 snapshots for Akka Streams? My build seems to complain about that.

rkuhn commented 10 years ago

Ah, yes: I am about to publish a suitable version, should not take more than an hour.

smaldini commented 10 years ago

I'll also propose we all use our own schedulers/dispatchers to make it more obvious that we talk to each other while handling async backpressure :dancers:

E.g. rxjava.observeOn(Schedulers.computation()), reactorStream.dispatchOn(new Environment())

rkuhn commented 10 years ago

Akka Streams 0.10-M1 is on its way to Maven Central; I’ll update my code as soon as it is there.

rkuhn commented 10 years ago

updated the build and it still works :-)

benjchristensen commented 10 years ago

Playing around with Akka Streams 0.10-M1, I've found an issue somewhere: the backpressure isn't propagating. Not sure yet where.

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;
import akka.actor.ActorSystem;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class RxAkka {

    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create("InteropTest");
        final FlowMaterializer mat = FlowMaterializer.create(system);

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
                // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());
                /* using Akka Streams */
                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);
                // convert to Akka Streams Source
                Source<String> stringSource = Source.from(groupPublisher).map(i -> i + " " + group.getKey());
                // convert back from Akka to Rx Observable
                return RxReactiveStreams.toObservable(stringSource.take(2000).runWith(Sink.<String> fanoutPublisher(1, 1), mat));

                /* using only Rx */
                //                return asyncGroup.take(2000).map(i -> i + " " + group.getKey());
            });

        strings.toBlocking().forEach(System.out::println);
        system.shutdown();
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

This non-deterministically blows up with:

Exception in thread "main" java.lang.RuntimeException: rx.exceptions.MissingBackpressureException
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:138)
    at reactive_streams_interop.RxAkka.main(RxAkka.java:45)
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$1$3.onNext(OperatorGroupBy.java:236)
    at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:278)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:182)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
    at rx.Subscriber.setProducer(Subscriber.java:143)
    at rx.Subscriber.setProducer(Subscriber.java:137)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable.subscribe(Observable.java:7463)
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:98)
    ... 1 more
[ERROR] [11/01/2014 22:45:52.646] [InteropTest-akka.actor.default-dispatcher-16] [akka://InteropTest/user/$a/flow-2-1-map] failure during processing
rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$1$3.onNext(OperatorGroupBy.java:236)
    at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:278)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:182)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
    at rx.Subscriber.setProducer(Subscriber.java:143)
    at rx.Subscriber.setProducer(Subscriber.java:137)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable.subscribe(Observable.java:7463)
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:98)
    at reactive_streams_interop.RxAkka.main(RxAkka.java:45)

This means the request flow isn't working.

It sometimes works however:

3998 true
4000 true
Number emitted from source (should be < 6000): 7055
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-fanoutPublisher] Message [akka.stream.impl.Cancel] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-fanoutPublisher#-718931434] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-2-take#-1094054996] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-2-take#-1094054996] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-fanoutPublisher] Message [akka.stream.impl.Cancel] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-fanoutPublisher#51622231] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

It works deterministically if I use just Rx without the conversion to/from. I have not yet spent time to hunt down where the issue is occurring.
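
For readers unfamiliar with the exception: in the stack trace above it is thrown from RxRingBuffer.onNext, i.e. a bounded buffer was handed an item it had no room for. The following is a minimal plain-Java sketch of that failure mode under stated assumptions, not RxJava's implementation. `OverflowSketch`, `BoundedBuffer`, the capacity of 4 and the use of `IllegalStateException` are hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class OverflowSketch {
    // Producer that pushes 'count' items without ever checking for free space.
    static boolean pushIgnoringDemand(int capacity, int count) {
        BoundedBuffer buffer = new BoundedBuffer(capacity);
        try {
            for (int i = 1; i <= count; i++) {
                buffer.onNext(i);
            }
            return true;
        } catch (IllegalStateException overflow) {
            return false; // the buffer was overrun
        }
    }

    // Producer that emits only as many items as there are free slots.
    static boolean pushRespectingDemand(int capacity, int count) {
        BoundedBuffer buffer = new BoundedBuffer(capacity);
        int next = 1;
        while (next <= count) {
            // the free space plays the role of the outstanding request(n)
            int demand = capacity - buffer.size();
            for (int k = 0; k < demand && next <= count; k++) {
                buffer.onNext(next++);
            }
            buffer.poll(); // the consumer takes one item, freeing one slot
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("ignoring demand succeeded:   " + pushIgnoringDemand(4, 10));
        System.out.println("respecting demand succeeded: " + pushRespectingDemand(4, 10));
    }
}

// Hypothetical stand-in for a bounded operator buffer; not RxJava code.
final class BoundedBuffer {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Fails loudly when the producer outruns the buffer.
    void onNext(int item) {
        if (queue.size() == capacity) {
            throw new IllegalStateException("buffer full: producer emitted more than was requested");
        }
        queue.add(item);
    }

    Integer poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

The first producer overruns the buffer on its fifth item; the second never does, because it only emits into free slots.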

benjchristensen commented 10 years ago

The above is done with:

compile 'io.reactivex:rxjava:1.0.0.rc.8'
compile 'io.reactivex:rxjava-reactive-streams:0.3.0'
compile 'com.typesafe.akka:akka-stream-experimental_2.11:0.10-M1'

benjchristensen commented 10 years ago

Here is an example I'm considering using for a presentation on Tuesday. It's buggy right now (as shown above) but demonstrates the goals of interop while going through non-trivial operators (groupBy and flatMap) along with injected concurrency and thread-hopping.

[image: rxjava-akka-streams-interop-example]

Any recommendations on what to do differently that would be better? Can someone provide me a more realistic example of going from or to Akka Streams? I'd prefer to have something that is not so contrived if possible.

@smaldini I'll play more with yours next as shown in https://github.com/rkuhn/ReactiveStreamsInterop/blob/7124906fb50f9a91cee4e8d58c00853898eed239/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java

benjchristensen commented 10 years ago

This example with Reactor and RxJava seems to be working deterministically:

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxReactor {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
                // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());

                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);

                // Convert to Reactor Stream
                final Stream<String> linesStream = Streams.create(groupPublisher).map(i -> i + " " + group.getKey()).take(2000);

                // convert back from Reactor Stream to Rx Observable
                return RxReactiveStreams.toObservable(linesStream);
            });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

Output ends with:

3987 false
3989 false
3991 false
3993 false
3995 false
3997 false
3999 false
Number emitted from source (should be < 6000): 4098
// sometimes with this (which is okay ... concurrency races) ...
Number emitted from source (should be < 6000): 6147

@smaldini Do you have a better example you'd like me to show?

benjchristensen commented 10 years ago

[image: rxjava-reactor-interop-example]

benjchristensen commented 10 years ago

@rkuhn Are you okay with me using the following slide? Is there anything you'd like me to change?

[slide image: rxjava akka-streams]

If you can provide me a more realistic example I'd happily change. For example an Akka Actor that acts as a source to RxJava for consumption.

benjchristensen commented 10 years ago

@smaldini I can't find a logo for Reactor, is there something I should use?

benjchristensen commented 10 years ago

@alkemist Is there an example you'd like me to show in my presentation at QCon? Are you close to publishing to Maven Central a ratpack-rx version that supports RxJava 1.0? I tried with 0.9.9 but that isn't working (only 17 more days until 1.0 Final and no more breaking changes!).

ldaley commented 10 years ago

@benjchristensen 0.9.10 was released today. It's in JCenter but a successful sync to Central is proving elusive. Regarding what to demo, I just pushed examples for SSE and WebSocket streaming using publishers. Chunks, SSE and WebSocket are really the only useful ways we leverage RS streams ATM.

Up to you what you want to show.

rkuhn commented 10 years ago

@benjchristensen The Akka part of the sample is fine, you can make it arbitrarily complex with all the operators out there.

The problem you are seeing very likely is that Akka streams buffer less elements, exposing the deadlock in your sample code: following a groupBy with a flatMap can only work under very specific circumstances, in general it is not a safe combination under bounded memory processing (since the even numbers will need to be buffered for as long as it takes to finish all the odd ones).
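
To make the buffering argument concrete, here is a plain-Java sketch (neither Akka Streams nor RxJava code) that routes integers into an odd and an even queue of bounded capacity while only the odd queue is drained. `GroupBufferSketch`, `routedBeforeStall` and the capacities used are hypothetical names and values chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;

final class GroupBufferSketch {
    // Routes 1..limit into two bounded queues while draining only the odd one.
    // Returns how many items were routed before the even queue ran out of room.
    static int routedBeforeStall(int capacityPerGroup, int limit) {
        ArrayBlockingQueue<Integer> odd = new ArrayBlockingQueue<>(capacityPerGroup);
        ArrayBlockingQueue<Integer> even = new ArrayBlockingQueue<>(capacityPerGroup);
        int routed = 0;
        for (int i = 1; i <= limit; i++) {
            ArrayBlockingQueue<Integer> target = (i % 2 == 0) ? even : odd;
            // offer() returns false instead of blocking when the queue is full
            if (!target.offer(i)) {
                return routed; // upstream has to stop: there is nowhere to put the item
            }
            routed++;
            odd.poll(); // the odd consumer keeps up; the even consumer never runs
        }
        return routed;
    }

    public static void main(String[] args) {
        System.out.println(routedBeforeStall(4, 100));  // small buffers
        System.out.println(routedBeforeStall(50, 100)); // buffers large enough for all evens
    }
}
```

With a capacity of 4 the source stalls after 9 items, since the fifth even number has no slot. With room for all 50 even numbers the whole range goes through. Whether such a stall counts as a deadlock or as backpressure doing its job depends on whether the even consumer ever resumes.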

Concerning the exception itself: looking at the lines referenced in the stack trace it seems that data are just pushed too eagerly within the part leading up to observeOn, either by not respecting the signaled demand or by observeOn signaling more than it can take in. Do the RxJava or Reactor versions also work if you insert a Thread.sleep(1) within the map operation?

benjchristensen commented 10 years ago

@alkemist Thanks, I'll try that.

benjchristensen commented 10 years ago

@rkuhn

very likely is that Akka streams buffer less elements

How much it buffers shouldn't matter, as request(n) determines how much it asks for. One impl should be able to call request(1) and another request(Long.MAX_VALUE), and both should work.
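
As a rough illustration of that point, the sketch below uses the JDK's `java.util.concurrent.Flow` interfaces (JDK 9+), which mirror the org.reactivestreams ones used in this thread. It is a simplified, synchronous, single-threaded sketch and not a spec-compliant publisher; `RequestSketch`, `range` and `emittedFor` are hypothetical names.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

final class RequestSketch {

    // Hypothetical synchronous publisher of 1..count that emits only what was requested.
    static Flow.Publisher<Integer> range(int count, AtomicInteger emitted) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // add to outstanding demand, capping at Long.MAX_VALUE on overflow
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) return; // re-entrant call from onNext: the outer loop picks it up
                emitting = true;
                while (demand > 0 && next <= count && !done) {
                    demand--;
                    emitted.incrementAndGet();
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes requesting 'batch' items at a time, cancels after 'take' items,
    // and returns how many items the publisher emitted in total.
    static int emittedFor(long batch, int take) {
        AtomicInteger emitted = new AtomicInteger();
        range(1_000_000, emitted).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                received++;
                if (received == take) {
                    subscription.cancel();
                } else if (received % batch == 0) {
                    subscription.request(batch); // batch consumed, ask for the next one
                }
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
            }
        });
        return emitted.get();
    }

    public static void main(String[] args) {
        System.out.println(emittedFor(1, 10));
        System.out.println(emittedFor(Long.MAX_VALUE, 10));
    }
}
```

Both subscribers receive the same ten items and the source emits nothing beyond them. Only the demand signalling differs.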

exposing the deadlock in your sample code: following a groupBy with a flatMap can only work under very specific circumstances

What do you mean by this? The groupBy and flatMap use cases work and correctly slow down (i.e. apply backpressure) to whatever the slowest group is. It would only "deadlock", as you call it, if you completely blocked one group, which is exactly what should happen in a backpressure case. The user is free to decouple fast and slow streams if they wish by using any of a multitude of operators that make it behave differently (such as sample, throttle, onBackpressureBuffer, onBackpressureDrop, debounce, buffer, window, etc.).
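
As a sketch of what "decoupling" can mean, here is a plain-Java analogue of a drop-on-overflow strategy, in the spirit of onBackpressureDrop but not its implementation. The producer is never slowed down; items that arrive while the buffer is full are discarded. `DropSketch`, `consumeWithDrop` and its parameters are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class DropSketch {
    // Feeds 1..produced into a buffer of the given capacity. The consumer takes
    // one item for every 'producerSteps' items produced. Items arriving while
    // the buffer is full are dropped instead of slowing the producer down.
    static List<Integer> consumeWithDrop(int capacity, int produced, int producerSteps) {
        Queue<Integer> buffer = new ArrayDeque<>();
        List<Integer> consumed = new ArrayList<>();
        for (int i = 1; i <= produced; i++) {
            if (buffer.size() < capacity) {
                buffer.add(i);
            } // else: the item is dropped
            if (i % producerSteps == 0) {
                Integer item = buffer.poll(); // the slow consumer takes one item
                if (item != null) {
                    consumed.add(item);
                }
            }
        }
        // the producer has finished; the consumer drains what is left
        while (!buffer.isEmpty()) {
            consumed.add(buffer.poll());
        }
        return consumed;
    }

    public static void main(String[] args) {
        // producer three times faster than the consumer, buffer of 2
        System.out.println(consumeWithDrop(2, 10, 3)); // [1, 2, 4, 7, 10]
    }
}
```

The trade-off is explicit: the fast side keeps running, and the slow side sees gaps in the sequence rather than a stalled source.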

it seems that data are just pushed too eagerly within the part leading up to observeOn

That's correct, hence the problem. It works with Rx+Reactor and with Rx by itself, which implies that request(n) is not being composed through correctly when it is Rx+Akka. It may be the RxJavaReactiveStreams implementation, but Rx+Reactor works with the exact same use case.

I have not yet spent the time to debug where too much is being requested or the request is being lost.

I'll paste examples below using Thread.sleep to demonstrate that it works with RxJava by itself or with RxJava+Reactor.

benjchristensen commented 10 years ago

Example of RxJava + Reactor where the even group takes 10ms per item and the odd group takes 1ms per item

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxReactor {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
                // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation()).map(i -> {
                    if (group.getKey()) {
                        // make 'even' group be slow
                        try {
                            Thread.sleep(10);
                        } catch (Exception e) {
                        }
                    } else {
                        // make 'odd' group be faster but still have some "computational cost"
                        try {
                            Thread.sleep(1);
                        } catch (Exception e) {
                        }
                    }
                    return i;
                });

                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);

                // Convert to Reactor Stream
                final Stream<String> linesStream = Streams.create(groupPublisher).map(i -> i + " " + group.getKey()).take(2000);

                // convert back from Reactor Stream to Rx Observable
                return RxReactiveStreams.toObservable(linesStream);
            });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

You end up with output such as this:

1 false
3 false
5 false
7 false
9 false
11 false
13 false
15 false
17 false
2 true
19 false
21 false
23 false
...
3990 true
3992 true
3994 true
3996 true
3998 true
4000 true
Number emitted from source (should be < 6000): 4097

benjchristensen commented 10 years ago

RxJava by itself doing groupBy + flatMap:

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxJavaGroupByFlatMap {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
                // schedule odd and even on different event loops
                return group.observeOn(Schedulers.computation()).map(i -> {
                    if (group.getKey()) {
                        // make 'even' group be slow
                        try {
                            Thread.sleep(10);
                        } catch (Exception e) {
                        }
                    } else {
                        // make 'odd' group be faster but still have some "computational cost"
                        try {
                            Thread.sleep(1);
                        } catch (Exception e) {
                        }
                    }
                    return i;
                }).map(i -> i + " " + group.getKey()).take(2000);

            });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

benjchristensen commented 10 years ago

Here is one that multicasts a stream (using publish), creates 2 async/parallel streams off of it (odd and even, using filtering), then zips them together with one of the streams slow.

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.schedulers.Schedulers;

public class RxJavaPublishZip {

    public static void main(String... args) {
        final AtomicInteger numEmitted = new AtomicInteger();

        Observable<Object> strings = Observable.range(1, 1000000).doOnNext(i -> numEmitted.incrementAndGet())
                .publish(oi -> {
                        // schedule it so we are async and need backpressure
                        Observable<String> odd = oi.observeOn(Schedulers.computation())
                                .filter(i -> i % 2 != 0).map(i -> i + "-odd").map(s -> {
                                        // make odd slow
                                        try {
                                            Thread.sleep(1);
                                        } catch (Exception e1) {
                                        }
                                        return s;
                                    });
                        Observable<String> even = oi.observeOn(Schedulers.computation())
                                .filter(i -> i % 2 == 0).map(i -> i + "-even");
                        return Observable.zip(odd, even, (o, e) -> o + " " + e + "   Thread: " + Thread.currentThread());
                    }).take(2000);

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be ~4000): " + numEmitted.get());
    }
}

Output would be like this:

1-odd 2-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3-odd 4-even   Thread: Thread[RxComputationThreadPool-3,5,main]
5-odd 6-even   Thread: Thread[RxComputationThreadPool-3,5,main]
7-odd 8-even   Thread: Thread[RxComputationThreadPool-3,5,main]
9-odd 10-even   Thread: Thread[RxComputationThreadPool-3,5,main]
11-odd 12-even   Thread: Thread[RxComputationThreadPool-4,5,main]
13-odd 14-even   Thread: Thread[RxComputationThreadPool-3,5,main]
... 
3995-odd 3996-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3997-odd 3998-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3999-odd 4000-even   Thread: Thread[RxComputationThreadPool-3,5,main]
Number emitted from source (should be ~4000): 4864

You can see the backpressure happening because the num emitted is < 5000 in this case rather than spinning and emitting all 1000000. It is not exactly 4000 because we are async and allow buffers in observeOn to fill.

If I put take on the individual streams then it would be exact, as it would propagate through, but via zip it can't do that as it splits streams, so each stream maintains its own backpressure and zip also maintains its own.
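
The overshoot can be approximated with a small simulation. This is a simplified single-buffer model under stated assumptions, not a reproduction of the publish/zip pipeline above, and it will not yield the 4864 seen in the output. `PrefetchSketch`, `emittedFor` and the buffer size of 128 are hypothetical, illustrative choices.

```java
final class PrefetchSketch {
    // Simulates a source feeding a consumer through a prefetch buffer.
    // The buffer is topped up to 'bufferSize' before each item is consumed,
    // and the run stops once 'take' items have been consumed.
    // Assumes bufferSize >= 1.
    static int emittedFor(int bufferSize, int take) {
        int emitted = 0;
        int buffered = 0;
        int consumed = 0;
        while (consumed < take) {
            // upstream fills every free slot (the outstanding demand)
            emitted += bufferSize - buffered;
            buffered = bufferSize;
            // downstream consumes one item
            buffered--;
            consumed++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emittedFor(128, 4000)); // 4127: take + bufferSize - 1
        System.out.println(emittedFor(1, 4000));   // 4000: no prefetch, exact
    }
}
```

In this model the source always emits between `take` and `take + bufferSize - 1` items, which is why the count lands a little above the number consumed instead of at the full 1000000.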

ldaley commented 10 years ago

@benjchristensen do you have in mind anything more than what we have now in terms of the general pattern? Where the proposed pattern is to have more projects in examples/ integrating different libraries/frameworks.

I'd like to encourage the respective library/framework owners to contribute examples. I don't intend to do it.

ldaley commented 10 years ago

Looking at the readme again, we probably do want at least one example of real interop there.

benjchristensen commented 10 years ago

@alkemist The examples above are sufficient, but we should get them into /examples or something like that as you suggest, and probably have the simple ones presented in the README or wiki similar to how my slides give a quick intro: https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=158

benjchristensen commented 10 years ago

@rkuhn Any thoughts on the examples above? I'm interested in your perspective since you stated that groupBy and flatMap are not safe.

rkuhn commented 10 years ago

Sorry for the delay, I’m traveling too much this month; I’ll try to elaborate later this week, otherwise we can talk it through next week at React.