apache / pekko

Build highly concurrent, distributed, and resilient message-driven applications using Java/Scala
https://pekko.apache.org/
Apache License 2.0
1.17k stars, 140 forks

Feature Request: Add Flow#transform method? #1453

Open He-Pin opened 2 weeks ago

He-Pin commented 2 weeks ago

Motivation:

An inline transform method would make stream code more fluent, as it does with Flux. With reactor-core's Flux, we can write something like:

        sink.asFlux()
            .onBackpressureBuffer()
            .transform(new MsgTransformer())
            .subscribe(msgs -> {
                ...
            });

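The transform method is just syntactic sugar: apart from the context-loss hook, it applies the given function to the current Flux and wraps the result: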

    public final <V> Flux<V> transform(Function<? super Flux<T>, ? extends Publisher<V>> transformer) {
        if (Hooks.DETECT_CONTEXT_LOSS) {
            transformer = new ContextTrackingFunctionWrapper(transformer);
        }
        return onAssembly(from(transformer.apply(this)));
    }
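
To illustrate the idea independently of Reactor, here is a minimal sketch. It assumes a hypothetical `Pipeline` class as a stand-in for a fluent stream type; `Pipeline`, `TransformSketch`, and `trimAndUpper` are invented for illustration and are not Pekko or Reactor APIs. It only shows that `transform(f)` is equivalent to `f.apply(this)` while keeping the call chain unbroken.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a fluent stream type (not a real Pekko or Reactor class).
final class Pipeline<T> {
    private final List<T> items;

    private Pipeline(List<T> items) {
        this.items = items;
    }

    static <T> Pipeline<T> of(List<T> items) {
        return new Pipeline<>(List.copyOf(items));
    }

    // An ordinary element-wise operator.
    <R> Pipeline<R> map(Function<? super T, ? extends R> f) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(f.apply(item));
        }
        return new Pipeline<>(out);
    }

    // The sugar under discussion: hand the whole pipeline to a function
    // and continue the chain with whatever it returns.
    <V> Pipeline<V> transform(Function<? super Pipeline<T>, ? extends Pipeline<V>> transformer) {
        return transformer.apply(this);
    }

    List<T> toList() {
        return items;
    }
}

final class TransformSketch {
    // A reusable stage packaged as a function, playing the role of MsgTransformer above.
    static Pipeline<String> trimAndUpper(Pipeline<String> in) {
        return in.map(String::trim).map(String::toUpperCase);
    }

    static List<String> run() {
        return Pipeline.of(List.of(" a ", "b "))
            .transform(TransformSketch::trimAndUpper) // stays inside the chain
            .map(s -> s + "!")
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Without `transform`, the same pipeline would have to be written inside-out as `TransformSketch.trimAndUpper(Pipeline.of(...)).map(...)`, which is the readability cost this request aims to avoid.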

WDYT?

Result: More fluent code.