wso2 / streaming-integrator-tooling

Apache License 2.0

[SI Tooling] Stopping a Siddhi app with a `websocket-server` source doesn't terminate the websocket connection #204

Open senthuran16 opened 2 years ago


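Description: Stopping a Siddhi app that has a `websocket-server` source does not terminate the websocket connections that clients opened while the app was running.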

Steps to reproduce:

1. Create the following Siddhi app:

    @App:name("ReceiveWebSocketInXMLFormat")
    @App:description('Receive events via WebSocket transport in XML format and view the output on the console.')

    @sink(type='log', prefix='received by test websocket source: ')
    @Source(type='websocket-server', host='localhost', port='8025', @map(type='xml'))
    define stream TemperatureAlertStream (roomNo int, initialTemp double, finalTemp double);


2. Start it in SI Tooling.
3. Start Postman, create a new websocket request to URL `ws://localhost:8025`, and click **Connect**. The message `Connected to ws://localhost:8025` will appear in Postman.
4. Send the following payload:

    {"event": {"roomNo": 1, "initialTemp": 10.0, "finalTemp": 20.0}}

5. The following exception will be logged in the SI Tooling console:

    ERROR {io.siddhi.core.stream.input.source.SourceMapper} - Error while processing '{"event": {"roomNo": 1, "initialTemp": 10.0, "finalTemp": 20.0}}', for the input Mapping 'xml' for the stream 'TemperatureAlertStream'. (Encoded)
    io.siddhi.core.exception.MappingFailedException
        at io.siddhi.extension.map.xml.sourcemapper.XmlSourceMapper.mapAndProcess(XmlSourceMapper.java:456)
        at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:201)
        at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:145)
        at io.siddhi.extension.io.websocket.source.websocketserver.WebSocketServerSourceConnectorListener.onMessage(WebSocketServerSourceConnectorListener.java:64)
        at org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketConnectorFuture.notifyWebSocketListener(DefaultWebSocketConnectorFuture.java:52)
        at org.wso2.transport.http.netty.contractimpl.websocket.WebSocketInboundFrameHandler.notifyTextMessage(WebSocketInboundFrameHandler.java:203)
        at org.wso2.transport.http.netty.contractimpl.websocket.WebSocketInboundFrameHandler.channelRead(WebSocketInboundFrameHandler.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
        at io.netty.handler.codec.http.websocketx.Utf8FrameValidator.channelRead(Utf8FrameValidator.java:82)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)


6. Stop the Siddhi app. The websocket connection opened from Postman in step 3 is not disconnected.
7. Change the Siddhi app's content to the following (The `xml` mapper has been changed to `json`):

    @App:name("ReceiveWebSocketInXMLFormat")
    @App:description('Receive events via WebSocket transport in XML format and view the output on the console.')

    @sink(type='log', prefix='received by test websocket source: ')
    @Source(type='websocket-server', host='localhost', port='8025', @map(type='json'))
    define stream TemperatureAlertStream (roomNo int, initialTemp double, finalTemp double);


8. Start the Siddhi app.
9. Send the same message from Postman again.
10. Even though the Siddhi app now uses `@map(type='json')`, the same error is thrown, and it still names the `xml` input mapping:

    ERROR {io.siddhi.core.stream.input.source.SourceMapper} - Error while processing '{"event": {"roomNo": 1, "initialTemp": 10.0, "finalTemp": 20.0}}', for the input Mapping 'xml' for the stream 'TemperatureAlertStream'. (Encoded)
    io.siddhi.core.exception.MappingFailedException
        at io.siddhi.extension.map.xml.sourcemapper.XmlSourceMapper.mapAndProcess(XmlSourceMapper.java:456)
        at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:201)
        at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:145)
        at io.siddhi.extension.io.websocket.source.websocketserver.WebSocketServerSourceConnectorListener.onMessage(WebSocketServerSourceConnectorListener.java:64)
        at org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketConnectorFuture.notifyWebSocketListener(DefaultWebSocketConnectorFuture.java:52)
        at org.wso2.transport.http.netty.contractimpl.websocket.WebSocketInboundFrameHandler.notifyTextMessage(WebSocketInboundFrameHandler.java:203)
        at org.wso2.transport.http.netty.contractimpl.websocket.WebSocketInboundFrameHandler.channelRead(WebSocketInboundFrameHandler.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
        at io.netty.handler.codec.http.websocketx.Utf8FrameValidator.channelRead(Utf8FrameValidator.java:82)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
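
For anyone triaging this, here is a toy model of what the steps above suggest is happening. It is not the actual `siddhi-io-websocket` code: `StaleConnectionModel`, `ServerSource`, `Connection`, `stopWithoutClosing`, and `stopAndClose` are all hypothetical names, and the assumption that a connection stays bound to the mapper that was active when it was accepted is only a guess drawn from the stack trace in step 10.

```java
import java.util.ArrayList;
import java.util.List;

public class StaleConnectionModel {

    /** Hypothetical client connection, bound to the mapper that was active when it was accepted. */
    static final class Connection {
        private final String mapperType;
        private boolean open = true;

        Connection(String mapperType) {
            this.mapperType = mapperType;
        }

        /** Delivers a message to whichever mapper this connection was bound to. */
        String send(String payload) {
            if (!open) {
                throw new IllegalStateException("connection closed");
            }
            return "handled by '" + mapperType + "' mapper";
        }

        void close() {
            open = false;
        }

        boolean isOpen() {
            return open;
        }
    }

    /** Hypothetical server source that keeps track of the connections it accepted. */
    static final class ServerSource {
        private final String mapperType;
        private final List<Connection> connections = new ArrayList<>();

        ServerSource(String mapperType) {
            this.mapperType = mapperType;
        }

        Connection accept() {
            Connection connection = new Connection(mapperType);
            connections.add(connection);
            return connection;
        }

        /** Models the reported behaviour: the app stops, but accepted connections stay open. */
        void stopWithoutClosing() {
            // intentionally does nothing to the tracked connections
        }

        /** Models the expected behaviour: stopping the app closes every accepted connection. */
        void stopAndClose() {
            for (Connection connection : connections) {
                connection.close();
            }
            connections.clear();
        }
    }

    public static void main(String[] args) {
        ServerSource xmlApp = new ServerSource("xml");
        Connection postman = xmlApp.accept();   // step 3: client connects
        xmlApp.stopWithoutClosing();            // step 6: app is stopped
        new ServerSource("json");               // steps 7-8: app restarted with the json mapper

        // steps 9-10: the old connection still reaches the old mapper
        System.out.println(postman.send("{\"event\": {}}"));

        xmlApp.stopAndClose();                  // expected: stop closes the connection
        System.out.println("open after stopAndClose: " + postman.isOpen());
    }
}
```

In this model the client would have to reconnect after `stopAndClose()`, and only then would its messages reach the `json` mapper of the restarted app.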

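As a side note on why the error in step 5 is expected while the one in step 10 is not: the payload in step 4 is in the shape the `json` mapper accepts by default, whereas the `xml` mapper expects an XML document. The sketch below builds both shapes for the same event. The `<events><event>...</event></events>` layout is my understanding of the default XML mapping and should be checked against the mapper documentation; `PayloadFormats`, `sampleEvent`, `toJson`, and `toXml` are illustrative names only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class PayloadFormats {

    /** The event from step 4, with attributes in stream-definition order. */
    static Map<String, Object> sampleEvent() {
        Map<String, Object> attrs = new LinkedHashMap<>();
        attrs.put("roomNo", 1);
        attrs.put("initialTemp", 10.0);
        attrs.put("finalTemp", 20.0);
        return attrs;
    }

    /** Builds the JSON shape used in step 4 (numeric values only, no escaping). */
    static String toJson(Map<String, Object> attrs) {
        StringJoiner fields = new StringJoiner(", ", "{\"event\": {", "}}");
        for (Map.Entry<String, Object> e : attrs.entrySet()) {
            fields.add("\"" + e.getKey() + "\": " + e.getValue());
        }
        return fields.toString();
    }

    /** Builds the assumed default XML shape (numeric values only, no escaping). */
    static String toXml(Map<String, Object> attrs) {
        StringBuilder xml = new StringBuilder("<events><event>");
        for (Map.Entry<String, Object> e : attrs.entrySet()) {
            xml.append('<').append(e.getKey()).append('>')
               .append(e.getValue())
               .append("</").append(e.getKey()).append('>');
        }
        return xml.append("</event></events>").toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson(sampleEvent()));
        System.out.println(toXml(sampleEvent()));
    }
}
```

Sending the XML form in step 4 should avoid the mapping error with the original app; the bug reported here is that the JSON form keeps failing after the app has been switched to `@map(type='json')`.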

