Open akarnokd opened 8 years ago
The operators `merge` and `flatMap` are closely related, and you can implement one with the other:
Given a nested `Publisher`, merging its inner sequences is equivalent to applying `flatMap` with an identity function:
Publisher<Publisher<T>> sources = ...
merge(sources) == sources.flatMap(source -> source);
Given a `Publisher` and a mapping function `T -> Publisher<R>`, you can `map` the source with the function and `merge` the resulting nested sequence:
Publisher<T> source = ...
Function<T, Publisher<R>> mapper = ...
source.flatMap(mapper) == merge(source.map(mapper));
This identity also holds for other mapping operators:
`concat` and `concatMap`
`concatEager` and `concatMapEager`
`switchOnNext` and `switchMap`
This is a general property of the operation and works for non-reactive but functional APIs as well. It can come in handy if a library offers only one of the two methods.
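Because the identity is not specific to reactive types, it can be checked with plain `java.util.stream.Stream`. The following is a minimal sketch under that assumption: `Stream` stands in for `Publisher`, and the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; Stream stands in for Publisher here.
final class FlatMapIdentity {

    // Maps each value to a two-element inner sequence.
    static final Function<Integer, Stream<Integer>> MAPPER = v -> Stream.of(v, v * 10);

    // source.flatMap(mapper)
    static List<Integer> direct() {
        return Stream.of(1, 2, 3).flatMap(MAPPER).collect(Collectors.toList());
    }

    // source.map(mapper), then flattening the nested sequence with an identity function
    static List<Integer> mapThenFlatten() {
        Stream<Stream<Integer>> nested = Stream.of(1, 2, 3).map(MAPPER);
        return nested.flatMap(inner -> inner).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(direct());         // [1, 10, 2, 20, 3, 30]
        System.out.println(mapThenFlatten()); // [1, 10, 2, 20, 3, 30]
    }
}
```

Note that a sequential `Stream` flattens its inner sequences in order, so this sketch behaves more like the `concat`/`concatMap` pair than like an interleaving `merge`.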
Modern reactive libraries offer fluent conversion operators: `extend`, `to`, `as` or `compose`. You can then apply your own transformation function, which runs at assembly time and lets you customize a sequence with preset operators:
Function<Publisher<T>, Publisher<T>> addSchedulers = o ->
o.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
range(1, 10).compose(addSchedulers).subscribe(System.out::println);
The function `addSchedulers` is executed when `range.compose` is executed, that is, at assembly time.
However, sometimes you want stateful operators in a sequence, for example, a counting `map` that updates a "global" state. Clearly, such an operation doesn't work properly with multiple concurrent `Subscriber`s to a sequence.
Luckily, you can use the composition operators above in conjunction with `defer` to shift the execution of the function to subscription time, allowing per-`Subscriber` state in the applied operators:
range(1, 10).compose(o -> defer(() -> {
int[] counter = new int[] { 0 };
return o.map(v -> ++counter[0]);
})).subscribe(System.out::println);
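The difference between assembly-time and subscription-time state can be seen without any reactive library. In this sketch, a `Supplier<Stream<T>>` is assumed to play the role of a cold `Publisher` and calling `get()` plays the role of subscribing; all names are hypothetical.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo: Supplier<Stream<T>> stands in for a cold Publisher,
// and calling get() stands in for subscribing.
final class DeferredState {

    static final Supplier<Stream<String>> SOURCE = () -> Stream.of("a", "b", "c");

    // State created once at "assembly time" and shared by every consumer.
    static Supplier<Stream<Integer>> sharedCounter() {
        int[] counter = { 0 };
        return () -> SOURCE.get().map(v -> ++counter[0]);
    }

    // State created inside the supplier, that is, once per "subscription".
    static Supplier<Stream<Integer>> deferredCounter() {
        return () -> {
            int[] counter = { 0 };
            return SOURCE.get().map(v -> ++counter[0]);
        };
    }

    static List<Integer> consume(Supplier<Stream<Integer>> source) {
        return source.get().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Supplier<Stream<Integer>> shared = sharedCounter();
        System.out.println(consume(shared));   // [1, 2, 3]
        System.out.println(consume(shared));   // [4, 5, 6]

        Supplier<Stream<Integer>> deferred = deferredCounter();
        System.out.println(consume(deferred)); // [1, 2, 3]
        System.out.println(consume(deferred)); // [1, 2, 3]
    }
}
```

The shared variant keeps counting across consumers, while the deferred variant starts from a fresh counter each time.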
The operator `flatMap` is very versatile and comes up frequently when one wants to join different mapped sequences.
The mapper function lets you return any `Publisher` instance, including `empty` or a single-element one (`just`).
We can utilize this property and implement `map` with `flatMap`:
Function<T, R> mapper = ...
map(mapper) == flatMap(v -> just(mapper.apply(v)));
Given the current value from the source, we apply the `mapper` function and return a constant `just` `Publisher`.
We can apply the same methodology to `filter`: return `empty` if the predicate doesn't hold and return `just` if it does:
Predicate<T> predicate = ...
filter(predicate) == flatMap(v -> predicate.test(v) ? just(v) : empty());
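The same two identities can be tried out on `java.util.stream.Stream`, where `Stream.of(x)` is assumed to stand in for `just(x)` and `Stream.empty()` for `empty()`. This is only an illustrative sketch with hypothetical names, not a reactive implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; Stream.of plays "just" and Stream.empty plays "empty".
final class MapFilterViaFlatMap {

    // map(mapper) expressed as flatMap(v -> just(mapper.apply(v)))
    static <T, R> List<R> mapViaFlatMap(List<T> source, Function<T, R> mapper) {
        return source.stream()
                .<R>flatMap(v -> Stream.of(mapper.apply(v)))
                .collect(Collectors.toList());
    }

    // filter(predicate) expressed as flatMap(v -> predicate.test(v) ? just(v) : empty())
    static <T> List<T> filterViaFlatMap(List<T> source, Predicate<T> predicate) {
        return source.stream()
                .<T>flatMap(v -> predicate.test(v) ? Stream.of(v) : Stream.<T>empty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        System.out.println(mapViaFlatMap(values, v -> v * 2));         // [2, 4, 6, 8]
        System.out.println(filterViaFlatMap(values, v -> v % 2 == 0)); // [2, 4]
    }
}
```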
Note, however, that using `flatMap` as such incurs more overhead than a direct `map` or `filter`. Modern reactive libraries still optimize for these cases because `flatMap` lets you short-circuit a sequence by emitting an error `Publisher`:
source.flatMap(v -> {
try {
performIO(v);
return just(v);
} catch (IOException ex) {
return error(ex);
}
})
Sometimes, you want to run through a sequence of values and then resume with another sequence of a completely different type once the first has completed. In this case, you'd usually also ignore the first sequence's values.
Some libraries already offer operators for this: `after`, `then`, `andThen`, etc. If none of them is available, you can use `concat` for it:
Publisher<Integer> a = ...
Publisher<String> b = ...
Publisher<String> c = (Publisher<String>)concat(a.ignoreElements(), b);
Unfortunately, you'll need some explicit casts and `@SuppressWarnings` for this; Java's type system doesn't seem to be powerful enough to express it otherwise.
One common source of confusion with reactive operators centers around `subscribeOn` and `observeOn` (`publishOn`). In short, `subscribeOn` acts during subscription time over `subscribe()` calls and its effects travel upstream, whereas `observeOn` acts during runtime over `onNext`|`onError`|`onComplete` calls and its effects travel downstream.
However, for many sequential data sources, such as `just`, `empty`, `error`, `fromArray`, `fromIterable` and `range`, applying either `subscribeOn` or `observeOn` will yield the same output: events signalled on the specified `Scheduler`'s thread.
just(1).subscribeOn(scheduler) == just(1).observeOn(scheduler)
range(1, 5).subscribeOn(scheduler) == range(1, 5).publishOn(scheduler)
fromArray("a", "b", "c").subscribeOn(scheduler) == fromArray("a", "b", "c").observeOn(scheduler)
// etc.
The reason for this equivalence is that these constant sources don't really have subscription side-effects and requests to them will trigger emissions on the `Scheduler`'s thread anyway.
Most modern libraries already exploit this with respect to `just`; both patterns are transformed into a custom scheduled `Publisher` instance.
In addition, using `observeOn` may give better performance with the multi-valued sources due to micro-operator-fusion in its front. However, we don't fully see the ramifications of an automatic `subscribeOn` -> `observeOn` swap when a sequence is assembled, due to the so-called strong pipelining effect when both operators are applied to the sequence:
range(1, 10).subscribeOn(schedulerA).observeOn(schedulerB);
The operator `flatMap` lets you run multiple sources at once while `concatMap` runs one sequence at a time.
However, `flatMap` usually offers a parameter that lets you limit its concurrency level. The smallest allowed concurrency level is `1`, which will functionally act the same as `concatMap`:
flatMap(mapper, 1) == concatMap(mapper)
This identity has helped many times to rule out possible bugs in either `flatMap`'s or `concatMap`'s implementation as the cause of some weird behavior or exception.
Note that in many implementations of `concatMap`, the operator prefetches its source values (2 elements in RxJava, 32 in Reactor) whereas a concurrency-constrained `flatMap` will prefetch exactly one source value and thus trigger sequence-dependent side-effects at different times.
Reactive datasources are sequential in nature; they form a processing pipeline that runs in FIFO order, even if source values are available while processing earlier values.
Still, sometimes you want to go parallel, for example, execute many service calls at once or distribute some processing over the available CPUs.
There are some experiments with such reactive stream parallelization, but the effect itself can be achieved today with common operators.
flatMap
One way is to use `flatMap` for the job:
range(1, 1_000)
.flatMap(v -> just(v).subscribeOn(computation).map(Object::hashCode))
If there are a lot of source values, scheduling them one-by-one adds quite an overhead. Instead, you can buffer some of them and act on them in a batched fashion:
range(1, 1_000_000)
.buffer(256)
.flatMap(list ->
just(list).subscribeOn(computation)
.map(v ->
v.stream().map(Object::hashCode).collect(toList())
)
)
.flatMapIterable(c -> c)
If the final order still matters, you can use `concatMapEager` instead of `flatMap`.
window
The operator `window` creates a nested `Publisher<Publisher<T>>` sequence which can be batched similarly to `buffer`, but the values themselves become immediately available in the inner `Publisher`.
We can use this operator with some mapping and flattening to get the parallel effect:
range(1, 1_000_000)
.window(256)
.map(w ->
w.observeOn(computation)
.map(Object::hashCode)
)
.concatMapEager(w -> w);
Note the use of `observeOn` here: the source inner window is no longer constant like above but emits `onNext` events as they arrive from the source. Applying `subscribeOn` would be useless.
groupBy
The operator `groupBy` also creates nested `Publisher` instances based on some key selector to determine which value goes into which group; i.e., routing or dispatching values to various inner windows.
One way is to use the current value's `hashCode` modulo some integer to select a "bucket" for it:
range(1, 1_000_000)
.groupBy(v -> v.hashCode() % 8)
.flatMap(g -> g.observeOn(computation).map(Object::hashCode))
Given a well-distributed `hashCode`, this should yield a fairly balanced parallel processing of values.
Alternatively, you can do round-robin dispatching by using a counter instead:
long[] counter = { 0 };
range(1, 1_000_000)
.groupBy(v -> (counter[0]++) & 7)
.concatMapEager(g -> g.observeOn(computation).map(Object::hashCode))
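The two key selectors can be examined in isolation. The sketch below is plain Java with hypothetical names. Its use of `Math.floorMod` is an addition of this sketch rather than something the snippets above do: with `%`, a negative `hashCode` would produce a negative key and therefore extra groups.

```java
import java.util.Arrays;

// Hypothetical demo of the two bucket-selection strategies.
final class BucketSelection {

    // Hash-based key selector. Math.floorMod (an assumption of this sketch) keeps
    // the key in 0..buckets-1 even for negative hash codes, where % would not.
    static int hashBucket(Object value, int buckets) {
        return Math.floorMod(value.hashCode(), buckets);
    }

    // Round-robin key selector: counts how many of n values land in each of 8 buckets.
    static long[] roundRobinCounts(int n) {
        long[] counts = new long[8];
        long counter = 0;
        for (int v = 1; v <= n; v++) {
            int key = (int) ((counter++) & 7);
            counts[key]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(hashBucket(-3, 8));                      // 5, whereas -3 % 8 is -3
        System.out.println(Arrays.toString(roundRobinCounts(16))); // [2, 2, 2, 2, 2, 2, 2, 2]
    }
}
```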
Sometimes you want to execute some action, such as logging, when it turns out the source `Publisher` is empty.
You should be familiar with the `switchIfEmpty` operator, which switches to a new `Publisher` if the main `Publisher` is empty. We can then switch to an `empty` publisher and use `doOnComplete` with the desired action:
Publisher<T> source = ...
source.switchIfEmpty(empty().doOnComplete(() -> System.out.println("Empty source!")));
Sometimes you want to process subsequent elements of a `Publisher` pairwise (in triplets, etc.). A possible way of doing this is by using `buffer` with the skip option:
range(1, 9)
.buffer(2, 1)
.filter(b -> b.size() == 2)
.map(b -> b.get(0) + b.get(1))
.subscribe(System.out::println);
Since `buffer` with a skip of 1 still emits the last, partially filled buffer when the source completes, you have to filter out partial buffers (otherwise, the `map` will blow up with `IndexOutOfBoundsException`).
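To see which buffers come out of an overlapping `buffer(2, 1)`, here is a small list-based emulation in plain Java. It is a sketch with hypothetical names that mimics the count/skip behaviour described above, not the operator's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical list-based emulation of buffer(count, skip).
final class PairwiseBuffer {

    // A new buffer starts every 'skip' items; buffers that are still open
    // when the source ends are emitted as partial buffers.
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            result.add(new ArrayList<>(source.subList(start, end)));
        }
        return result;
    }

    // buffer(2, 1), drop partial buffers, then add up each pair.
    static List<Integer> pairwiseSums(List<Integer> source) {
        return buffer(source, 2, 1).stream()
                .filter(b -> b.size() == 2)
                .map(b -> b.get(0) + b.get(1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> range = IntStream.rangeClosed(1, 9).boxed().collect(Collectors.toList());
        System.out.println(buffer(range, 2, 1)); // ends with [8, 9], [9]
        System.out.println(pairwiseSums(range)); // [3, 5, 7, 9, 11, 13, 15, 17]
    }
}
```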
An alternative way is to use the `publish` overload taking a `Function<Publisher<T>,Publisher<R>>`. This overload, unlike the regular `publish`, returns a `Publisher` and is in fact a cold operator. For the duration of a `Subscriber`, it makes the source into a hot `Publisher`, and you can attach as many operators to it as you like via the `Function` callback; thus, the source events are shared among different "paths" without subscribing to the source multiple times.
We can use this `publish` with `skip` and `zip` to get pairs:
range(1, 9)
.publish(o -> zip(o, o.skip(1), (a, b) -> a + b))
.subscribe(System.out::println);
There is no need for filtering as there is no list anymore and we get the pairs nicely via lambda parameters. Naturally, it works with triplets as well:
range(1, 9)
.publish(o -> zip(o, o.skip(1), o.skip(2), (a, b, c) -> a + b + c))
.subscribe(System.out::println);
You may often want to execute some `Supplier` or `Runnable`/`Action0` asynchronously. Maybe it's something blocking or just long running and you don't want to block or hold up your current thread.
Many modern libraries offer the `fromCallable(Callable<T>)` source factory where you can use a regular old `Callable` to execute some action. With Java 8 lambdas, it's easy to convert your callbacks into `Callable`s:
Supplier<Integer> supplier = () -> 1;
fromCallable(supplier::get).subscribeOn(computation);
Runnable run = () -> System.out.println("Hello!");
fromCallable(() -> { run.run(); return null; }).subscribeOn(computation);
If for some reason `fromCallable` is not available, you can use a combination of `just` and `map` to get the same effect:
just("whatever")
.subscribeOn(computation)
.map(ignored -> supplier.get());
Of course, you can swap the order of `subscribeOn` and `map` here; they are functionally equivalent. However, as we saw with Gem # 5, `just().subscribeOn()` is usually optimized in modern libraries and an intermediate `just().map().subscribeOn()` would most likely prevent some optimizations.
A possible use for `BehaviorSubject`/`BehaviorProcessor` (`EmitterProcessor#replayLast` in Reactor), which remembers the last value and emits it to new `Subscriber`s at the beginning of a sequence, is to use it as a single-element cache, also known as a "reactive property".
Sometimes, the contents of this cache can become outdated and shouldn't be emitted to new `Subscriber`s until a proper fresh value is generated. Unfortunately, most implementations don't offer a `clear()` method, so until a new value gets assigned to it via `onNext`, the old one stays.
We will in fact use this `onNext` to clear out the current value. To do this, we have to establish a protocol where a special value from the `Subject`'s value type indicates emptiness and instructs `Subscriber`s to ignore it if encountered:
Integer CLEAR = new Integer(0);
BehaviorProcessor<Integer> cache = new BehaviorProcessor<>(CLEAR);
Publisher<Integer> front = cache.filter(v -> v != CLEAR);
front.subscribe(System.out::println);
cache.onNext(10);
cache.onNext(CLEAR);
cache.onNext(20);
front.subscribe(System.out::println);
In the example, we create a new (!) Integer instance and use it for reference comparison to determine if the cache is "empty".
Sometimes, however, you can't just create such an "empty" instance of the type you are working with. In this case, you have to revert to the lowest common denominator type, `Object`, and downcast anything that isn't the indicator back to your type:
Object CLEAR = new Object();
BehaviorProcessor<Object> cache = new BehaviorProcessor<>();
Publisher<String> front = cache.filter(v -> v != CLEAR).cast(String.class);
cache.onNext("abc");
cache.onNext(CLEAR);
cache.onNext("def");
front.subscribe(System.out::println);
Of course, directly exposing `BehaviorSubject<Object>` is a welcoming sign for all kinds of types. You can re-establish type safety by wrapping the code into a class and allowing only type-correct `onNext` calls:
public final class Cache<T> implements Observer<T>, Publisher<T> {
static final Object CLEAR = new Object();
final BehaviorProcessor<Object> cache = new BehaviorProcessor<>(CLEAR);
@Override
public void subscribe(Subscriber<? super T> subscriber) {
// The unchecked cast is safe: only T values and CLEAR ever enter the cache.
cache.filter(v -> v != CLEAR).map(v -> (T) v).subscribe(subscriber);
}
@Override
public void onNext(T t) {
cache.onNext(t);
}
@Override
public void onError(Throwable t) {
cache.onError(t);
}
@Override
public void onComplete() {
cache.onComplete();
}
public void clear() {
cache.onNext(CLEAR);
}
}
Remember, `Processor`s (and `Subject`s) require non-concurrent calls to their `onXXX` methods, so you should only call them in a serialized fashion (such as from a GUI thread or a single-threaded event loop).
Sometimes, an API gives you a `Publisher<List<T>>` where the signaled values are `List`s and you want to process the elements in the list while holding the list together to be processed as a whole downstream later on.
Let's start with a common source of `List`s:
Publisher<List<Integer>> source = range(1, 1_000_000).buffer(256);
You can simply run a for loop and manipulate the list in place if the list is mutable (`buffer` emits such a mutable list):
source.map(list -> {
for (int i = 0; i < list.size(); i++) {
list.set(i, list.get(i) + 2_000_000);
}
return list;
});
Or as a new list:
source.map(list -> {
List<Integer> newList = new ArrayList<>(list.size());
for (Integer v : list) {
newList.add(v + 2_000_000);
}
return newList;
});
You can combine the worlds of reactive and the interactive Java 8 Stream processing:
source.map(list ->
list.stream()
.map(v -> v + 2_000_000)
.collect(toList())
);
If you can't mutate the list, are stuck on Java 7 or before, or just don't want to look "non functional", you can use `concatMap` (not `flatMap`, in order to keep the original list order), extract each list, process the elements and then recollect the result into another list:
source.concatMap(list ->
fromIterable(list)
.map(v -> v + 2_000_000)
.toList()
);
Sometimes you have a bunch of `Publisher`s doing various things and you want to wait until they all complete, while their emitted values are irrelevant in your case.
You may think of `zip` or `combineLatest` for doing this along with `ignoreElements`; however, `zip` and `combineLatest` won't work with most source combinations as you'd expect. These operators terminate eagerly: if one of the sources completes without elements, they complete immediately and cancel the rest of the outstanding source sequences. In addition, if `zip` encounters a shorter sequence, it will also terminate eagerly.
You can simply use `flatMap` in conjunction with `ignoreElements()` to join the sources' termination:
Publisher<String> source1 = just("Hello");
Publisher<Integer> source2 = range(1, 3);
Publisher<Long> source3 = timer(1, TimeUnit.MILLISECONDS);
just(source1, source2, source3)
.flatMap(o -> o.ignoreElements())
.doOnComplete(() -> System.out.println("Done"))
Some libraries like Reactor also use `Mono#when(Mono<T>....)` to coordinate the completion and the data from many sources.
By default, reactive protocols treat `onError` events as fatal and terminal events, tearing down the whole chain you delicately assembled.
Many times, you'd want to treat an error like any other value and keep the sequences running. For this, you have to hide the error from the library by not calling `onError` but instead creating a holder class for both normal and error values and emitting those. Libraries often support this via classes such as `Notification<T>` or `Signal<T>`, very similar to Java 8's `Optional<T>` but for values, errors and emptiness:
EmitterProcessor<Signal<Integer>> bus = EmitterProcessor.create();
bus.dematerialize().consume(System.out::println, Throwable::printStackTrace);
bus.subscribe(System.out::println, Throwable::printStackTrace);
bus.onNext(Signal.next(1));
bus.onNext(Signal.error(new RuntimeException()));
bus.onNext(Signal.next(2));
bus.onNext(Signal.next(3));
You can use `dematerialize` to turn those notifications back into regular calls to the `onXXX` methods on your `Subscriber` yet have the original source still active.
The operator `defer` lets you return a custom `Publisher` instance for each of the `Subscriber`s. The effect can be simulated by using `just` with `flatMap`:
Func0<Observable<Integer>> supplier = () -> range((int)(System.currentTimeMillis() & 1023), 5);
defer(supplier) == just("whatever").flatMap(v -> supplier.call());
In fact, you could write it with `concatMap` as well:
defer(supplier) == just("whatever").concatMap(v -> supplier.call());
Even though using `flatMap` and `concatMap` this way looks like too much overhead, most modern libraries optimize away `just` with a special `defer`-like operator that behaves like this:
Func1<Integer, Observable<Integer>> function = v -> range(v, 2);
just(-10).flatMap(function) == defer(() -> function.call(-10));
by extracting the constant value from `just` and building a `Func0` supplier that calls the original function with this constant and uses its returned `Observable`.
For this reason, using `nest()` with `concatMap`/`flatMap` is practically a no-op:
range(1, 5).nest().concatMap(o -> o.take(3)) == range(1, 5).take(3)
The source operator `interval` lets you create a periodic sequence of ever-increasing `Long` values. Sometimes, we don't care about the values themselves and just want to act at the right time.
Then comes a requirement of emitting items from a list periodically, that is, with some fixed delay between elements. We can first map the `Long` values into the elements of the list, but we should also make sure we don't run out of indexes and get an `IndexOutOfBoundsException`:
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
interval(1, TimeUnit.SECONDS)
.takeUntil(t -> t == list.size() - 1)
.map(t -> list.get(t.intValue()))
.subscribe(System.out::println)
Given the interval, we take its elements until the current running value is exactly the list's size minus one. The operator `takeUntil` evaluates the predicate after the value itself has been emitted downstream, so the last index still gets through. In the `map`, we then simply call `List.get()` with the timer's value narrowed to an `int` and get an item from the list.
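The emit-first, check-afterwards behaviour of `takeUntil` is what makes the last list element come through. The loop below emulates that ordering in plain Java, with ticks replaced by a simple counter; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical emulation of interval().takeUntil(t -> t == list.size() - 1).map(list::get)
final class TakeUntilIndex {

    // Each tick is passed downstream first; only then is the stop predicate checked.
    // Like the reactive version, this assumes a non-empty list.
    static <T> List<T> emitAll(List<T> list) {
        List<T> emitted = new ArrayList<>();
        for (long t = 0; ; t++) {
            emitted.add(list.get((int) t));
            if (t == list.size() - 1) {
                break;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emitAll(Arrays.asList(1, 2, 3, 4, 5))); // [1, 2, 3, 4, 5]
    }
}
```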
The operator `concatMap` has the interesting property of preserving FIFO order. You can use that to shift sequence items by a specific time or by any arbitrary `Publisher` like `interval`. Since concurrent execution of the mapped `Timer` or `Delay` is limited to one, the shift will be applied equally to all elements:
range(1, 1_000)
.concatMap(t -> Mono.delay(1000).map( v -> t ))
.consume(System.out::println)
// prints 1, ..... 2, ...... 3 ..... N with ...... =1 second
==
interval(1000)
.map( i )
.consume(System.out::println)
You can also choose to shift forward relatively using `flatMap`, which lets many concurrent shifts run in parallel, moving the whole sequence forward:
range(1, 1_000)
.flatMap(t -> Mono.delay(1000).map( v -> t ))
.consume(System.out::println)
// prints .... 1, 2, 3 , [..] C ..... N with ...... = 1 second and C = max concurrency
// (shifting groups of items)
In SQL, we are used to writing inner joins: the ones that pit one table against another and form all sorts of pairs. We can do such joins with `flatMap`:
Publisher<Integer> main = range(1, 5);
Publisher<String> slave = fromArray("a", "bb", "ccc");
main.flatMap(len -> slave.filter(s -> s.length() == len));
This will, for each `main` element, stream through the `slave` `Publisher` and keep only those elements of `slave` whose length equals the current value from `main`.
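The same join can be written against `java.util.stream` to check what it produces. This is a sketch with hypothetical names, where a `List` is assumed to stand in for the re-subscribable `slave` source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical Stream-based version of the flatMap join.
final class FlatMapJoin {

    // For every length in from..to, re-streams 'slave' and keeps the matching strings.
    static List<String> join(int from, int to, List<String> slave) {
        return IntStream.rangeClosed(from, to).boxed()
                .flatMap(len -> slave.stream().filter(s -> s.length() == len))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(join(1, 5, Arrays.asList("a", "bb", "ccc"))); // [a, bb, ccc]
    }
}
```

Lengths 4 and 5 have no match in `slave`, so they contribute nothing to the result, just as an inner join would drop them.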
Some libraries offer an overload that takes a two-argument function which will receive these pairs and should produce some values out of them:
main.flatMap(v -> slave, (m, s) -> m == s.length());
Of course, one can return a `Publisher` from the second function and apply `flatMap` again to flatten out the resulting inner sequences, allowing us to "drop" irrelevant values from the final sequence:
main.flatMap(v -> slave, (m, s) -> m == s.length() ? just(s) : empty())
.flatMap(v -> v)
Sometimes, one would like to merge concurrently running sources and keep their order at the same time. The usual `flatMap` doesn't keep the order and the usual `concatMap` doesn't start a source until the previous one has completed.
Many reactive libraries have an operator for this: `concatMapEager` (and `concatEager`), which gives the middle ground. In case your library doesn't support this operator, you can achieve a similar effect by concatenating pre-started sources with the help of a `UnicastProcessor`:
range(1, 10)
.map(v -> {
Flux<T> o = getSource(v);
UnicastProcessor<T> up = UnicastProcessor.create();
o.subscribe(up);
return up;
})
.concatMap(v -> v)
.subscribe(System.out::println, Throwable::printStackTrace);
Here, values 1..10 are mapped to a source which is then started by subscribing a `UnicastProcessor` to it. `UnicastProcessor` caches values (unbounded) until a subscriber, such as the one inside `concat`, subscribes to it, and then dutifully replays/relays the items.
In case you don't have a `UnicastProcessor` available, there are alternatives to it:
1) By using `ReplayProcessor` instead, but note that it will retain all values until the whole sequence gets GC'd.
.map(v -> {
Flux<T> o = getSource(v);
ReplayProcessor<T> rp = ReplayProcessor.create();
o.subscribe(rp);
return rp;
})
2) By using `replay()` and `connect()`
.map(v -> {
Flux<T> f = getSource(v);
ConnectableFlux<T> cf = f.replay();
cf.subscribe(e -> { }, Throwable::printStackTrace);
cf.connect();
return cf;
})
That extra `subscribe` is necessary because `replay` may not actually start running its source unless there are `Subscriber`(s) already waiting for the items.
3) By using `replay()` and `autoConnect()` or `cache()`
.map(v -> {
Flux<T> f = getSource(v);
Flux<T> cf = f.cache();
cf.subscribe(e -> { }, Throwable::printStackTrace);
return cf;
})
By default, cancellation won't cancel the prestarted sources. You have to manually wire up the end `Subscriber` with all participants:
List<Cancellation> cancellations = ...
// ...
.map(v -> {
Flux<T> f = getSource(v);
UnicastProcessor<T> up = UnicastProcessor.create();
cancellations.add(f.subscribe(up));
return up;
})
// ...
.map(v -> {
Flux<T> f = getSource(v);
ConnectableFlux<T> cf = f.replay();
cf.subscribe(e -> { }, Throwable::printStackTrace);
cancellations.add(cf.connect());
return cf;
})
// ...
.map(v -> {
Flux<T> f = getSource(v);
Flux<T> cf = f.replay().autoConnect(1, cancellations::add);
cf.subscribe(e -> { }, Throwable::printStackTrace);
return cf;
})
The operator `defer` lets you generate a source `Observable`/`Publisher` for each `Subscriber`, whereas `using` lets you generate a resource per `Subscriber` and then use it to generate an `Observable`/`Publisher`.
Therefore, you can imitate `defer` via a `using` setup where you create and ignore a resource and just create the source:
Supplier<Publisher<Integer>> s = () -> range(1, 10);
defer(s) == using(() -> "whatever", v -> s.get(), v -> { });
However, you could also create the source itself as the resource and use identity-mapping on it:
using(s, v -> v, v -> { })
using(s::get, v -> v, v -> { })
The first, shorter case is applicable if your library's `using` takes a `Supplier`, and the second if it takes some `() -> T` generator function.
The reverse direction, namely expressing `using` as `defer`, is a bit more involved:
Supplier<R> resource = ...
Function<R, Publisher<T>> source = ...
Consumer<R> disposer = ...
defer(() ->
fromCallable(resource::get)
.flatMap(r -> {
try {
return source.apply(r)
.doOnTerminate(() -> disposer.accept(r))
.doOnUnsubscribe(() -> disposer.accept(r));
} catch (Throwable ex) {
disposer.accept(r);
return error(ex);
}
})
);
In RxJava, a terminal event is followed by an `unsubscribe` call from most end-consumers (due to `SafeSubscriber`). In case the resource or the disposer is not idempotent, we have to make sure the dispose happens exactly once:
defer(() ->
fromCallable(resource::get)
.flatMap(r -> {
AtomicBoolean once = new AtomicBoolean();
try {
return source.apply(r)
.doOnTerminate(() -> disposeOnce(once, disposer, r))
.doOnUnsubscribe(() -> disposeOnce(once, disposer, r));
} catch (Throwable ex) {
disposeOnce(once, disposer, r);
return error(ex);
}
})
);
<R> void disposeOnce(AtomicBoolean once, Consumer<R> disposer, R resource) {
if (once.compareAndSet(false, true)) {
disposer.accept(resource);
}
}
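The once-only guard can be exercised on its own, without any reactive types. In the sketch below, the terminal event and the subsequent unsubscribe are simulated by two direct calls; the class name and the simulation method are hypothetical, while `disposeOnce` is the helper shown above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical demo of the compareAndSet-based once-only disposal.
final class DisposeOnceDemo {

    static <R> void disposeOnce(AtomicBoolean once, Consumer<R> disposer, R resource) {
        if (once.compareAndSet(false, true)) {
            disposer.accept(resource);
        }
    }

    // Simulates a terminal event followed by an unsubscribe call and
    // returns how many times the disposer actually ran.
    static int simulateTerminateThenUnsubscribe() {
        AtomicInteger disposals = new AtomicInteger();
        Consumer<String> disposer = r -> disposals.incrementAndGet();
        AtomicBoolean once = new AtomicBoolean();

        disposeOnce(once, disposer, "resource"); // as if called from doOnTerminate
        disposeOnce(once, disposer, "resource"); // as if called from doOnUnsubscribe
        return disposals.get();
    }

    public static void main(String[] args) {
        System.out.println(simulateTerminateThenUnsubscribe()); // 1
    }
}
```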
If we want to execute some code once and then hand out the generated values, such as login tokens or results of a network call, we can usually go for `cache()`, `replay()`, `AsyncProcessor`, etc.
However, sometimes the data gets outdated and there is no way of clearing the structures above. But we can restart the whole process and make sure new subscribers get the fresh data if we cache the cache itself and use defer to get the current caching source:
final AtomicReference<Mono<Long>> cache = new AtomicReference<>(getSource());
public Mono<Long> getSource() {
return Mono.fromCallable(System::currentTimeMillis).cache();
}
public Mono<Long> get() {
return Mono.defer(() -> cache.get());
}
public void reset() {
cache.set(getSource());
}
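The "cache the cache" idea also works outside reactive libraries. Below is a plain-Java sketch in which a memoizing `Supplier` is assumed to play the role of `Mono.cache()` and looking up the `AtomicReference` at call time plays the role of `defer`; all names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical non-reactive analogue of the resettable cache.
final class ResettableCache<T> {

    private final Supplier<T> computation;
    private final AtomicReference<Supplier<T>> cache = new AtomicReference<>();

    ResettableCache(Supplier<T> computation) {
        this.computation = computation;
        reset();
    }

    // Wraps the computation so that it runs at most once per wrapper.
    private static <V> Supplier<V> memoize(Supplier<V> delegate) {
        return new Supplier<V>() {
            private boolean done;
            private V value;

            @Override
            public synchronized V get() {
                if (!done) {
                    value = delegate.get();
                    done = true;
                }
                return value;
            }
        };
    }

    // Looks up the current cache at call time, similar to defer(() -> cache.get()).
    T get() {
        return cache.get().get();
    }

    // Swaps in a fresh cache; later callers trigger a new computation.
    void reset() {
        cache.set(memoize(computation));
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        ResettableCache<Integer> tokens = new ResettableCache<>(calls::incrementAndGet);
        System.out.println(tokens.get()); // 1
        System.out.println(tokens.get()); // 1 (cached)
        tokens.reset();
        System.out.println(tokens.get()); // 2
    }
}
```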
By design, `fromCallable` executes the `Callable` immediately and doesn't wait for a downstream request to appear.
In case you want to compute only when requested, you can use the same jump-start trick with `just().map()`, similar to Gem 10:
just("irrelevant")
.map(unused -> {
try {
return callable.call();
} catch (Exception ex) {
throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
Since `subscribe()` requests immediately, the difference from `fromCallable` is not obvious at first. To see the difference, we need to manually request after `subscribe()` has returned. One way of demonstrating this is via `TestSubscriber`:
TestSubscriber<T> ts = new TestSubscriber<>(0L);
just("irrelevant")
.map(unused -> {
System.out.println("callable.call()");
try {
return callable.call();
} catch (Exception ex) {
throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
}
})
.subscribe(ts);
System.out.println("subscribe()");
ts.requestMore(1); // or ts.request(1); in Rsc
Now the console will first print `subscribe()` followed by `callable.call()`.
In this issue, we should collect tips and tricks with reactive systems and dataflows.
These are not particularly advanced topics but the markdown support on GitHub makes it easier to write them up.
Once we run out of ideas, we may tidy it up and release it together (maybe a free ebook?).
Please post only gems here and open discussion about them in separate issues. Thanks.