raquo / Airstream

State propagation and event streams with mandatory ownership and no glitches
MIT License
245 stars 28 forks source link

Add HTTP/websocket event sources #49

Open ajaychandran opened 3 years ago

ajaychandran commented 3 years ago

I am currently using these as HTTP event sources. Would it make sense to add these to the library?

lolgab commented 3 years ago

You can wrap Ajax requests this way:

import org.scalajs.dom.ext.Ajax
EventStream.fromFuture(Ajax.get(url))
ajaychandran commented 3 years ago

@lolgab Yes, but the Future is evaluated eagerly. In contrast, AjaxEventStream submits the request in onStart, thus delaying effects (network request) until observers are registered.

lolgab commented 3 years ago

Then maybe there should be something to integrate things lazily but on a more general level. Integrating Ajax directly in Airstream sounds too specific to me.

raquo commented 3 years ago

I think having a reference implementation of doing basic stuff like Ajax would be useful. There are three potential places for it:

Stuff that we add to Airstream should be pretty unopinionated. For example, I've been wanting to move DomEventStream from Laminar to Airstream for a while now. Given that the implementation of AjaxEventStream is pretty bare bones, I think it would be nice to include it in Airstream itself.

A few notes though:


there should be something to integrate things lazily but on a more general level

Currently one can achieve this with something like:

def fromLazyFuture[A](future: => Future[A], emitOnce: Boolean): EventStream[A] = {
  EventStream
    .fromValue((), emitOnce =emitOnce)
    .flatMap(_ => future)
}

I guess I could add this EventStream and a similar one to Signal.

raquo commented 3 years ago

Oh and, I didn't have the time to evaluate the websockets helper yet. I think websockets might require a more opinionated API, but I'll check it out a bit later.

ajaychandran commented 3 years ago

Thanks for your suggestions. I will incorporate them and submit a PR.

Regarding websockets,

raquo commented 3 years ago

Ok wrt websockets! Try to keep it unopinionated so that users can build their custom code based on it, like you did with ajax.

regarding "restart on error" – I'm guessing recoverToTry followed by flatMap can achieve this, but it could be combined into a single operator like recoverTo(err => observable) or recoverTo(observable). The latter one can be made to behave better wrt glitches. Not 100% sure about the other one, that might require a transaction boundary similar to the flatMap solution.

regarding retrys – what kind of operators / signatures are you thinking about, specifically? Kinda hard to translate ZIO stuff directly as it has many more concepts than Airstream. We have EventStream.periodic which is flexible enough to be made exponential or fibonacci, and that can probably be combined into a retry pattern again with flatMap. If there is a canonical use case for retrys in Airstream, we could include it as a standalone operator.

ajaychandran commented 3 years ago

The usecase I had in mind is to restart a websocket stream when the underlying connection stops, for instance, when the user loses connectivity. When connectivity is resumed, it may be desirable to establish a new websocket connection automatically.

One way to do this is to use a control parameter like maxReconnectAttempts in the WebSocketEventStream constructor. I was wondering if this could be generalized to a stream combinator.

raquo commented 3 years ago

Yes, sorry, I worded it poorly, I understand the use cases for retrys, but I was also wondering the same thing, whether you had a specific method signature and behaviour in mind that would be general enough to include as an operator for all streams. I'm thinking it could be a certain variation of recoverTo.

ajaychandran commented 3 years ago

How about restartOn(control: EventStream[Any])? This is flexible enough to accommodate a wide variety of usecases (restart max n times, restart after d duration, restart max n times + after d duration). Can this be implemented by calling onStop followed by onStart?

raquo commented 3 years ago

But, this would only work for streams that perform a side effect (that we want to re-run) when they start. Most streams don't do that.

Even if you're listening to a stream that depends on one of those streams that perform a side effect on start, and you stop & start that child stream, that is no guarantee that the parent stream that actually performs the side effect will be restarted (that will only happen if the child stream was its only listener).

I think the problem is, streams aren't effects. You can retry an effect like IO or Task or even Future, but retrying a stream doesn't seem to make sense in general. Or, to the extent that it does, the retry logic needs to be contained within the stream that actually performs the side effect. So if there is any reusability to be gained there, it does not seem like it will be in the form of a general purpose operator.

ajaychandran commented 3 years ago

The initial draft for the websocket implementation is available in this branch..

Need some input on the following:

I will test this implementation in my application and explore the restart-on-error usecase.

ajaychandran commented 3 years ago

The above issues have been resolved for now.

  • The type of dom.MessageEvent.data is Any. This would require a cast at call site, event.data.asInstanceOf[String]. Is this fine?

Added project parameter and Builder type to handle this.

  • When transmitting messages, if it is detected that the underlying socket was closed, should this be reported downstream as an error? If so, then what type of error?

Redefined error type to support transmission and termination errors.

  • What to do with the errors on the transmission channel?

These are not propagated.

TODO Test implementation and open PR.

raquo commented 3 years ago

As I'm adding more params to AjaxStreamRequest, I'm getting more and more annoyed by having to duplicate all them in the six factory methods (apply / get / post / etc.). How about we just have one apply factory that covers all http methods?

// Usage
AjaxEventStream(_.GET, url, ...)

where the first param would be HttpMethod.type => HttpMethod and

@js.native
trait HttpMethod extends js.Any

object HttpMethod {
  val GET = "GET".asInstanceOf[HttpMethod]
  val POST = "POST".asInstanceOf[HttpMethod]
  val PUT = "PUT".asInstanceOf[HttpMethod]
  val PATCH = "PATCH".asInstanceOf[HttpMethod]
  val DELETE = "DELETE".asInstanceOf[HttpMethod]
}
ajaychandran commented 3 years ago

How about something similar to this pattern?

object AjaxEventStream {

  def apply(
    url: String,
    data: dom.ext.Ajax.InputData = null,
    timeout: Int = 0,
    headers: Map[String, String] = Map.empty,
    withCredentials: Boolean = false,
    responseType: String = "",
    progressObserver: Observer[(dom.XMLHttpRequest, dom.ProgressEvent)] = Observer.empty,
    readyStateChangeObserver: Observer[dom.XMLHttpRequest] = Observer.empty
  ) = new Builder(url, data, timeout, headers,  withCredentials, responseType, progressObserver, readyStateChangeObserver)

  final class Builder(
    url: String,
    data: dom.ext.Ajax.InputData,
    timeout: Int,
    headers: Map[String, String],
    withCredentials: Boolean,
    responseType: String,
    progressObserver: Observer[(dom.XMLHttpRequest, dom.ProgressEvent)],
    readyStateChangeObserver: Observer[dom.XMLHttpRequest]
  ) {

    def get: EventStream[dom.XMLHttpRequest] = ???
    def post: EventStream[dom.XMLHttpRequest] = ???
    // and so on...
  }
}
raquo commented 3 years ago

What would be in those ??? then? I assume it's new AjaxEventStream(...) with all the same params again for each method. So it only saves ~50% of the boilerplate.

raquo commented 3 years ago

I guess AjaxEventStream could accept builder as a single param, but I don't like that kind of coupling. Having a lambda for one param in one factory, that doesn't leak into AjaxEventStream itself, would be simpler imo.

(This is as concerning Ajax, I haven't had a change to review Websockets stuff yet. I should probably look at that first so that the API ends up more or less consistent if that makes sense)

ajaychandran commented 3 years ago

What would be in those ??? then? I assume it's new AjaxEventStream(...) with all the same params again for each method.

Correct. I prefer the usage pattern it promotes.

AjaxEventStream(url).get
// versus
AjaxEventStream(_.GET, url, ...))

So it only saves ~50% of the boilerplate.

An alternate encoding could be

object AjaxEventStream {
  sealed abstract class method(name: String) {
    final def apply(
      url: String,
      data: dom.ext.Ajax.InputData = null,
      timeout: Int = 0,
      headers: Map[String, String] = Map.empty,
      withCredentials: Boolean = false,
      responseType: String = "",
      progressObserver: Observer[(dom.XMLHttpRequest, dom.ProgressEvent)] = Observer.empty,
      readyStateChangeObserver: Observer[dom.XMLHttpRequest] = Observer.empty): EventStream[dom.XMLHttpRequest] =
    new AjaxEventStream(name, url, data, timeout, headers, withCredentials, responseType, progressObserver, readyStateChangeObserver)
  }

  final object get extends method("GET")
  final object post extends method("POST")
  // and so on
}

// usage
AjaxEventStream.get(???)
raquo commented 3 years ago

Hm that last one is pretty neat. If it works ok with editor autocomplete (I'll check intellij), that's a winner.

Side note, final would be redundant in this case even on Scala 2.12 I think.

ajaychandran commented 3 years ago

Side note, final would be redundant in this case even on Scala 2.12 I think.

Thanks. I had extended this best-practice to non-case objects and followed it blindly. Time to self correct!