spring-attic / reactive-streams-commons

A joint research effort for building highly optimized Reactive-Streams compliant operators.
Apache License 2.0

Gem 22) discussion #23

Open akarnokd opened 8 years ago

akarnokd commented 8 years ago

From @dsyer

Interesting. Do you mean Mono.fromCallable() (because when I looked at the source code it seemed to me that the Callable.call() only happens when there is a subscribe())? In general, what's a good way to assert or inspect statements like that? What did I misunderstand?

akarnokd commented 8 years ago

Gem 22 handles the case where there is a subscriber but it hasn't called request() yet. fromCallable runs the callable immediately but defers the emission. This gem defers the execution itself: the just() at the front emits only when requested, and that value then triggers the execution of the callable.

dsyer commented 8 years ago

Gem 22 link: https://github.com/reactor/reactive-streams-commons/issues/21#issuecomment-222080500

dsyer commented 8 years ago

Here's the code from Gem 22 (with a concrete supplier and a log instead of using sysout):

        Callable<String> callable = () -> "foo";
        Mono.just("irrelevant").log().map(unused -> {
            try {
                return callable.call();
            } catch (Exception ex) {
                throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
            }
        }).subscribe(log::info, Throwable::printStackTrace);

and here's the output:

10:06:25.504 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.util.ScalarSubscription@5442a311)
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog -  onNext(irrelevant)
10:06:25.506 [main] INFO com.example.MonoFeaturesTests - foo
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

Here's the vanilla fromCallable() (as I understood Gem 22, this should be different):

        Mono.fromCallable(callable).log().subscribe(log::info, Throwable::printStackTrace);

and the output:

10:05:13.317 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.subscriber.DeferredScalarSubscriber@1794d431)
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog -  onNext(foo)
10:05:13.318 [main] INFO com.example.MonoFeaturesTests - foo
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

What's the difference?

akarnokd commented 8 years ago

Updated the gem to show the effect if the request is actually delayed.
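
Both logs above look the same because subscribe() requests an unbounded amount right away, so there is no gap between subscribing and requesting in which the difference could show up. As a rough illustration of the delayed-request case, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces rather than on Reactor. It is not Reactor's implementation: the class DeferredCallableDemo, the publisher() and run() helpers, and the runOnSubscribe flag are hypothetical names made up for this example, and the code is single-threaded only. With the flag set, the callable runs at subscribe time and only the emission waits for request(), which is the fromCallable behavior described earlier in this thread. With the flag cleared, the callable itself waits for request(), which is what the gem is after.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;

// Hypothetical demo class; not part of Reactor or reactive-streams-commons.
class DeferredCallableDemo {

    // Single-value subscription. Not thread-safe: illustration only.
    static final class CallableSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super String> subscriber;
        private final Callable<String> callable;
        private String value;
        private boolean computed;
        private boolean done;

        CallableSubscription(Flow.Subscriber<? super String> subscriber, Callable<String> callable) {
            this.subscriber = subscriber;
            this.callable = callable;
        }

        // Runs the callable at most once and signals onError if it throws.
        void compute() {
            if (computed) {
                return;
            }
            computed = true;
            try {
                value = callable.call();
            } catch (Exception ex) {
                done = true;
                subscriber.onError(ex);
            }
        }

        @Override
        public void request(long n) {
            if (done || n <= 0) {
                return;
            }
            compute(); // no-op if the callable already ran at subscribe time
            if (done) {
                return; // the callable failed and onError was signalled
            }
            done = true;
            subscriber.onNext(value);
            subscriber.onComplete();
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    // runOnSubscribe = true: execute on subscribe, emit on request.
    // runOnSubscribe = false: execute and emit on request.
    static Flow.Publisher<String> publisher(Callable<String> callable, boolean runOnSubscribe) {
        return subscriber -> {
            CallableSubscription subscription = new CallableSubscription(subscriber, callable);
            subscriber.onSubscribe(subscription);
            if (runOnSubscribe) {
                subscription.compute();
            }
        };
    }

    // Subscribes, requests only after subscribe() has returned, and records the order of events.
    static List<String> run(boolean runOnSubscribe) {
        List<String> events = new ArrayList<>();
        Callable<String> callable = () -> {
            events.add("call");
            return "foo";
        };
        Flow.Subscription[] held = new Flow.Subscription[1];
        publisher(callable, runOnSubscribe).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); held[0] = s; }
            @Override public void onNext(String item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        events.add("request"); // the delayed request happens here
        held[0].request(1);
        return events;
    }

    public static void main(String[] args) {
        System.out.println("run on subscribe: " + run(true));
        System.out.println("run on request:   " + run(false));
    }
}
```

In the first run the "call" event is recorded before "request"; in the second it is recorded after. That ordering is the only difference, and it is invisible whenever the subscriber requests immediately.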

dsyer commented 8 years ago

I see, thanks.