micronaut-projects / micronaut-core

Micronaut Application Framework
http://micronaut.io
Apache License 2.0

Forward remote websocket client data stream to local websocket server #1958

Open Deviad opened 5 years ago

Deviad commented 5 years ago

Hello, for everyone to see my scenario I have prepared the following diagram: https://drive.google.com/open?id=16WCQqOK_KpoATLnRktxxv93A-sp8kpjp

Long story short, as the title says, this does not currently seem possible with Micronaut.

Steps to Reproduce

1) Clone https://github.com/Deviad/clarity-config

2) Clone https://github.com/Deviad/clarity

3) In clarity-transaction-dispatcher.yml, substitute

  httpsEndpoint: <infura_https_endpoint>
  wsEndpoint: <infura_ws_endpoint>

with

  httpsEndpoint: http://localhost:8545
  wsEndpoint: ws://localhost:8546

and commit changes locally

4) In /path/to/clarity-config-server/src/main/resources/application.yml,
replace the path in uri: ${HOME}/projects/clarity-config-private with your own path.

5) Run docker-compose up -d in the main project folder.

6) Run clarity-config-server

7) Run clarity-eureka-server

8) Run clarity-dispatcher (the Micronaut service)

Expected Behaviour

I should be able to feed the broadcastSync function with a Flowable stream of objects.

Actual Behaviour

Setting @Client("ws://localhost:8546") generates an error to begin with, because apparently the "ws://" scheme is not allowed. I don't know whether the correct combination in my case would be @Client("ws://localhost:8546") with @ClientWebSocket("/") (which currently fails for the reason above)

or @Client("/") with @ClientWebSocket("ws://localhost:8546").

Neither of them works anyway.

Example Application

https://github.com/Deviad/clarity/blob/feature/use_micronaut/clarity-dispatcher/src/main/java/clarity/dispatcher/CustomWebsocketServer.java

graemerocher commented 5 years ago

You are passing a reactive object into broadcastSync:

https://github.com/Deviad/clarity/blob/feature/use_micronaut/clarity-dispatcher/src/main/java/clarity/dispatcher/CustomWebsocketServer.java#L42

I don't believe this use case is supported. You should subscribe to the Flowable and pass each emitted object to broadcastSync, or preferably use broadcast, which is the reactive variant.
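
For illustration only, the "subscribe and pass each emitted object" pattern can be sketched with JDK types alone. This is a minimal sketch under stated assumptions: the `Broadcaster` interface here is a hypothetical stand-in and not Micronaut's WebSocketBroadcaster, and `java.util.concurrent.Flow` with `SubmissionPublisher` stands in for the RxJava Flowable used in the linked code. The class and method names (`ForwardingSketch`, `ForwardingSubscriber`, `forward`) are invented for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ForwardingSketch {

    /** Hypothetical stand-in for a broadcaster; NOT Micronaut's WebSocketBroadcaster. */
    interface Broadcaster {
        void broadcastSync(String message);
    }

    /** Subscribes to the upstream and hands each emitted item to the broadcaster. */
    static final class ForwardingSubscriber implements Flow.Subscriber<String> {
        private final Broadcaster broadcaster;
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        ForwardingSubscriber(Broadcaster broadcaster) {
            this.broadcaster = broadcaster;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // ask for the first item
        }

        @Override
        public void onNext(String item) {
            broadcaster.broadcastSync(item); // pass the emitted value, not the stream object
            subscription.request(1);         // then ask for the next one
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }

        boolean await(long seconds) throws InterruptedException {
            return done.await(seconds, TimeUnit.SECONDS);
        }
    }

    /** Publishes the given messages and returns what the broadcaster received. */
    static List<String> forward(List<String> upstream) throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        ForwardingSubscriber subscriber = new ForwardingSubscriber(received::add);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            upstream.forEach(publisher::submit);
        } // close() signals onComplete once pending items have been delivered
        subscriber.await(5);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(forward(List.of("block-1", "block-2")));
    }
}
```

The point of the sketch is that the broadcaster only ever sees plain values. Handing the stream object itself to a synchronous broadcast method would ask it to serialize the stream, not the data it carries.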

Deviad commented 5 years ago

Hello @graemerocher I will test this today as I get off from the plantation (the workplace :) ).

Deviad commented 5 years ago

OK, so this is how I have modified my code: https://github.com/Deviad/clarity/blob/feature/use_micronaut/clarity-dispatcher/src/main/java/clarity/dispatcher/CustomWebsocketServer.java

EthereumLowLevelWebsocketClient

  @OnMessage
  public void onMessage(String message) {
    messages.offer(message);
  }

CustomWebsocketServer

  @OnMessage
  public void onMessage(String topic, String username, String message, WebSocketSession session) {
    //        String msg = "[" + username + "] " + message;
    Map<String, Object> map =
        Stream.of(
                new AbstractMap.SimpleEntry<>("id", 1),
                new AbstractMap.SimpleEntry<>("method", "eth_subscribe"),
                new AbstractMap.SimpleEntry<>("params", new Object[] {"newHeads"}))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    client
        .connect(map)
        .map(EthereumLowLevelWebsocketClient::getMessages)
        .doOnNext(
            x ->
                broadcaster.broadcastSync(
                    x.poll(), MediaType.TEXT_EVENT_STREAM_TYPE, (o)->true))
        .doOnError(Throwable::printStackTrace)
        .subscribe();
  }

This is how I call the client:

wscat -c ws://localhost:8887/ws/updatedbalance/davide

It throws:

io.micronaut.websocket.exceptions.WebSocketClientException: Error opening WebSocket client session: Required argument [String topic] not specified
    at io.micronaut.http.client.websocket.NettyWebSocketClientHandler.channelRead0(NettyWebSocketClientHandler.java:207)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at io.micronaut.tracing.instrument.util.TracingRunnable.run(TracingRunnable.java:53)
    at io.micronaut.http.context.ServerRequestContext.with(ServerRequestContext.java:52)
    at io.micronaut.http.context.ServerRequestContext.lambda$instrument$0(ServerRequestContext.java:68)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

And with this version: https://github.com/Deviad/clarity/blob/feature%2Fmicronaut_sixth_update/clarity-dispatcher/src/main/java/clarity/dispatcher/CustomWebsocketServer.java

I am getting:

 data: {"cancelled":false,"done":false,"count":1}

Deviad commented 5 years ago

EDIT: I got it working with SSE (Server-Sent Events). It should also work with Connection1 and Connection2 both being WebSockets (but I am using the OkHttp client).

https://github.com/Deviad/clarity/blob/feature/micronaut_eigth_attempt/clarity-dispatcher/src/main/java/clarity/dispatcher/ControllerExample.java

https://github.com/Deviad/clarity/blob/feature/micronaut_eigth_attempt/clarity-dispatcher/src/main/java/clarity/dispatcher/WebSocketListenerImpl.java

I have a glitch I cannot fix, though: the first time I load the controller I see nothing, although debugging suggests the WebSocket service is working. It is as if the data were being sent on a different thread. The second time I load the controller, it starts to work like magic.

However, I should use the new Micronaut beta version, because I need @RequestScope; otherwise the beans would stay alive even after the user closes the web page.