rsocket / rsocket-kotlin

RSocket Kotlin multi-platform implementation
http://rsocket.io
Apache License 2.0
552 stars 37 forks source link

Resume support #87

Open whyoleg opened 4 years ago

whyoleg commented 4 years ago

Postponed until the resume changes in the RSocket protocol land.

olme04 commented 3 years ago

Proof-of-concept public API to support resume 2.0:

/**
 * Decides, request by request, whether an interaction should be resumable.
 *
 * Every hook answers `false` unless overridden, so an implementation only needs to
 * override the interaction types it actually wants to resume.
 */
public interface ResumeResolver : Closeable {
    /** Whether the request-response carrying [payload] should be resumable; `false` unless overridden. */
    public suspend fun shouldResumeRequestResponse(payload: Payload): Boolean {
        return false
    }

    /** Whether the request-stream carrying [payload] should be resumable; `false` unless overridden. */
    public suspend fun shouldResumeRequestStream(payload: Payload): Boolean {
        return false
    }

    /** Whether the request-channel carrying [payload] should be resumable; `false` unless overridden. */
    public suspend fun shouldResumeRequestChannel(payload: Payload): Boolean {
        return false
    }

    /** Does nothing unless overridden. */
    override fun close() {
    }
}

/**
 * Storage for the frames that take part in resumption.
 *
 * Each member is a callback tied to one connection event, named in its own documentation.
 * NOTE(review): threading and ordering guarantees are not specified in this sketch — confirm before relying on them.
 */
public interface ResumeFrameStorage : Closeable {
    /**
     * Called when a keep-alive is sent.
     * Presumably the returned [ResumeState] is what gets reported in that keep-alive — TODO confirm.
     */
    public fun state(): ResumeState

    /** Called when [frame], belonging to stream [streamId], is sent to the connection. */
    public fun onFrameSend(streamId: Int, frame: ByteReadPacket)

    /** Called when a frame belonging to stream [streamId] is received from the connection. */
    public fun onFrameReceive(streamId: Int)

    /**
     * Called on reconnect; returns the frames held for stream [streamId].
     * Presumably these are the frames to retransmit — TODO confirm.
     */
    public fun resumeFrames(streamId: Int): List<ByteReadPacket>

    /**
     * Called when a keep-alive is received, with the [impliedPosition] for stream [streamId].
     * Presumably frames up to that position can be dropped — TODO confirm.
     */
    public fun releaseFrames(streamId: Int, impliedPosition: Long)

    /** Called when stream [streamId] is fully removed. */
    public fun remove(streamId: Int)
}

/**
 * Pairs the [ResumeResolver] that decides which requests are resumable with the
 * [ResumeFrameStorage] that keeps their frames.
 *
 * @property resumeResolver decides per request whether it is resumable. Defaults to a resolver
 * that keeps every [ResumeResolver] default, i.e. resumes nothing.
 * @property resumeFrameStorage storage for the frames of resumable streams.
 */
public class ResumeStrategy(
    public val resumeResolver: ResumeResolver = object : ResumeResolver {},
    public val resumeFrameStorage: ResumeFrameStorage
) : Closeable {
    /** Closes the resolver and then the storage; the storage is closed even if the resolver's `close` throws. */
    override fun close() {
        try {
            resumeResolver.close()
        } finally {
            resumeFrameStorage.close()
        }
    }
}

Server-side API:

RSocketServer {
  resume {
    strategy { config: ConnectionConfig ->
      // Example resume strategy configuration: it shows how resumability of a request can be decided from
      // the request payload (which contains e.g. the route), the storage capacity, or any other parameter.
      val storage = InMemoryResumeFrameStorage("SERVER")
      // Some check based on the setup payload.
      val resumeStream = config.setupPayload.data.readText().contains("RESUME:STREAM")
      ResumeStrategy(
        resumeFrameStorage = storage,
        resumeResolver = object : ResumeResolver {
          // Only streams are considered here; the other interaction types keep the `false` default.
          override suspend fun shouldResumeRequestStream(payload: Payload): Boolean {
            val route: String? = payload.metadata?.read(RoutingMetadata)?.tags?.firstOrNull()
            return resumeStream && route == "resumable_route" && storage.hasEnoughSpaceForResumingStream()
          }
        }
      )
    }
  }
}

Connector-side API change:

RSocketConnector {
    // Reconnect config used for resume.
    reconnectable(10)
    resume {
        // Some token.
        token { buildPacket { writeText(generateStringToken()) } }
        // No resumeResolver is passed, so the client-side Responder will not resume any stream by default.
        strategy { config: ConnectionConfig ->
            ResumeStrategy(resumeFrameStorage = InMemoryResumeFrameStorage("CLIENT"))
        }
    }
}