pangiole / akka-wamp

WAMP - Web Application Messaging Protocol implementation written with Akka

[error] a.w.s.SerializationFlows - Timeout on waiting for new data #43

Open asciiu opened 7 years ago

asciiu commented 7 years ago

I am trying to use your actor example to subscribe to a production WebSocket feed. Everything works except for this exception (see the trace below). I don't know how to reproduce it other than connecting, subscribing, and waiting for it to occur. I hit it very frequently after leaving my actor running in the background, and I can't seem to keep my connection to the production server alive. Thanks again for making this available!

[error] a.w.s.SerializationFlows - Timeout on waiting for new data
akka.wamp.serialization.DeserializeException: Timeout on waiting for new data
    at akka.wamp.serialization.JsonSerialization.make$1(JsonSerialization.scala:47)
    at akka.wamp.serialization.JsonSerialization.deserialize(JsonSerialization.scala:163)
    at akka.wamp.serialization.JsonSerializationFlows$$anonfun$2.apply(JsonSerializationFlows.scala:58)
    at akka.wamp.serialization.JsonSerializationFlows$$anonfun$2.apply(JsonSerializationFlows.scala:53)
    at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:42)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
    at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
Caused by: java.io.IOException: Timeout on waiting for new data
    at akka.stream.impl.io.InputStreamAdapter$$anonfun$read$1.apply$mcI$sp(InputStreamSinkStage.scala:147)
    at akka.stream.impl.io.InputStreamAdapter$$anonfun$read$1.apply(InputStreamSinkStage.scala:132)
    at akka.stream.impl.io.InputStreamAdapter$$anonfun$read$1.apply(InputStreamSinkStage.scala:132)
    at akka.stream.impl.io.InputStreamAdapter.executeIfNotClosed(InputStreamSinkStage.scala:112)
    at akka.stream.impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:132)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:207)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseNumber2(UTF8StreamJsonParser.java:1470)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1378)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)

asciiu commented 7 years ago

After some digging, I modified the same code that I changed in my original pull request:

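  // JsonSerializationFlows.scala
  // Note: runReduce needs an implicit Materializer and Future.map an implicit ExecutionContext.
  val deserialize: Flow[websocket.Message, wamp.Message, NotUsed] =
    Flow[websocket.Message]
      .mapAsync(1) {
        // A strict message already carries its whole text.
        case TextMessage.Strict(text) =>
          Future.successful(serialization.deserialize(Source.single(text)))

        // A streamed message is concatenated first, then deserialized as one string.
        case TextMessage.Streamed(source) =>
          source.runReduce(_ + _).map(str => serialization.deserialize(Source.single(str)))
          // previously: serialization.deserialize(source)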

With that change, the "Timeout on waiting for new data" error miraculously goes away. I think the issue is with:

  // JsonSerialization.scala, line 39
  val inputStream = source
    .map(ByteString(_))
    .runWith(StreamConverters.asInputStream())

Perhaps this is the same issue discussed here: https://github.com/akka/akka/issues/19392
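
To illustrate the suspected failure mode, here is a simplified, stdlib-only model, not Akka's or akka-wamp's actual code. It assumes the problem is a blocking reader that gives up when the next chunk of a streamed message does not arrive within a read timeout, and it contrasts that with collecting the whole message before parsing. The class name, method names, END marker, and sample payload are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a streamed text message; not part of Akka or akka-wamp.
class ChunkedReadDemo {
    // Marker the producer enqueues once the message is complete (assumption of this demo).
    static final String END = "<end-of-message>";

    // Delivers each part after pauseMillis on a daemon thread, then the END marker.
    static BlockingQueue<String> slowSource(List<String> parts, long pauseMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                for (String part : parts) {
                    Thread.sleep(pauseMillis);
                    queue.put(part);
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
        return queue;
    }

    // Models a blocking reader: every single read gives up after timeoutMillis.
    static String readWithPerChunkTimeout(BlockingQueue<String> chunks, long timeoutMillis) {
        StringBuilder text = new StringBuilder();
        try {
            while (true) {
                String chunk = chunks.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (chunk == null) {
                    throw new UncheckedIOException(new IOException("Timeout on waiting for new data"));
                }
                if (END.equals(chunk)) {
                    return text.toString();
                }
                text.append(chunk);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Models the workaround: gather the whole message with no per-read deadline.
    static String collectWholeMessage(BlockingQueue<String> chunks) {
        StringBuilder text = new StringBuilder();
        try {
            while (true) {
                String chunk = chunks.take();
                if (END.equals(chunk)) {
                    return text.toString();
                }
                text.append(chunk);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> parts = List.of("[36,", "1,", "2]");
        // Chunks arrive every 100 ms; collecting first does not care how slow they are.
        System.out.println("collected: " + collectWholeMessage(slowSource(parts, 100)));
        try {
            // Chunks arrive every 300 ms but each read only waits 50 ms.
            readWithPerChunkTimeout(slowSource(parts, 300), 50);
        } catch (UncheckedIOException e) {
            System.out.println("per-chunk read failed: " + e.getCause().getMessage());
        }
    }
}
```

In this model the per-chunk reader fails whenever the gap between chunks exceeds its timeout, while the collect-first variant only depends on the message eventually completing. Unlike runReduce, which returns a Future, collectWholeMessage blocks the calling thread, so this shows the timing behaviour only, not the non-blocking shape of the real change.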

pangiole commented 7 years ago

Hi asciiu,

Thanks very much for using akka-wamp. Please send emails to paolo.angioletti@gmail.com next time; that's the account I read most of the time. Feel free to file a new issue on GitHub and a pull request. I'll be happy to consider your investigations and fixes.

Cheers,
Paolo

jgfaust commented 7 years ago

I see this timeout in the Poloniex example application (https://github.com/angiolep/akka-wamp/blob/master/examples/poloniex/src/main/java/PoloniexJavaClient.java).

Change the subscription on line 12 from "BTC_XMR" to "ticker", and run the example, like this:

import akka.actor.*;
import akka.wamp.client.japi.*;

public class PoloniexJavaClient {
    public static void main(String[] aaa) {
        ActorSystem actorSystem = ActorSystem.create();
        Client client = Client.create(actorSystem);

        client.connect("wss://api.poloniex.com", "json").thenAccept(conn -> {
            conn.open("realm1").thenAccept(session -> {

                session.subscribe("ticker", event -> {
                    System.out.printf("%s --> %s\n", event.kwargs(), event.args());
                });
            });
        });
    }
}

This always causes akka.wamp.serialization.DeserializeException: Timeout on waiting for new data within 2 minutes of starting the client.