finagle / finagle-websocket

Finagle Websocket clients and servers

Websocket server replying Futures #15

Open diegossilveira opened 8 years ago

diegossilveira commented 8 years ago

I'm experimenting with the finagle-websocket module. I would like the server to reply with frames that it does not have yet, represented as Futures.

The code below illustrates my question:

package me.zup.tsung.cli

import java.net.URI

import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.time._
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.websocket.{Frame, Request, Response}
import com.twitter.finagle.{Service, Websocket}
import com.twitter.util.{Await, Future}

object ClientTest {

  val client = Websocket.client.newService(":14000")
  val req = Request(new URI("/"), Map.empty, null, AsyncStream.empty[Frame])

  def start() = client(req).map(_.messages.foreach {
    case Frame.Text(m) => println(m)
    case _ => println("unknown")
  })
}

object Server extends App {
  implicit val timer = DefaultTimer.twitter

  def handler(): AsyncStream[Frame] = {
    Frame.Text("hello") +:: Frame.Text("world") +::
      AsyncStream.fromFuture(Future.sleep(1.second).map(_ => Frame.Text("again")))
  }

  val server = Websocket.serve(":14000", new Service[Request, Response] {
    def apply(req: Request): Future[Response] = {
      Future.value(Response(handler()))
    }
  })

  ClientTest.start()

  Await.ready(server)
}

The expected output is:

hello
world
again  # after 1 second

But all I get is:

hello
world
<hanging>

Am I missing something?

luciferous commented 8 years ago

Yeah, I would expect that too. I'll have a look at this on the weekend.

luciferous commented 8 years ago

Hi @diegossilveira – OK, this is slightly tricky (and sorry to have taken so long to respond). The explanation for this (odd) result is that the stream is closing before the server can respond. It's closing because the end of the request stream is interpreted by the server as a close initiated by the client.

If you change your request to

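// requires: import com.twitter.util.Promise
// The request stream stays open until p is satisfied.
val p = new Promise[AsyncStream[Frame]]
val frames = AsyncStream.flatten(AsyncStream.fromFuture(p))
val req = Request(new URI("/"), Map.empty, null, frames)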

you will see the "again". However, this presents another problem: the client request needs to close eventually. Something must call p.setValue(AsyncStream.empty).
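One possible way to close the request, as a rough sketch rather than the recommended approach: satisfy the promise after a fixed delay. The 5-second delay, the ClosingClient object and the openRequest helper are assumptions made up for illustration; the rest reuses only the calls already shown in this thread, against the same library version as the original example.

```scala
import java.net.URI

import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.time._
import com.twitter.finagle.Websocket
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.websocket.{Frame, Request}
import com.twitter.util.{Future, Promise}

// Hypothetical client, not part of finagle-websocket.
object ClosingClient {
  implicit val timer = DefaultTimer.twitter

  lazy val client = Websocket.client.newService(":14000")

  // Builds a request whose frame stream stays open until the returned
  // promise is satisfied.
  def openRequest(): (Request, Promise[AsyncStream[Frame]]) = {
    val p = new Promise[AsyncStream[Frame]]
    val frames = AsyncStream.flatten(AsyncStream.fromFuture(p))
    (Request(new URI("/"), Map.empty, null, frames), p)
  }

  def start(): Future[Unit] = {
    val (req, p) = openRequest()

    // Assumed policy: give the server 5 seconds to reply, then end the
    // request stream by satisfying the promise with an empty stream.
    Future.sleep(5.seconds).ensure {
      p.setValue(AsyncStream.empty[Frame])
    }

    // Print every text frame the server sends back.
    client(req).flatMap(_.messages.foreach {
      case Frame.Text(m) => println(m)
      case _ => println("unknown")
    })
  }
}
```

A fixed delay is the crudest choice. Anything that knows the conversation is over (for example, having received the last expected frame) could call p.setValue(AsyncStream.empty) instead.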