Closed yuriykulikov closed 2 years ago
Hey @yuriykulikov, thanks for the report and the PR!
Looking at the protocol (https://github.com/rsocket/rsocket/blob/master/Protocol.md#reactive-streams-semantics), it looks like using a Long for the counter is the expected behavior.
But looking at rsocket-java (https://github.com/rsocket/rsocket-java/blob/37fc68c68f4b61d826084330a7b0476a456b63da/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameCodec.java#L22), Int.MAX_VALUE is coerced to Long.MAX_VALUE. In other words, Int.MAX_VALUE is used to emulate an unbounded stream, as described by rule 3.17 of the reactive-streams spec: https://github.com/reactive-streams/reactive-streams-jvm#3.17
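To make the coercion above concrete, here is a minimal Kotlin sketch of how a decoded RequestN value could be mapped. The function name decodeRequestN is hypothetical and only illustrates the behavior described for rsocket-java; it is not the actual codec.

```kotlin
// Hypothetical helper, not part of rsocket-java or rsocket-kotlin.
// Maps the 4-byte wire value to a Long demand, treating Int.MAX_VALUE
// as "unbounded" (Long.MAX_VALUE), as described in the comment above.
fun decodeRequestN(wireValue: Int): Long =
    if (wireValue == Int.MAX_VALUE) Long.MAX_VALUE else wireValue.toLong()

fun main() {
    println(decodeRequestN(42))
    println(decodeRequestN(Int.MAX_VALUE) == Long.MAX_VALUE)
}
```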
So it looks like the right fix for making rsocket-kotlin compatible with the protocol and other implementations would be to add an unbounded Boolean property and, based on it, ignore any updateRequests calls and skip any check in useRequest.
Will you do this, or should I create a PR with the proposed changes?
BTW, if you do this, it looks like you can also stop using atomic in Limiter altogether. It was needed with the old Native MM, which is no longer the case.
Hello @olme04,
This sounds reasonable and was also my first thought. However, I was confused after digging into https://rsocket.io/about/protocol/#flow-control, which states: "Please note that this explicitly does NOT follow rule number 17. (...) While Reactive Streams support a demand of up to 2^63-1, and treats 2^63-1 as a magic number signaling to not track demand, this is not the case for RSocket. RSocket prioritizes byte size and only uses 4 bytes instead of 8 so the magic number is unavailable."
In the end, both approaches effectively achieve the same result.
"Will you do this, or should I create a PR with the proposed changes?"
I will be glad to implement the changes as you have suggested.
Let me summarize my understanding of how the Limiter should work:
@yuriykulikov yes, it should work like this. One minor addition: please make sure the accumulated value cannot overflow by coercing it to Long.MAX_VALUE (even if that never happens with a well-behaved implementation).
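The approach discussed up to this point could be sketched roughly as follows. This is a simplified, single-threaded illustration under stated assumptions: the class name SimpleLimiter and the non-suspending tryUseRequest are hypothetical, and the real Limiter in rsocket-kotlin may look quite different. It treats Int.MAX_VALUE as an unbounded request and saturates the counter at Long.MAX_VALUE instead of overflowing.

```kotlin
// Hypothetical simplification, not the actual rsocket-kotlin Limiter.
class SimpleLimiter(initial: Int) {
    // Assumption: a request of Int.MAX_VALUE switches the limiter to unbounded mode.
    private var unbounded: Boolean = initial == Int.MAX_VALUE
    private var requests: Long = initial.toLong()

    fun updateRequests(n: Int) {
        require(n > 0) { "n must be positive, got $n" }
        if (unbounded) return                  // further updates are ignored
        if (n == Int.MAX_VALUE) {
            unbounded = true
            return
        }
        // Saturating add: coerce to Long.MAX_VALUE instead of wrapping around.
        requests = if (requests > Long.MAX_VALUE - n) Long.MAX_VALUE else requests + n
    }

    // Returns true if an item may be emitted, consuming one unit of demand.
    fun tryUseRequest(): Boolean {
        if (unbounded) return true             // no demand tracking when unbounded
        if (requests <= 0) return false
        requests--
        return true
    }
}

fun main() {
    val limiter = SimpleLimiter(initial = 1)
    limiter.updateRequests(2)
    repeat(4) { println(limiter.tryUseRequest()) }
}
```

With an initial demand of 1 and one update of 2, the first three calls succeed and the fourth does not.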
Hey @yuriykulikov, thx for implementing it! But we just found an issue with how rsocket-java uses Int.MAX_VALUE, and it looks like the current implementation there will be adapted further.
TL;DR: the problem is that Int.MAX_VALUE is a perfectly reasonable request on its own, and it does not mean that the request is unbounded. More info here: https://github.com/rsocket/rsocket/pull/325
Hey @olme04, I will adjust my implementation to remove the magic meaning of Int.MAX_VALUE. Protection against overflow is still relevant even if the U flag is implemented.
Overall LGTM, but please check the test.
@yuriykulikov thx for the contribution! All tests passed!
Glad I could contribute 😊
This avoids Int overflow when a misbehaving client sends multiple RequestN frames with n=Int.MAX_VALUE.
Motivation:
This closes #213
Modifications:
Changed AtomicInt to AtomicLong. Overflow is still possible after that, but it would require the client to send more than four billion RequestN frames with n=Int.MAX_VALUE (Long.MAX_VALUE / Int.MAX_VALUE is roughly 2^32). I am not sure whether the request value can become negative under other circumstances. If it cannot, an unsigned long could be used instead, or the value could be asserted to be positive.
Result:
Server won't hang after receiving RequestN frames anymore.
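For illustration, the wrap-around that motivated this change can be reproduced with plain arithmetic. The helper names below are hypothetical and stand in for the counter update; they are not the actual Limiter code.

```kotlin
// Hypothetical helpers that mimic accumulating demand in an Int vs. a Long counter.
fun accumulateAsInt(current: Int, n: Int): Int = current + n      // wraps on overflow
fun accumulateAsLong(current: Long, n: Int): Long = current + n   // widened, no wrap here

fun main() {
    val n = Int.MAX_VALUE
    // Two RequestN frames with n = Int.MAX_VALUE:
    println(accumulateAsInt(n, n))            // -2, the counter became negative
    println(accumulateAsLong(n.toLong(), n))  // 4294967294
}
```

With an Int counter, the second frame already turns the available demand negative, which matches the hang described above; with a Long counter the sum stays positive.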