joffrey-bion / krossbow

A Kotlin multiplatform coroutine-based STOMP client over websockets, with built-in conversions.
MIT License
196 stars 14 forks source link

Frame must be terminated with a null octet #482

Closed fred-bowker closed 3 months ago

fred-bowker commented 5 months ago

What happened?

I send 2 messages to the STOMP server and I get the Frame must be terminated with a null octet error.

I can recreate it by sending 2 messages,this could be a red herring, as it is not the full json payload, but this is the part that changes each time. (include space at the end) abcdefghij abcdefghijk

Reproduction and additional details

Error Frame must be terminated with a null octet

Code Used


    // library versions: krossbow = "5.12.0" okhttp = "4.12.0"
    // Client Side Code
    private val errorHandler = CoroutineExceptionHandler { _, e ->
        log.error("WEBSOCKET: message: {}, stacktrace: {}", e.message, e.printStackTrace())
    }
    private var stompSession: StompSession? = null
    override val messagesFlow = MutableStateFlow<List<MessageModel>>(emptyList())
    private val sendMessagePath = StompSendHeaders(SEND_MESSAGE_PATH, CONTENT_TYPE)
    private val subscribeMessagePath = StompSubscribeHeaders(SUBSCRIBE_MESSAGE_PATH)

    override suspend fun connect(connectId: Int, userId: Int) {
        log.info("WEBSOCKET: connect")

        stompSession = StompClient(OkHttpWebSocketClient())
            .connect(url)

        externalScope.launch(errorHandler) {
            stompSession?.use { x ->
                val messages: Flow<StompFrame.Message> = x.subscribe(subscribeMessagePath)
                messages.collect { message ->
                    log.info(message.bodyAsText)
                }
            }
        }.join()
    }

    override suspend fun disConnect() {
        externalScope.cancel()
        stompSession?.disconnect()
        log.info("WEBSOCKET: connect")
    }

    override suspend fun sendMessage(message: MessageModel) {
        val jsonString = networkJson.encodeToString(message)
        if (stompSession != null) {
            stompSession!!.send(sendMessagePath, FrameBody.Text(jsonString))
            log.info("WEBSOCKET: sendMessage, message: {}", message.message)
        }

        messagesFlow.value = listOf(message) + messagesFlow.value
    }
    // Server Code, this is simple just using SPRING STOMP IMPLEMENTATION
    // Spring version spring = "3.0.6"

Error Message

This error is logged on the spring server side

org.springframework.messaging.simp.stomp.StompConversionException: Frame must be terminated with a null octet at org.springframework.messaging.simp.stomp.StompDecoder.readPayload(StompDecoder.java:322) at org.springframework.messaging.simp.stomp.StompDecoder.decodeMessage(StompDecoder.java:147) at org.springframework.messaging.simp.stomp.StompDecoder.decode(StompDecoder.java:114) at org.springframework.messaging.simp.stomp.BufferingStompDecoder.decode(BufferingStompDecoder.java:114) at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageFromClient(StompSubProtocolHandler.java:252) at org.springframework.web.socket.messaging.SubProtocolWebSocketHandler.handleMessage(SubProtocolWebSocketHandler.java:336) at org.springframework.web.socket.handler.WebSocketHandlerDecorator.handleMessage(WebSocketHandlerDecorator.java:75) at org.springframework.web.socket.handler.LoggingWebSocketHandlerDecorator.handleMessage(LoggingWebSocketHandlerDecorator.java:56) at org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator.handleMessage(ExceptionWebSocketHandlerDecorator.java:58) at org.springframework.web.socket.adapter.standard.StandardWebSocketHandlerAdapter.handleTextMessage(StandardWebSocketHandlerAdapter.java:113) at org.springframework.web.socket.adapter.standard.StandardWebSocketHandlerAdapter$3.onMessage(StandardWebSocketHandlerAdapter.java:84) at org.springframework.web.socket.adapter.standard.StandardWebSocketHandlerAdapter$3.onMessage(StandardWebSocketHandlerAdapter.java:81) at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:390) at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:129) at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:484) at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:284) at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:130) at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:84) at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:183) at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:163) at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:152) at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:60) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:57) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:894) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.base/java.lang.Thread.run(Thread.java:1589)

The Client code is bringing back the correct error when I log it

message: Frame must be terminated with a null octet, stacktrace: kotlin.Unit

Recreate

I can recreate it by sending 2 messages,this could be a red herring, as it is not the full json payload, but this is the part that changes each time. (include space at the end) abcdefghij abcdefghijk

Krossbow version

5.12.0

Krossbow modules

krossbow-stomp-core, krossbow-websocket-okhttp

Kotlin version

1.9.21

Kotlin target platforms

Android

joffrey-bion commented 5 months ago

Hi! Thanks for the report.

I haven't verified the hypothesis, but I'm 99% sure I found what the cause is: you're reusing the same StompSendHeaders instance for all messages.

The content-length header is automatically set the first time based on the size of the body of the first message. For the second message, the automatic content-length calculation is disabled because an explicit content-length header is already set. Because the second message is not of the same size as the first, the content-length is incorrect, and the server doesn't find the expected null character after reading content-length bytes, hence the error.

This mutable headers design was done for performance, but the mutability is kinda hidden behind the data class aspect. I believe your current approach shouldn't be considered a misuse of the API, so I'll classify this as a bug on Krossbow side.

In the meantime, you can work around the issue by either using convenience extensions for send() (instead of manually creating headers), or by instantiating a new headers instance each time.

fred-bowker commented 5 months ago

Hi, thanks for looking into this so quickly, I think you are correct moving the StompSendHeaders into the sendMessage function has fixed the issue.

I think it would help to include a full example in the docs, one that has the send and reeive in seperate methods as this is a more realistic scenario. The below code snippet would cover most things, error handling and shutting down the coroutinescope ahead of disconnect. You could still leave the Basic usage section but could include a Detailed usage section also.

@Singleton
class MessageStompDataSource
@Inject
constructor(
    @StompScope private val externalScope: CoroutineScope,
) {

    private val url = "wss://www.acme.com:8081//websocket"

    private val errorHandler = CoroutineExceptionHandler { _, e ->
        println(e.printStackTrace())
    }

    private var stompSession: StompSession? = null
    val messagesFlow = MutableStateFlow<List<MessageModel>>(emptyList())

    suspend fun connect() {
        stompSession = StompClient(OkHttpWebSocketClient()).connect(url)

        val subscribeMessagePath = StompSubscribeHeaders("/topic/messages")

        externalScope.launch(errorHandler) {
            stompSession?.use { x ->
                val messages: Flow<StompFrame.Message> = x.subscribe(subscribeMessagePath)
                messages.collect { message ->
                    val messageModel: MessageModel = Json.decodeFromString(message.bodyAsText)
                    messagesFlow.value = listOf(messageModel) + messagesFlow.value
                }
            }
        }.join()
    }

    suspend fun disConnect() {
        externalScope.cancel()
        stompSession?.disconnect()
    }

    suspend fun sendMessage(message: MessageModel) {
        val sendMessagePath = StompSendHeaders("/app/message", "application/json;charset=utf-8")

        val jsonString = Json.encodeToString(message)
        if (stompSession != null) {
            stompSession!!.send(sendMessagePath, FrameBody.Text(jsonString))
        }

        messagesFlow.value = listOf(message) + messagesFlow.value
    }
}

For the actual issue, I'd probably move getting the content length out of the StompSendHeaders, the StompSendHeaders feels like an immutable collection of user defined headers. Though if the docs were updated with a full example and it was made explicit that StompSendHeaders should not be reused then I think that would be fine.