cornerman / mycelium

Requests, Responses and Events over Websockets in scala
MIT License

connection problem #198

Closed elyphas closed 1 year ago

elyphas commented 2 years ago

I am having a problem when mycelium tries to establish a connection. I am using mycelium version 0.3.1. Any ideas? The error is:

ws_taak [ERROR] [11/03/2022 15:45:08.174] [default-akka.actor.default-dispatcher-8] [akka.actor.ActorSystemImpl(default)] Websocket handler failed with class scala.collection.immutable.$colon$colon cannot be cast to class java.lang.String (scala.collection.immutable.$colon$colon is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap'
ws_taak java.lang.ClassCastException: class scala.collection.immutable.$colon$colon cannot be cast to class java.lang.String (scala.collection.immutable.$colon$colon is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
ws_taak     at boopickle.BasicPicklers$StringPickler$.pickle(Pickler.scala:136)
ws_taak     at boopickle.PickleState.pickle(Pickler.scala:421)
ws_taak     at RESTServer$Pickler$macro$1$1$Pickler$macro$9$1$.pickle(RESTServer.scala:82)
ws_taak     at RESTServer$Pickler$macro$1$1$Pickler$macro$9$1$.pickle(RESTServer.scala:82)
ws_taak     at boopickle.CompositePickler.pickle(CompositePicklers.scala:28)
ws_taak     at boopickle.PickleImpl$.apply(Default.scala:67)
ws_taak     at boopickle.PickleImpl$.intoBytes(Default.scala:72)
ws_taak     at chameleon.ext.boopickle$$anon$1.serialize(boopickle.scala:11)
ws_taak     at chameleon.ext.boopickle$$anon$1.serialize(boopickle.scala:10)
ws_taak     at mycelium.akka.server.WebsocketServerFlow$.$anonfun$apply$11(WebsocketServerFlow.scala:81)
ws_taak     at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:52)
ws_taak     at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
ws_taak     at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:496)
ws_taak     at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
ws_taak     at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
ws_taak     at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
ws_taak     at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
ws_taak     at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
ws_taak     at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
ws_taak     at akka.actor.Actor.aroundReceive(Actor.scala:537)
ws_taak     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
ws_taak     at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
ws_taak     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
ws_taak     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
ws_taak     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
ws_taak     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
ws_taak     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
ws_taak     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
ws_taak     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
ws_taak     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
ws_taak     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
ws_taak     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
elyphas commented 2 years ago

I found more info:

ws_taak 19:17:20.838 [default-akka.actor.default-dispatcher-11] INFO akka.actor.DeadLetterActorRef - Message [mycelium.core.message.CallResponse] from Actor[akka://default/user/$a#-430912052] to Actor[akka://default/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://default/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
ws_taak 19:17:20.840 [default-akka.actor.default-dispatcher-11] INFO akka.actor.DeadLetterActorRef - Message [mycelium.core.message.CallResponse] from Actor[akka://default/user/$a#-430912052] to Actor[akka://default/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://default/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'
elyphas commented 2 years ago

Problem solved. I changed this:

  //val handler = new FullRequestHandler[ByteBuffer, String, String, Option[String]] {
  val handler = new SimpleRequestHandler[ByteBuffer, String, String, Option[String]] {

and I took some functions out.

elyphas commented 2 years ago

Well, sorry, that was not what I was hoping for. The problem seems to be in this function:

  val handler = new FullRequestHandler[ByteBuffer, String, String, Option[String]] {
    override def onClientConnect(
        client: NotifiableClient[String, Option[String]],
        state: Future[Option[String]]): Unit = {
      // This is generating my problem and I don't know how to solve it
      // client.notify(_ => Future.successful("started" :: Nil))
      clients += client
      ()
    }
....
cornerman commented 2 years ago

I am not sure. The class-cast exception looks suspicious to me. From the stack trace, it seems to fail while serializing an outgoing message.

So, when you notify with your string events, they are sent downstream and serialized. The serialization uses boopickle, and somewhere a List seems to be cast to a String. Just a wild guess, but could there be a cast inside one of your own picklers?
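
To illustrate how a cast like this can compile and still fail only at runtime, here is a minimal sketch in plain Scala. It does not use boopickle or mycelium: `Encoder`, `stringEncoder`, `encodeUnchecked`, and `failsWithClassCast` are hypothetical names made up for the illustration, and this is only one way the symptom could arise, not a diagnosis of the code in this issue.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a typed serializer (similar in spirit to a Pickler[String]).
trait Encoder[A] {
  def encode(value: A): Array[Byte]
}

object ErasureDemo {
  // An encoder that only knows how to handle String values.
  val stringEncoder: Encoder[String] = new Encoder[String] {
    def encode(value: String): Array[Byte] = value.getBytes(StandardCharsets.UTF_8)
  }

  // The unchecked cast compiles because type parameters are erased at runtime.
  // The failure only shows up when a non-String value reaches encode.
  def encodeUnchecked(value: Any): Array[Byte] =
    stringEncoder.asInstanceOf[Encoder[Any]].encode(value)

  // Returns true if encoding the value fails with a ClassCastException.
  def failsWithClassCast(value: Any): Boolean =
    try {
      encodeUnchecked(value)
      false
    } catch {
      case _: ClassCastException => true
    }
}
```

Calling `ErasureDemo.failsWithClassCast("started" :: Nil)` returns `true`, while passing the plain string `"started"` returns `false`. A mismatch between what a serializer was generated for and what it receives at runtime, for example from an unchecked cast or from binary-incompatible library versions, could produce the same kind of exception without any compile error.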

elyphas commented 2 years ago

"could there be a cast inside one of your own picklers?"

No, this is my code:

object FullRequestHandler {

  implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global

  type Payload = ByteBuffer

  val handler = new FullRequestHandler[ByteBuffer, String, String, Option[String]] {
  //val handler = new SimpleRequestHandler[ByteBuffer, String, String, Option[String]] {

    val clients = mutable.HashSet.empty[NotifiableClient[String, Option[String]]]
    val events = mutable.ArrayBuffer.empty[String]

    override val initialState = Future.successful(None)

    override def onRequest(client: NotifiableClient[String, Option[String]], state: Future[Option[String]], path: List[String], payload: ByteBuffer) = {

      def deserialize[S: Pickler](ts: ByteBuffer) = Unpickle[S].fromBytes(ts)
      def serialize[S: Pickler](ts: S) = Right(Pickle.intoBytes[S](ts))
      def value[S: Pickler](ts: S, events: List[String] = Nil) = Future.successful(ReturnValue(serialize(ts), events))
      def valueFut[S: Pickler](ts: Future[S], events: List[String] = Nil) = ts.map(ts => ReturnValue(serialize(ts), events))
      def error(ts: String, events: List[String] = Nil) = Future.successful(ReturnValue(Left(ts), events))

      path match {
        case "getAllTypeDocuments" :: Nil =>    Response(state, value(true))
        case "getAllUsers" :: Nil =>    Response(state, value(true))
      }
    }

    override def onEvent(client: NotifiableClient[String, Option[String]], state: Future[Option[String]], newEvents: List[String]) = {
        events ++= newEvents
        val downstreamEvents = newEvents.map(event => s"${event}-ok")
        downstreamEvents.foreach(println)
        Reaction(state, Future.successful(downstreamEvents))
    }

    override def onClientConnect(client: NotifiableClient[String, Option[String]], state: Future[Option[String]]): Unit = {
        client.notify ( _ => Future.successful ( "started" :: Nil ) )
        clients += client
        ()
    }

    override def onClientDisconnect( client: NotifiableClient[String, Option[String]], state: Future[Option[String]], reason: DisconnectReason): Unit = {
        clients -= client
        ()
    }
  }
}

object RESTServer extends App {

  import FullRequestHandler._

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global

  import myceliumHandler.FullRequestHandler._

  val builder = implicitly[AkkaMessageBuilder[ByteBuffer]]
  val serializer = implicitly[Serializer[ClientMessage[ByteBuffer], ByteBuffer]]
  val deserializer = implicitly[Deserializer[ServerMessage[ByteBuffer, String, String], ByteBuffer]]

  val config = WebsocketServerConfig(bufferSize = 5, overflowStrategy = OverflowStrategy.fail)

  val server = WebsocketServer.withPayload(config, handler)

  val binding = Http().newServerAt(interface, port).bindFlow(handleWebSocketMessages(server.flow()))
} 
elyphas commented 1 year ago

Well, it seems to be a conflict between libraries, even though everything compiles without errors. When I go back to "0.1.0" it works fine, but when I upgrade to "0.2.0" the error occurs. Thank you.