Open bharatsingh-tc opened 6 years ago
Good point! The request handler does not close subscriptions it has made on behalf of a client. But it should, the subscriptions should be canceled as soon as the client disconnects. I will work on it.
@Bharat1103 If you have any feedback on the usage of the setup or the Dsl part specifcally, please let me know! We are likely going to refactor this soon, so this would be very interesting.
Setup for streaming is not as intuitive as request-response. It will be good if we could implement the streaming interface like
object StreamingImpl1 extends Streaming[Observable] {
override def from(a: Int): Observable[String] =
Observable.fromIterator(Iterator.from(a)).map(_.toString)
}
//or
object StreamingImpl2 extends Streaming[Source[_, Any]] {
override def from(a: Int): Source[String, Any] =
Source.fromIterator(() => Iterator.from(a)).map(_.toString)
}
We can let the user do the state management or error handling.
On the client-side, it will be good if method call directly return the stream.
val stream: Observable[String] = wsClient.from(23)
Which means each call results in a new websocket connection. Reusing the connection should be an optimization (via configs, implicits or different api).
Thanks for the feedback! I agree that the current way of emitting events is not really intuitive for building a pure streaming API -- but I will work on it!
This project was extracted from another application. There it makes sense for the client to consume events for the whole API in one event stream while still getting a single return value from each method.
We can let the user do the state management or error handling.
What kind of interface would you expect there? Does the error handling on the Observable itself suffice?
Which means each call results in a new websocket connection. Reusing the connection should be an optimization (via configs, implicits or different api).
I think, it is even possible to do this without needing multiple websocket connections. We could extend mycelium (which handles requests/responses over websocket messages) to use Observable
instead of Future
for representing a response to a request. In the client, we can then have a response observable. This will be completed when no further messages are to be expected and emit an error if something went wrong.
Maybe it is even possible to remove this global event observable on the client and just rely on such a streaming API - just need to rewrite some code :)
@Bharat1103 please see the update in #1, whether it helps your use-case.
I created a small repo where we can
sbt "runMain streaming.StreamingServer"
sbt "runMain streaming.JvmStreamingApp"
.Even after I disconnect the client (ctrl-C), I see that server keeps producing the values and stream is not closed.
Is it expected? Should the server not close the Monix stream automatically once client is gone?