Open mushtaq opened 6 years ago
Thanks for opening this issue, as the websocket API and DSL are still totally undocumented. We split off this library from one web application. The normal http and ws API should be quite flexible, while the DSL part is rather opinionated. But we are planning to simplify it, as we are restructuring our own backend and need a different implementation for event handling. So, you might want to wait a bit before using it too extensively :)
def fun[T](input: Observable[T]): Observable[T]
This general API is currently not supported, for three reasons:
T (though I am open to suggestions that allow this).
Observable is not serializable directly, but needs special handling. But I can imagine that we could support, e.g., the Result type in arguments. With the Functor and MonadError for it, we can flatMap over the method. Alternatively: could def fun[T](input: T): Observable[T] work for you? Then using, e.g., concatMap in the client.
Observable as a response, but I am not quite there.
What you can do is have one EventType for the whole Api, so you could write something like this:
trait Api[R[_]] {
  def fun(input: Something): R[Something]
}
object Dsl extends ApiDsl[ApiEvent, ApiError, ApiState] {
}
object ApiImpl extends Api[Dsl.ApiFunction] {
  import Dsl._ // assumed to bring ApiFunction, Action and Returns into scope
  def fun(input: Something): ApiFunction[Something] = Action {
    val result: Something = ???
    val events: Observable[List[ApiEvent]] = ???
    Future.successful(Returns(result, asyncEvents = events))
  }
}
Then you can consume events in the client via an Observable:
val client = WsClient[ByteBuffer, ApiEvent, ApiError](s"ws://localhost:$port", config)
val api = client.sendWithDefault.wire[Api[Future]]
// and consume events
client.observable.event.foreach { events =>
  println(s"Got events from backend: $events")
}
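The R[_] parameter on the trait is what lets the server implement the API with one result type while the client consumes it as Api[Future]. A minimal sketch of that idea using only the standard library; Something is given a concrete shape here, and Id, LocalImpl and FutureView are hypothetical names for illustration, not part of covenant:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ResultTypeSketch {
  // Assumed shape; the thread leaves Something undefined.
  final case class Something(value: Int)

  // Same trait shape as in the thread: the result type is abstracted as R[_].
  trait Api[R[_]] {
    def fun(input: Something): R[Something]
  }

  // Hypothetical synchronous result type, standing in for a server-side wrapper.
  type Id[A] = A

  object LocalImpl extends Api[Id] {
    def fun(input: Something): Something = Something(input.value + 1)
  }

  // Client-style view of the same API with Future as the result type.
  object FutureView extends Api[Future] {
    def fun(input: Something): Future[Something] =
      Future.successful(LocalImpl.fun(input))
  }

  def main(args: Array[String]): Unit = {
    println(LocalImpl.fun(Something(1)))                            // Something(2)
    println(Await.result(FutureView.fun(Something(1)), 1.second))   // Something(2)
  }
}
```

Both objects satisfy the same trait, so code written against Api[R] does not need to know which result type is in use.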
Ok, let me digest this answer slowly. But while reading it, I realised a small mistake in my general function. I did not intend it to be generic. The emphasis was on the multi-valued Observable. So in retrospect, I just wanted to have:
def fun(input: Observable[Int]): Observable[Int]
Maybe the rest of your answer covers it, but I thought I should send this right away.
Sure and please let me know if something is unclear!
Ok, I understand it better now. Based on your example, I managed to create a running streaming version here. Thanks.
I have added support for monix Observable in API traits on the branch observable. Still working on a simplification of the event system and the Dsl part, but it should be usable like this:
trait Api {
  def fun(a: Int): Observable[Int]
}
object ApiImpl extends Api {
  def fun(a: Int): Observable[Int] = Observable.fromIterable(List(a, a * 2, a * 3))
}
val port = 9991
object Backend {
  val router = Router[ByteBuffer, Observable]
    .route[Api](ApiImpl)
  def run() = {
    val config = WebsocketServerConfig(bufferSize = 5, overflowStrategy = OverflowStrategy.fail)
    val route = AkkaWsRoute.fromObservableRouter(router, config, failedRequestError = err => ApiError(err.toString))
    Http().bindAndHandle(route, interface = "0.0.0.0", port = port)
  }
}
object Frontend {
  val config = WebsocketClientConfig()
  val client = WsClient.streamable[ByteBuffer, Unit, ApiError](s"ws://localhost:$port/ws", config)
  val api: Api = client.sendWith(requestTimeout = None).wire[Api]
}
Backend.run()
Frontend.api.fun(1).foreach(println(_)) // 1 2 3
You can get it via jitpack: "com.github.cornerman.covenant" %%% "covenant-ws" % "observable-SNAPSHOT"
Please feel free to comment on the usage and how you intend to use it.
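With a method of the shape def fun(a: Int): Observable[Int], the originally requested Observable[Int] => Observable[Int] can be approximated on the calling side with concatMap, as suggested earlier in the thread. A rough sketch, assuming Monix 3.x (for toListL and runToFuture); LocalApi is a hypothetical in-process stand-in for the wired Frontend.api, so no websocket is involved:

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

trait Api {
  def fun(a: Int): Observable[Int]
}

// Hypothetical local stand-in for the wired client from the thread.
object LocalApi extends Api {
  def fun(a: Int): Observable[Int] = Observable.fromIterable(List(a, a * 2, a * 3))
}

object ConcatMapSketch {
  // One call of api.fun per input element; the response streams are
  // concatenated in input order, each one completing before the next starts.
  def funStream(api: Api)(input: Observable[Int]): Observable[Int] =
    input.concatMap(api.fun)

  def main(args: Array[String]): Unit = {
    val out = funStream(LocalApi)(Observable(1, 2))
    val result = Await.result(out.toListL.runToFuture, 5.seconds)
    println(result) // List(1, 2, 3, 2, 4, 6)
  }
}
```

Note that this issues one request per input element rather than streaming the input over a single call, so it is an emulation and not a true bidirectional stream.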
Thanks for the update.
We tried to use this branch in the repo created by @Bharat1103 and @mushtaq. Getting this error on the client side:
2018.06.11 09:35:17 [scala-execution-context-global-39] WARN mycelium.client.WebsocketClientWithPayload.$anon.onMessage.withRequestObserver:42 - Ignoring incoming response for '1', unknown sequence id.
Server says that client is connected. Could be a silly mistake in our configs.
Could be a silly mistake in our configs.
Or a silly mistake in my code :) Somehow the first message I have sent to the Client was a StreamCloseResponse. This should be fixed now, please try again with covenant version 0add6d5. Let me know if there are further problems.
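For readers unfamiliar with the warning above: a websocket client typically tags each request with a sequence id and uses it to route responses back to the caller. The following is only an illustrative sketch of that bookkeeping using the standard library; PendingRequests and its methods are hypothetical names and this is not mycelium's actual code:

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Hypothetical bookkeeping for in-flight requests, keyed by sequence id.
final class PendingRequests[T] {
  private val pending = mutable.Map.empty[Int, Promise[T]]
  private var nextId = 0

  // Registers a request and returns its sequence id plus the future response.
  def open(): (Int, Future[T]) = synchronized {
    nextId += 1
    val promise = Promise[T]()
    pending(nextId) = promise
    (nextId, promise.future)
  }

  // Routes an incoming response; returns false when no request waits for this id.
  def onResponse(seqId: Int, payload: T): Boolean = synchronized {
    pending.remove(seqId) match {
      case Some(promise) =>
        promise.success(payload)
        true
      case None =>
        println(s"Ignoring incoming response for '$seqId', unknown sequence id.")
        false
    }
  }
}

object PendingRequestsDemo {
  def main(args: Array[String]): Unit = {
    val requests = new PendingRequests[String]
    val (id, _) = requests.open()
    requests.onResponse(id, "first message") // completes and forgets the request
    requests.onResponse(id, "late message")  // logs the warning, entry is gone
  }
}
```

In this simplified model, any message that closes a request before its data arrives would make the later data look like a response to an unknown id, which is one plausible way a premature close message could surface as the warning quoted above.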
Cancelling observable subscriptions on disconnect is next on my todo-list. Another question is how to combine observable and future methods in an API trait.
We also need to make client-side observables lazy, such that they do not immediately initiate a request without any subscribers. Also, what happens if the client unsubscribes from the observable? Should we send an unsubscribe from the client to the server? This could become quite difficult when, e.g., resubscribing again.
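One way the laziness described here could be sketched is with Monix's Observable.defer, which postpones building the inner observable until a subscriber arrives. This assumes Monix 3.x; sendRequest and the counter are hypothetical stand-ins for "send the request over the websocket", not covenant code:

```scala
import java.util.concurrent.atomic.AtomicInteger
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object LazyRequestSketch {
  val requestsSent = new AtomicInteger(0)

  // Hypothetical stand-in for sending a request: counts how often it is called.
  def sendRequest(a: Int): Observable[Int] = {
    requestsSent.incrementAndGet()
    Observable.fromIterable(List(a, a * 2, a * 3))
  }

  // defer takes its argument by name, so sendRequest runs once per subscription.
  def fun(a: Int): Observable[Int] = Observable.defer(sendRequest(a))

  def main(args: Array[String]): Unit = {
    val obs = fun(1)
    println(requestsSent.get) // 0, nothing has subscribed yet
    val values = Await.result(obs.toListL.runToFuture, 5.seconds)
    println(values)           // List(1, 2, 3)
    println(requestsSent.get) // 1
  }
}
```

A consequence of this design is that every new subscription sends a fresh request, which is exactly where the resubscribe question raised above becomes relevant.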
It works with the new fix. This API is much simpler to use than the ApiDsl approach.
One thing we learnt recently is that http-based protocols got a boost from Http2 thanks to multiplexing. Earlier, each Server-Sent-Events stream from the browser used to take up one connection; after upgrading to Http2, all of them work over a single connection.
Unfortunately, this does not apply to websockets, where multiplexing needs to be handled by socket.io or such.
Good point about the multiplexing with http2. We are definitely interested in exploring the usage of server-sent-events with http2 (see https://github.com/cornerman/covenant/issues/6). But I am not totally sold on it yet; some points off the top of my head (please correct me if I am wrong):
I think it would be desirable to have one way to define your API and later decide to route it over http, http2 with server-sent-events or over websocket. Or even offering it over different protocols with different serialization. I hope to be able to do exactly this with covenant.
Agree with the point you made regarding http2. Though I would like to add that:
I had problems proxying http2 requests with nginx. We use nginx as a reverse proxy and offload TLS to it. Then we proxy incoming requests internally via http. Not sure, whether this is possible now, but afaik you need TLS for http2 to work end-to-end.
Akka-http is beginning to support Http2 without TLS. I guess they need it for akka-grpc project.
Multiple tabs with the same page: With http2, all requests might be sent over the same TCP connection. This might be nice, but has the complication that all clients will get all server-sent events on this connection. I then need to filter them out in the client or give a unique id to each client-server pair to listen only on events for this specific user.
Not sure this is the case. If we run sbt 'events-monitor-server/run' from our repo and open https://localhost:9443/ in 10+ different tabs, the browser console shows that they are only getting their own stream of events.
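For a setup where several clients did end up sharing one event stream, the filtering idea mentioned above (a unique id per client-server pair) could look roughly like this. It is a standard-library sketch only; TaggedEvent and eventsFor are hypothetical names, not part of covenant or of the repo discussed here:

```scala
object EventFilterSketch {
  // Hypothetical envelope: every event carries the id of the client it is meant for.
  final case class TaggedEvent(clientId: String, payload: String)

  // Keeps only the payloads addressed to the given client, preserving order.
  def eventsFor(clientId: String, shared: Seq[TaggedEvent]): Seq[String] =
    shared.collect { case TaggedEvent(id, payload) if id == clientId => payload }

  def main(args: Array[String]): Unit = {
    val shared = Seq(
      TaggedEvent("tab-1", "a"),
      TaggedEvent("tab-2", "b"),
      TaggedEvent("tab-1", "c")
    )
    println(eventsFor("tab-1", shared)) // List(a, c)
  }
}
```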
Covenant looks awesome!
Are you also planning to support streaming API like gRPC?
I see that the project has a Monix dependency, but I am unable to wrap my head around how to write the most general streaming API like this: def fun[T](input: Observable[T]): Observable[T]
I looked at the HttpSpec, but I am still figuring out if DslApiImpl allows that in some way.