rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

Backpressure not working #399

Closed yschimke closed 5 years ago

yschimke commented 6 years ago

tracking via - https://github.com/reactor/reactor-core/issues/879

Easy to reproduce with

$ rsocket-cli --server -r 2 -i 1 -i 2 -i 3 tcp://localhost:9898
$ rsocket-cli --debug -r 2 -i 1 -i 2 -i 3 --channel tcp://localhost:9898
$ ./rsocket-cli --debug -r 2 -i 1 -i 2 -i 3 --channel tcp://localhost:9898
10:01:10.730    onSubscribe(FluxMap.MapSubscriber)
10:01:10.734    request(256)
10:01:10.762    onSubscribe(FluxPeek.PeekSubscriber)
10:01:10.764    request(32)
10:01:10.775    onNext(Frame => Stream ID: 1 Type: REQUEST_CHANNEL Initial Request N: 2147483647 Payload: data: "1" )
10:01:10.804    onNext(Frame => Stream ID: 1 Type: REQUEST_N RequestN: 2147483647 Payload: )
10:01:10.808    onNext(Frame => Stream ID: 1 Type: NEXT Payload: data: "2" )
10:01:10.808    onNext(Frame => Stream ID: 1 Type: NEXT Payload: data: "3" )
yschimke commented 6 years ago

n.b. flux.limitRate(5) makes a passable version of back pressure work, but it must be applied directly on the result of requestStream or requestChannel. Any operators in between, e.g. zipWith, can and usually do introduce their own arbitrary prefetches, including Integer.MAX_VALUE.
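To illustrate why the position of the limit matters, here is a toy model in plain Java. It is not Reactor or RSocket code: DemandChain, limit and prefetch are hypothetical names, and each stage is reduced to a function from the demand it receives from downstream to the demand it sends upstream.

```java
import java.util.function.LongUnaryOperator;

// Toy model only: each stage maps the demand it receives from downstream
// to the demand it sends upstream. Not Reactor or RSocket code.
final class DemandChain {
    /** A limitRate-like stage: never asks upstream for more than cap at once. */
    static LongUnaryOperator limit(long cap) {
        return downstreamDemand -> Math.min(downstreamDemand, cap);
    }

    /** A prefetching stage: asks upstream for its own prefetch size, ignoring downstream demand. */
    static LongUnaryOperator prefetch(long size) {
        return downstreamDemand -> size;
    }

    /** Stages are listed from the subscriber towards the source. */
    static long demandAtSource(long subscriberDemand, LongUnaryOperator... stages) {
        long demand = subscriberDemand;
        for (LongUnaryOperator stage : stages) {
            demand = stage.applyAsLong(demand);
        }
        return demand;
    }
}
```

In this model, with the limit adjacent to the source (subscriber, then prefetch(Integer.MAX_VALUE), then limit(5)), the source sees a demand of 5. With the order reversed, the prefetching stage sits next to the source and the source sees Integer.MAX_VALUE regardless of the limit further downstream.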

robertroeser commented 6 years ago

@yschimke Should we build this into the Java version of RSocket? We could make it limit the request n by default, and then have a setting in the setup factory that lets you turn that feature off. It seems really leaky that the developer needs to know about this, and prone to mistakes.

yschimke commented 6 years ago

I like this idea.

I assume you mean something like an implicit stream.limitRate(defaultRateLimit) that gets applied when the client appears to be requesting Int.Max and is really behaving more like a cancellable observer.

Perhaps set it high enough (1024) that it stops the pathologically bad behaviour (client can't cancel due to a flood of infinite responses), but still encourages users to set a more appropriate default.

val clientRSocketFactory = RSocketFactory.connect().defaultRateLimit(16)

This seems to handle the various cases well

1) As observed currently: request(MAX), to be followed by cancel. We would insert this default when we see MAX, as a form of yield.
2) Subscriber requests a specific number. Whatever this is, we honour it and don't implicitly apply limitRate().
3) Client configures RSocketFactory.connect().defaultRateLimit(MAX) to really request an infinite, non-rate-limited stream.
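The three cases above could be sketched as a single decision function. This is a minimal sketch, not rsocket-java code: DefaultRateLimit and effectiveRequestN are hypothetical names, and it assumes "MAX" means anything at or above Integer.MAX_VALUE, the value shown as the request N in the frames logged earlier.

```java
// Hypothetical helper sketching the proposal; not part of rsocket-java.
final class DefaultRateLimit {
    // Assumption: demand at or above this value is treated as "unbounded".
    static final long UNBOUNDED = Integer.MAX_VALUE;

    /**
     * Returns the request N to put on the wire for one downstream request.
     *
     * @param requested        demand signalled by the subscriber
     * @param defaultRateLimit configured default, e.g. 1024, or UNBOUNDED to opt out
     */
    static long effectiveRequestN(long requested, long defaultRateLimit) {
        if (requested <= 0 || defaultRateLimit <= 0) {
            throw new IllegalArgumentException("demand and limit must be positive");
        }
        if (requested < UNBOUNDED) {
            // Case 2: a specific number was requested, so honour it as-is.
            return requested;
        }
        // Case 1: unbounded demand is replaced by the configured default.
        // Case 3: if the default itself is UNBOUNDED, the stream stays unlimited.
        return Math.min(UNBOUNDED, defaultRateLimit);
    }
}
```

For example, effectiveRequestN(Long.MAX_VALUE, 1024) yields 1024, while effectiveRequestN(4096, 1024) yields 4096, because this sketch always honours a specific request, however large.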

Is this what you had in mind?

In the second case for some large enough value, should the implicit rate limit still apply?

yschimke commented 6 years ago

@robertroeser are you planning to work on this soonish? or should I submit a PR for review?

robertroeser commented 6 years ago

@yschimke Yep, that is what I was thinking. It'll probably be a week or so before I could get to it. If you've got something, go ahead and submit a PR and I'll take a look.

yschimke commented 6 years ago

n.b. I can't directly use limitRate because it drains fully before renewing, so if the server uses credit exhaustion as a signal, then it will pause and resume the server processing a lot. Not great over the wire in the cases I want to use it for.
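The difference between draining fully and topping up at a low watermark can be sketched with a small credit tracker. This is illustrative only and is not how Reactor implements limitRate: WatermarkCredit, highTide and lowTide are hypothetical names chosen for this example.

```java
// Hypothetical credit tracker; illustrates low-watermark replenishment only.
final class WatermarkCredit {
    private final int highTide;  // credit granted up front and restored on each top-up
    private final int lowTide;   // top up once outstanding credit falls to this level
    private int outstanding;     // credit the server still holds

    WatermarkCredit(int highTide, int lowTide) {
        if (lowTide < 0 || lowTide >= highTide) {
            throw new IllegalArgumentException("require 0 <= lowTide < highTide");
        }
        this.highTide = highTide;
        this.lowTide = lowTide;
    }

    /** Amount to request when the subscription starts. */
    int initialRequest() {
        outstanding = highTide;
        return highTide;
    }

    /** Call once per received item; returns the amount to request now, or 0. */
    int onItem() {
        if (outstanding <= 0) {
            throw new IllegalStateException("item received without outstanding credit");
        }
        outstanding--;
        if (outstanding <= lowTide) {
            int topUp = highTide - outstanding;
            outstanding = highTide;
            return topUp;
        }
        return 0;
    }
}
```

With lowTide set to 0 the tracker only renews once credit is exhausted, which is the pause-and-resume pattern described above. With highTide 8 and lowTide 4, a top-up of 4 is requested after every fourth item, so the server never drops below 4 credits.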

There is a limitRequest coming in 3.1.1, and I hope it gets extended to support a low watermark.

yschimke commented 6 years ago

limitRate only drains fully for 2 & 3, per https://github.com/reactor/reactor-core/pull/894

yschimke commented 6 years ago

I'm going to revisit this on top of reactor-core 3.1.1 https://github.com/reactor/reactor-core/releases

yschimke commented 6 years ago

@robertroeser we have the necessary support in reactor-core with new operators, but I've got limited time at the moment. Just had 4 weeks of travel out of 5.

So if someone else wants to grab this, don't block on me.