rsocket / rsocket-kotlin

RSocket Kotlin multi-platform implementation
http://rsocket.io
Apache License 2.0
552 stars 37 forks source link

Server: Request-Stream requests counter can overflow when client is misbehaving #213

Closed yuriykulikov closed 2 years ago

yuriykulikov commented 2 years ago

A misbehaving client can cause the server to enter an illegal state by sending RequestN frames. Illegal state is caused by an Int overflow. In this state, the Request-Stream flow emissions are suspended (no Payload frames are sent). Server appears to be hanging.

Expected Behavior

There are two possibilities, not sure what is actually expected:

  1. Continue operation
  2. Send an Error frame to the misbehaving client

Actual Behavior

Emitting coroutine is suspended indefinitely.

Steps to Reproduce

Send a RequestN after RequestStream:

RequestStream frame -> Stream Id: 1 Length: 45
Flags: 0b100000000 (M1F0C0N0)
Initial request: 2147483647

LimitingFlowCollector.<init>

RequestN frame -> Stream Id: 1 Length: 10
Flags: 0b000000000 ()
RequestN: 2147483647

At this point LimitingFlowCollector.requests: AtomicInteger overflows.

As a consequence, useRequest() will suspend indefinitely:

    private suspend fun useRequest() {
        // never true
        if (requests.getAndDecrement() > 0) {
            currentCoroutineContext().ensureActive()
        } else {
            suspendCancellableCoroutine<Unit> {
                awaiter.value = it
                // never true
                if (requests.value >= 0 && it.isActive) it.resume(Unit)
            }
        }
    }

    fun updateRequests(n: Int) {
        // always true
        if (n <= 0) return
        requests += n
        awaiter.getAndSet(null)?.takeIf(CancellableContinuation<Unit>::isActive)?.resume(Unit)
    }

Possible Solution

I will open a PR after it is clear which Expected Behavior is the right one. I am in favor of using an AtomicLong instead of an AtomicInt for the counter.

Your Environment