rsocket / rsocket-kotlin

RSocket Kotlin multi-platform implementation
http://rsocket.io
Apache License 2.0
553 stars 37 forks source link

Streams with flexible requestN semantic #118

Closed whyoleg closed 3 years ago

whyoleg commented 4 years ago
whyoleg commented 4 years ago

@yschimke What do you think about it? I've found, that it's really not so easy to integrate better backpressure control using just Flow and current RSocket interface. So I created new ReactiveFlow inherited from Flow (naming is TBD). And because only incoming Flow should be controlled by requestN (so on requester side it's requestStream and requestChannel output and on responder side requestChannel input), and for other cases it's not needed, and will just add complexite, I've decided to split RSocket interface for requester and responder. So, do you think, that this approach is OK, and what do you think about RequestStrategy API?

and yes, it's still WIP, need more tests and fixes may be, but overall works

yschimke commented 4 years ago

@whyoleg I think I'm the wrong reviewer for this. You should try to get some of the Pivotal folk with more experience across rsocket-java and spring reactor. There have been a lot of discussions in the past on reactors infinite and cancel defaults and limits, and I have simple requirements to be honest now.

In FB days it was more relevant to me since there was a polling mechanism in the background and a real cost to every additional emmitted result, but infinite and cancel is mostly fine for what I'm doing now e.g. rsocket-demo with a stream of tweets (line separated http response) or with my own backends being purely event based anyway.

yschimke commented 4 years ago

Or maybe @elizarov and work out what the extension methods to ReactiveFlow should be. I think ideally none new inside rsocket-kotlin. We would just react to the usage and contract of ~ReactiveFlow~ (edit: I thought this was part of kotlin, later answer corrects this). And use concepts the end users are already familiar with externally.

IIRC That was essentially the decision with rsocket-java and spring reactor. Not to build our own terminology, but to keep using the operators from the underlying framework.

whyoleg commented 4 years ago

The idea behind ReactiveFlow is to just store some RequestStrategy which will allow for better control of requestN frames and in current implementation, it has only one realization PrefetchStrategy(requestSize, requestOn) - which will request requestSize elements, when requestOn elements are left for collecting in Flow. So, the simplest one. Im not sure, that something else should be added for now. From user perspective, it's even may be not needed, and default strategy (requestSize=64, requestOn=16) will be enough.

We need to add such logic, with one new type (ReactiveFlow), as Flow only works with collecting one by one, and even take(5) will work as collect by one until 5 elements collected.

yschimke commented 4 years ago

I'm not a huge fan of defining our own ReactiveFlow

elizarov commented 4 years ago

If all you need is to store a request strategy, then I can suggest looking into using a context for this kind of storage. This way you can have a sensible default that will add zero overhead (and zero new concepts) and rely on the existing context-passing support in both Reactor and Kotlin Flow to change this request strategy if needed.

whyoleg commented 4 years ago

@elizarov cool idea, I will try to do that! thx!

OlegDokuka commented 4 years ago

as discussed, lets split these changes into two PRs: 1) For RequestN via flowOn 2) RSocket interface API changes