Open jroper opened 7 years ago
This approach works:
val flow = Flow.fromSinkAndSourceMat(Sink.asPublisher[Message](fanout = false), Source.asSubscriber[Message])(Keep.both)
val (responseFuture, (publisher, subscriber)) = http.singleWebSocketRequest(
  WebSocketRequest(wsUri, extraHeaders = filterHeaders(request.headers),
    upgrade.requestedProtocols.headOption),
  flow
)
responseFuture.map {
  case ValidUpgrade(response, chosenSubprotocol) =>
    val webSocketResponse = upgrade.handleMessages(
      Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)),
      chosenSubprotocol
    )
    webSocketResponse.withHeaders(webSocketResponse.headers ++ filterHeaders(response.headers))
  case InvalidUpgradeResponse(response, cause) =>
    log.debug("WebSocket upgrade response was invalid: {}", cause)
    response
}
Thanks, @jroper. The observation is correct. I think these issues are all consequences of materialization. Materialization means that a stream has to be fully connected in one phase. If you have several phases where you want to connect streams dynamically, you will have to make use of adapter pieces like hubs or Sink.asPublisher that allow carrying over an open end from one materialization to the next.
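To illustrate what "carrying over an open end" can look like with a hub, here is a minimal sketch using MergeHub. It assumes Akka 2.6 or later (so that the implicit ActorSystem provides the materializer); the object name, stream contents, and buffer size are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object MergeHubDemo {
  def run()(implicit system: ActorSystem): Future[Seq[String]] = {
    // First materialization: the consumer side is wired up and started.
    // Its materialized value contains a Sink, which is the "open end".
    val (hubSink, collected) =
      MergeHub.source[String](perProducerBufferSize = 16)
        .take(2) // complete after two elements so the demo terminates
        .toMat(Sink.seq[String])(Keep.both)
        .run()

    // Later materializations: independent streams attach to the open end.
    Source.single("from-stream-1").runWith(hubSink)
    Source.single("from-stream-2").runWith(hubSink)
    collected
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("merge-hub-demo")
    try println(Await.result(run(), 5.seconds))
    finally system.terminate()
  }
}
```

The arrival order of the two elements is not guaranteed, since the producers run as separate streams.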
I guess we could implement the singleRequest signature you propose with basically the same technique that you propose for proxying those requests. IIRC one of the reasons not to expose this flow would be that it could be used only once (though we have a similar pattern in the TCP server API, where single-use flows are exposed).
An alternative could be to just document how to make it work, and maybe provide a flow adapter out of the box that already packages the publisher/subscriber pair?
// The materialized inner flow runs in the opposite direction: it accepts Out and emits In.
def dynamicFlow[In, Out]: Flow[In, Out, Flow[Out, In, NotUsed]] =
  Flow.fromSinkAndSourceMat(Sink.asPublisher[In](fanout = false), Source.asSubscriber[Out])(Keep.both)
    .mapMaterializedValue { case (publisher, subscriber) =>
      Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
    }
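For reference, a small sketch of how such a dynamicFlow adapter could be used across two materializations. It assumes Akka 2.6 or later; the object name and the element values are invented for the example. Note that the materialized inner flow has the opposite direction of the outer one (Flow[Out, In, NotUsed]), because it consumes what the outer flow emits and emits what the outer flow consumes.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object DynamicFlowDemo {
  // Outer flow In => Out whose materialized value is the matching inner flow Out => In.
  def dynamicFlow[In, Out]: Flow[In, Out, Flow[Out, In, NotUsed]] =
    Flow.fromSinkAndSourceMat(Sink.asPublisher[In](fanout = false), Source.asSubscriber[Out])(Keep.both)
      .mapMaterializedValue { case (publisher, subscriber) =>
        Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
      }

  def run()(implicit system: ActorSystem): Future[Seq[String]] = {
    // First materialization: the outer stream runs, but its middle is still open.
    val (inner, result) =
      Source(List(1, 2, 3))
        .viaMat(dynamicFlow[Int, String])(Keep.right)
        .toMat(Sink.seq[String])(Keep.both)
        .run()

    // Second materialization: close the loop by joining the inner flow
    // (emits Int, accepts String) with the actual processing logic.
    inner.join(Flow[Int].map(i => s"n=$i")).run()
    result
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("dynamic-flow-demo")
    try println(Await.result(run(), 5.seconds))
    finally system.terminate()
  }
}
```

The inner flow can only be materialized once (fanout = false), and it has to be connected before the stream subscription timeout elapses, otherwise the outer stream is cancelled.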
I don't think we should implement that singleRequest signature; it was one possible solution, not necessarily a good one. Another option is to write something that takes a Flow[Message, Message, _] => T, for example:
singleWebSocketRequest[T](request: WebSocketRequest)(f: (WebSocketResponse, Flow[Message, Message, NotUsed]) => T): Future[T]
While in practice this isn't too much different to returning the flow directly, since the user could just return the flow as T, the fact that the user is providing a callback to handle the flow rather than just being given the flow gives a hint that the user is supposed to use that flow once then and there. So proxying would just look something like this:
case Some(upgrade: UpgradeToWebSocket) =>
  http.singleWebSocketRequest(WebSocketRequest(uri)) { (response, flow) =>
    response match {
      case ValidUpgrade(response) => upgrade.handleMessages(flow)
      case InvalidUpgradeResponse(response, _) => response
    }
  }
It might also make sense to put the flow in the valid upgrade case class (of course it would probably have to be a different class to maintain backwards compatibility).
Anyway, I've needed to use something like dynamicFlow on more than one occasion, so I wouldn't object to that being provided out of the box.
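As a rough sketch only, the callback-style signature discussed above could be layered on top of the existing client API with the same publisher/subscriber bridge. This assumes Akka 2.6+ and Akka HTTP 10.2+; the object and method names (WebSocketCallbackSketch, singleWebSocketRequestWith) are hypothetical, and the sketch uses the existing WebSocketUpgradeResponse type rather than a new response class.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not part of Akka HTTP.
object WebSocketCallbackSketch {
  def singleWebSocketRequestWith[T](request: WebSocketRequest)(
      f: (WebSocketUpgradeResponse, Flow[Message, Message, NotUsed]) => T
  )(implicit system: ActorSystem): Future[T] = {
    implicit val ec: ExecutionContext = system.dispatcher

    // Bridge whose open ends (publisher, subscriber) survive this materialization.
    val bridge = Flow.fromSinkAndSourceMat(
      Sink.asPublisher[Message](fanout = false),
      Source.asSubscriber[Message])(Keep.both)

    val (responseFuture, (publisher, subscriber)) =
      Http().singleWebSocketRequest(request, bridge)

    responseFuture.map { response =>
      // Single-use flow: accepts messages to send to the server,
      // emits messages received from the server.
      val flow = Flow.fromSinkAndSource(
        Sink.fromSubscriber(subscriber),
        Source.fromPublisher(publisher))
      // If the callback never materializes the flow (for example on an
      // invalid upgrade), the bridge is left unconnected.
      f(response, flow)
    }
  }
}
```

Passing the flow to a callback instead of returning it keeps the "use once, right away" expectation visible at the call site, which is the point made in the comment above.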
Hey folks, I was just having the exact issue described here, and Frederico Pellegatta's response at the bottom of this Stack Overflow question helped.
I am still experiencing some hiccups, but for the most part things are transmitting on a long-term websocket connection using an adapted version of his approach. I'm leaving this note here for two reasons:
The APIs currently offered by Akka HTTP for WebSockets don't allow a WebSocket to be cleanly proxied (at least as far as I can tell). This is particularly a problem if building a general purpose HTTP proxy.
To correctly proxy a WebSocket, you need to get the response from the downstream server before you send a response to the client. There are a few options for Akka HTTP WebSocket client handling, but I think none of them work cleanly:
I think that for an HTTP proxy built on akka-http to be capable of proxying WebSockets, one of two things is needed. Either the regular HTTP client flow must support WebSocket requests via an upgrade header set on a regular request, in which case a proxied request could be forwarded as is (I haven't actually checked whether this is supported; I assumed not because the documentation doesn't mention it), or the client WebSocket API needs to offer something along the lines of: