akka / akka-http

The Streaming-first HTTP server/module of Akka
https://doc.akka.io/docs/akka-http

Better "collect Strict" specifically for WebSockets Flow #65

Open akka-ci opened 8 years ago

akka-ci commented 8 years ago

Issue by ktoso Monday Mar 21, 2016 at 09:59 GMT Originally opened as https://github.com/akka/akka/issues/20096


This is what people do (and I'd do so myself):

        val flow = Flow[Message].collect {
          case TextMessage.Strict(msg) =>
            msg.parseJson.convertTo[Anything]
        }

which can randomly break, or only "almost work", because even the smallest message sometimes ends up as Streamed (I've seen this with a msg as small as "Hello world").

"Almost work" here means: a small message comes in but ends up being streamed, the collect drops it, and processing stalls until the subscription timeout triggers and kills the textStream of the streamed message.

We should provide a safe way to say "I know these will be strict, or force them to be". This is somewhat related to the "auto drain" feature for HttpRequests too; however, a different trick will likely have to be applied.

For reference, this is what I ended up with in a project I hacked on:

  // Note: Skip (a sentinel value) and ConstantFun (an Akka-internal helper) are not defined in this snippet.
  implicit class flowTweaks[M](val wsInput: Source[Message, M]) {
    def forceTextStrict: Source[TextMessage.Strict, M] = wsInput
      .collect {
        case TextMessage.Strict(text)             ⇒ Future.successful(text)
        case TextMessage.Streamed(textStream)     ⇒ textStream.runFold("")(_ + _)
        case BinaryMessage.Strict(binary)         ⇒ Skip
        case BinaryMessage.Streamed(binaryStream) ⇒ binaryStream.runWith(Sink.ignore); Skip
      }
      .filterNot(_ == Skip)
      .mapAsync(1)(ConstantFun.scalaIdentityFunction)
      .map(TextMessage.Strict)
  }

akka-ci commented 8 years ago

Comment by ktoso Monday Mar 21, 2016 at 11:20 GMT


This was already brought up in a somewhat similar issue: https://github.com/akka/akka/issues/19089

akka-ci commented 8 years ago

Comment by rkuhn Monday Mar 21, 2016 at 13:55 GMT


I think it is a mistake to allow users to pattern-match on strict entities; this opens up all kinds of scenarios that “work fine” in tests but fail in production. The reason is that whether something is strict or not depends on random interactions of TCP flow control and buffer sizes; strictness is a low-level implementation detail and not dependable for user-space.

Of course I can already hear the screams of the benchmark crazies that care less about correctness and more about arcane optimizations and general geekiness.

Therefore the probable solution is that we provide a proper (i.e. working but also configurably bounded) implementation of how to convert incoming websocket messages into their strict representation.
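
A minimal sketch of what such a bounded conversion could look like, built only from existing stream operators (`limitWeighted`, `completionTimeout`, `runFold`). The object name `BoundedStrict`, the method `toStrictBounded`, and the parameters `maxChars` and `timeout` are illustrative assumptions, not akka-http API or configuration settings.

```scala
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.stream.Materializer
import akka.http.scaladsl.model.ws.TextMessage

object BoundedStrict {
  // Hypothetical helper: maxChars and timeout are illustrative parameters,
  // not existing akka-http settings.
  def toStrictBounded(msg: TextMessage, maxChars: Long, timeout: FiniteDuration)(
      implicit mat: Materializer): Future[TextMessage.Strict] =
    msg.textStream
      // Fail the stream once more than maxChars characters have arrived.
      .limitWeighted(maxChars)(chunk => chunk.length.toLong)
      // Fail the stream if the message has not completed within the timeout.
      .completionTimeout(timeout)
      // Concatenate the chunks of this one message.
      .runFold(new StringBuilder)((sb, chunk) => sb.append(chunk))
      .map(sb => TextMessage.Strict(sb.toString))(mat.executionContext)
}
```

Because `textStream` is also defined for strict messages, the same code path covers both representations; an oversized or stalled message fails the returned Future instead of being silently dropped.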

akka-ci commented 8 years ago

Comment by ktoso Monday Mar 21, 2016 at 14:13 GMT


“The reason is that whether something is strict or not depends on random interactions of TCP flow control and buffer sizes; strictness is a low-level implementation detail and not dependable for user-space.”

Yeah, very true - it's not predictable. One can't even assume it is Strict for a really small message ("Hello").

Also, then it seems we do not want to: https://github.com/akka/akka/pull/20087 (currently it is impossible even to do the draining, since you can't match on Streamed), so most if not all implementations are wrong in assuming Strict.

We could turn our thinking about it upside down a bit and say that one should never match on either Strict or Streamed, and provide a DSL or directives to do the right thing. As @jrudolph notes in the other thread, one usually works with small messages, so to-strict-ing incoming messages may not be as bad as in the general setting, so we could provide directives:

handleWebSocket[TextMessage.Strict](flow)

instead... WDYT?

akka-ci commented 8 years ago

Comment by rkuhn Monday Mar 21, 2016 at 14:28 GMT


Yes, that’s good as well, but it should also have an upper bound on message size. And if deviations are wanted on a per-case basis, then handling the full protocol should be possible (as you note, this requires users to be able to actually do that).

akka-ci commented 8 years ago

Comment by ktoso Monday Mar 21, 2016 at 14:29 GMT


“Yes, that’s good as well, but it should also have an upper bound on message size”

Yeah good point, a default from config though, otherwise a bit too clunky

akka-ci commented 8 years ago

Comment by mmacfadden Monday Apr 11, 2016 at 00:34 GMT


+1 on this issue. I think that, in addition to adding functionality, an example somewhere in the docs on handling streamed vs strict messages would also be great.

akka-ci commented 8 years ago

Comment by antonkulaga Wednesday Jul 06, 2016 at 22:12 GMT


Yes, the bug is very annoying, because the same messages are almost always Strict on localhost and in tests but often Streamed in production environments =(

ac2epsilon commented 8 years ago

+1 Hi ktoso. I tried to copy and paste your implicit flowTweaks and found that ConstantFun is not public any more, is it? What is the replacement? Also, I'm unsure what Skip is and where it is defined. It looks like the Streamed case has no actual (production) example anywhere on the net, so it would be very kind of you or somebody else to give us a link to an actual piece of code that is up and running on the latest Akka, as a kind of proof of concept. BTW, I have seen Windows 10 strongly favor Streamed over Strict even for reasonably small payloads; I suspect they reduced buffers to be more gadget-friendly (?)

ktoso commented 8 years ago

ConstantFun is just cached functions; you can simply use a normal function there (A => A). Whether you get a Streamed or a Strict entity depends on many things; it's not strictly set by Akka AFAIR (something we could improve, sure).
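
For anyone looking for a self-contained variant of the forceTextStrict snippet from the issue description, here is a minimal sketch that avoids both Skip and ConstantFun by using `Option` and a plain `collect`. It assumes an akka-http version in which `TextMessage.Streamed` and `BinaryMessage.Streamed` can be pattern-matched; the names `WsTweaks` and `FlowTweaks` are illustrative only, and this is not necessarily how the original project solved it.

```scala
import scala.concurrent.Future
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }

object WsTweaks {
  // Illustrative names; not part of akka-http.
  implicit class FlowTweaks[M](val wsInput: Source[Message, M]) {
    def forceTextStrict(implicit mat: Materializer): Source[TextMessage.Strict, M] =
      wsInput
        .mapAsync[Option[String]](1) {
          case TextMessage.Strict(text) =>
            Future.successful(Option(text))
          case TextMessage.Streamed(textStream) =>
            // Collect all chunks of this one message into a single String.
            textStream.runFold("")(_ + _).map(s => Option(s))(mat.executionContext)
          case BinaryMessage.Strict(_) =>
            Future.successful(None)
          case BinaryMessage.Streamed(binaryStream) =>
            // Drain the stream so that the connection does not stall.
            binaryStream.runWith(Sink.ignore)
            Future.successful(None)
        }
        // Binary messages were mapped to None and are dropped here.
        .collect { case Some(text) => TextMessage.Strict(text) }
  }
}
```

Note that this version is unbounded: a very large streamed message is folded into memory in full, so a size limit and a timeout would still be needed for production use.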

cleverdeveloper commented 7 years ago

Why not just stream flatMapConcat (_.asTextMessage.getStreamedText)?

jrudolph commented 7 years ago

@cleverdeveloper because it will mix data together from different WS messages.

cleverdeveloper commented 7 years ago

@jrudolph You are right, I somehow missed it. The right code should probably be the following, which is not much shorter than the original snippet:

stream flatMapConcat { message =>
  message.asTextMessage.getStreamedText via Flow[String].fold("")(_ + _)
}
atemerev commented 6 years ago

Any news on this? I am also getting a weird error message when I try the getStreamedText..via trick (probably because getStreamedText is part of the Java API?)

Error:(44, 29) no type parameters for method flatMapConcat: (f: akka.http.scaladsl.model.ws.Message => akka.stream.Graph[akka.stream.SourceShape[T],M])Gdax.this.wsFlow.Repr[T] exist so that it can be applied to arguments (akka.http.scaladsl.model.ws.Message => akka.stream.javadsl.Source[String, _])
 --- because ---
argument expression's type is not compatible with formal parameter type;
 found   : akka.http.scaladsl.model.ws.Message => akka.stream.javadsl.Source[String, _]
 required: akka.http.scaladsl.model.ws.Message => akka.stream.Graph[akka.stream.SourceShape[?T],?M]
    val stringFlow = wsFlow.flatMapConcat(msg => msg.asTextMessage.getStreamedText via Flow[String].fold("")(_ + _))
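
The error above is consistent with getStreamedText returning a javadsl Source. A minimal sketch of the same per-message fold using the scaladsl accessors `textStream` and `dataStream` instead; the object name `PerMessageText` is illustrative, and dropping binary messages is an assumption made for this example.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }

object PerMessageText {
  // Emits one String per WebSocket text message. The fold runs inside
  // flatMapConcat, so chunks of different messages are never mixed together.
  // Type parameters are spelled out for readability; they may well be inferable.
  val stringFlow: Flow[Message, String, NotUsed] =
    Flow[Message].flatMapConcat[String, Any] {
      case tm: TextMessage =>
        tm.textStream.fold("")(_ + _)
      case bm: BinaryMessage =>
        // Consume the binary data and emit nothing for it.
        bm.dataStream.filter(_ => false).map(_.utf8String)
    }
}
```

Matching on the message type rather than calling asTextMessage means binary messages are drained and skipped instead of causing an exception.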
raboof commented 6 years ago

Remember that asTextMessage will throw for binary messages.

Nowadays it might be a solution to use TextMessage::toStrict as introduced in #1980. Some convenient additional APIs have been proposed in #1979.

Given those 2 other tickets I think this one can now be closed, WDYT?
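
A minimal sketch of a handler built on `toStrict`, assuming an akka-http version that includes it. The object name `StrictEcho`, the `timeout` parameter, and the upper-casing are illustrative; binary messages are drained and dropped here, which is an assumption for a text-only endpoint.

```scala
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Flow, Sink }
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }

object StrictEcho {
  // Illustrative handler: replies to every text message with its upper-cased content.
  def handler(timeout: FiniteDuration)(implicit mat: Materializer): Flow[Message, Message, NotUsed] =
    Flow[Message]
      .mapAsync[Option[TextMessage.Strict]](1) {
        case tm: TextMessage =>
          // Works for both Strict and Streamed; the Future fails on timeout.
          tm.toStrict(timeout).map(s => Option(s))(mat.executionContext)
        case bm: BinaryMessage =>
          // Drain binary data so that the connection does not stall.
          bm.dataStream.runWith(Sink.ignore)
          Future.successful(None)
      }
      .collect { case Some(TextMessage.Strict(text)) => TextMessage(text.toUpperCase) }
}
```

With this shape the handler never pattern-matches on Strict versus Streamed for incoming text, so it behaves the same regardless of how the message happened to arrive.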