softwaremill / tapir

Rapid development of self-documenting APIs
https://tapir.softwaremill.com
Apache License 2.0

[BUG] Fs2/Akka WebSocket client truncates fragmented frame #917

Closed: tom91136 closed this issue 3 years ago

tom91136 commented 3 years ago

Tapir version: 0.17.1

Scala version: 2.13.4

Describe the bug

While using the Fs2-based WebSocket client implementation (backed by the JDK HttpClient), I noticed that long text messages are truncated. Because I was using circe to encode/decode the messages, it wasn't obvious at first that this was happening.

Digging deeper, I found:

https://github.com/softwaremill/tapir/blob/2b06de7e804802083651c4efcbd919bccb3bd555/client/sttp-client/src/main/scalajvm/sttp/tapir/client/sttp/ws/fs2/WebSocketToFs2Pipe.scala#L22

So the client's send implementation doesn't fragment outgoing messages, which is fine, as the WebSocket protocol doesn't impose a meaningful frame size limit. For comparison, Tapir's http4s fs2 WebSocket server implementation fragments messages into 16KB frames.
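For reference, send-side fragmentation of the kind the server performs amounts to splitting one message into fixed-size chunks and marking only the last chunk as final. The sketch below is library-independent: `BinaryFragment` and `Fragmenter` are hypothetical names, and the 16KB default only mirrors the figure mentioned above. It is not Tapir's or http4s's actual code.

```scala
// Hypothetical stand-in for a binary WebSocket frame: a payload plus a flag
// saying whether this is the final fragment of the message (like the FIN bit).
final case class BinaryFragment(payload: Vector[Byte], finalFragment: Boolean)

object Fragmenter {
  // Assumed default fragment size, mirroring the 16KB figure above.
  val DefaultMaxFragmentSize: Int = 16 * 1024

  /** Splits one logical message into fragments of at most `maxSize` bytes.
    * Only the last fragment has `finalFragment = true`; an empty message
    * becomes a single empty final fragment.
    */
  def fragment(message: Array[Byte], maxSize: Int = DefaultMaxFragmentSize): List[BinaryFragment] = {
    require(maxSize > 0, "maxSize must be positive")
    if (message.isEmpty) List(BinaryFragment(Vector.empty, finalFragment = true))
    else {
      val chunks = message.grouped(maxSize).toList
      val lastIndex = chunks.size - 1
      chunks.zipWithIndex.map { case (chunk, i) =>
        BinaryFragment(chunk.toVector, finalFragment = i == lastIndex)
      }
    }
  }
}
```

A 40000-byte message, for example, would become fragments of 16384, 16384 and 7232 bytes, and only the last one would be flagged as final.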

There is, however, an actual bug in the receive stream: each Binary or Text frame is decoded as-is, without reassembling fragmented messages:

https://github.com/softwaremill/tapir/blob/2b06de7e804802083651c4efcbd919bccb3bd555/client/sttp-client/src/main/scalajvm/sttp/tapir/client/sttp/ws/fs2/WebSocketToFs2Pipe.scala#L31-L36
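To make the receive-side problem concrete: when a long message arrives as several Text or Binary frames, decoding each frame on its own hands the decoder a partial payload (for JSON, something like `{"a":`). A minimal, library-independent sketch of the missing reassembly step: accumulate non-final fragments, emit a message on the final one, let control frames pass through without disturbing the accumulator (RFC 6455 allows them between fragments), and fail on a mixed Text/Binary sequence. The `Frame`, `Message` and `Reassembler` types are hypothetical stand-ins, not sttp's `WebSocketFrame`.

```scala
// Hypothetical, simplified frame model (not sttp's WebSocketFrame): data frames
// carry a finalFragment flag; Ping stands in for an interleaved control frame.
sealed trait Frame
final case class Text(payload: String, finalFragment: Boolean) extends Frame
final case class Binary(payload: Vector[Byte], finalFragment: Boolean) extends Frame
final case class Ping(payload: Vector[Byte]) extends Frame

sealed trait Message
final case class TextMessage(value: String) extends Message
final case class BinaryMessage(value: Vector[Byte]) extends Message

object Reassembler {
  // State: the partially accumulated data frame, if a fragmented message is in progress.
  type Acc = Option[Frame]

  /** Consumes one frame, returning the new accumulator and, if the frame
    * completes a message, that message. Control frames leave the accumulator as-is.
    */
  def step(acc: Acc, frame: Frame): Either[String, (Acc, Option[Message])] =
    (acc, frame) match {
      case (_, Ping(_))                             => Right((acc, None))
      case (None, t: Text)                          => finish(t)
      case (Some(Text(prev, _)), Text(p, last))     => finish(Text(prev + p, last))
      case (None, b: Binary)                        => finish(b)
      case (Some(Binary(prev, _)), Binary(p, last)) => finish(Binary(prev ++ p, last))
      case (Some(bad), current) =>
        Left(s"Bad frame sequence: $bad can't be concatenated with $current")
    }

  // A final data frame yields a complete message; a non-final one becomes the accumulator.
  private def finish(frame: Frame): Either[String, (Acc, Option[Message])] = frame match {
    case Text(p, true)   => Right((None, Some(TextMessage(p))))
    case Binary(p, true) => Right((None, Some(BinaryMessage(p))))
    case partial         => Right((Some(partial), None))
  }

  /** Folds a frame sequence into complete messages, stopping at the first error.
    * A trailing unfinished message is dropped.
    */
  def reassemble(frames: List[Frame]): Either[String, List[Message]] =
    frames.foldLeft[Either[String, (Acc, Vector[Message])]](Right((None, Vector.empty))) {
      case (Right((acc, out)), frame) =>
        step(acc, frame).map { case (next, msg) => (next, out ++ msg) }
      case (left, _) => left
    }.map(_._2.toList)
}
```

Usage: `Reassembler.reassemble(List(Text("{\"a\":", false), Ping(Vector.empty), Text("1}", true)))` yields a single `TextMessage("{\"a\":1}")`, whereas decoding the two Text frames separately would see two partial payloads. In an fs2 stream, `step` plays the role of the function passed to `evalMapAccumulate`.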

To verify this, I've implemented a custom WebSocketToPipe[R] where the receiving end handles fragmentation appropriately:

import cats.MonadError
import cats.effect.Concurrent
import cats.syntax.all._
import fs2.Stream
import sttp.capabilities.fs2.Fs2Streams
import sttp.capabilities.{Effect, WebSockets}
import sttp.tapir.client.sttp.WebSocketToPipe
import sttp.tapir.{DecodeResult, WebSocketBodyOutput, WebSocketFrameDecodeFailure}
import sttp.ws.{WebSocket, WebSocketFrame}

import scala.reflect.ClassTag

object TapirPatches {

  object fragmentedWs {
    implicit def webSocketsSupportedForFs2Streams[F[_]: Concurrent]
        : WebSocketToPipe[Fs2Streams[F] with WebSockets] =
      new WebSocketToFs2Pipe[F, Fs2Streams[F] with WebSockets]
    implicit def webSocketsSupportedForFs2StreamsAndEffect[F[_]: Concurrent]
        : WebSocketToPipe[Effect[F] with Fs2Streams[F] with WebSockets] =
      new WebSocketToFs2Pipe[F, Effect[F] with Fs2Streams[F] with WebSockets]
  }

  class WebSocketToFs2Pipe[_F[_]: Concurrent, R <: Fs2Streams[_F] with WebSockets]
      extends WebSocketToPipe[R] {
    override type S = Fs2Streams[F]
    override type F[X] = _F[X]

    override def apply[REQ, RESP](
        s: Any
    )(ws: WebSocket[F], o: WebSocketBodyOutput[Any, REQ, RESP, _, Fs2Streams[F]]): Any = {
      in: Stream[F, REQ] =>
        val sends = in
          .map(o.requests.encode)
          .evalMap(ws.send(_, isContinuation = false)) // TODO support fragmented frames

        def decode(frame: WebSocketFrame) =
          o.responses.decode(frame) match {
            case failure: DecodeResult.Failure =>
              MonadError[F, Throwable].raiseError(
                new WebSocketFrameDecodeFailure(frame, failure)
              )
            case DecodeResult.Value(v) => (Right(Some(v)): Either[Unit, Option[RESP]]).pure[F]
          }

        def raiseBadAccumulator(acc: WebSocketFrame, current: WebSocketFrame) =
          MonadError[F, Throwable].raiseError(
            new WebSocketFrameDecodeFailure(
              current,
              DecodeResult.Error(
                "Bad frame sequence",
                new Exception(
                  s"Invalid accumulator frame: $acc, it can't be concatenated with $current"
                )
              )
            )
          )

        def concatOrDecode[A <: WebSocketFrame: ClassTag](
            acc: Option[WebSocketFrame],
            frame: A,
            last: Boolean
        )(f: (A, A) => A): F[(Option[WebSocketFrame], Either[Unit, Option[RESP]])] =
          if (last) (acc match {
            case None       => decode(frame)
            case Some(x: A) => decode(f(x, frame))
            case Some(bad)  => raiseBadAccumulator(bad, frame)
          }).map(none[WebSocketFrame] -> _)
          else
            (acc match {
              case None       => frame.some.pure[F]
              case Some(x: A) => f(x, frame).some.pure[F]
              case Some(bad)  => raiseBadAccumulator(bad, frame)
            }).map(acc => acc -> ().asLeft)

        val receives = Stream
          .repeatEval(ws.receive())
          .evalMapAccumulate[F, Option[WebSocketFrame], Either[Unit, Option[RESP]]](
            none[WebSocketFrame]
          ) { // left - ignore; right - close or response
            case (acc, _: WebSocketFrame.Close) if !o.decodeCloseResponses =>
              (acc -> (Right(None): Either[Unit, Option[RESP]])).pure[F]
            case (acc, _: WebSocketFrame.Pong) if o.ignorePong =>
              (acc -> (Left(()): Either[Unit, Option[RESP]])).pure[F]
            case (acc, WebSocketFrame.Ping(p)) if o.autoPongOnPing =>
              ws.send(WebSocketFrame.Pong(p))
                .map(_ => acc -> (Left(()): Either[Unit, Option[RESP]]))
            case (prev, frame @ WebSocketFrame.Text(_, last, _)) =>
              concatOrDecode(prev, frame, last)((l, r) => r.copy(payload = l.payload + r.payload))
            case (prev, frame @ WebSocketFrame.Binary(_, last, _)) =>
              concatOrDecode(prev, frame, last)((l, r) => r.copy(payload = l.payload ++ r.payload))
            case (_, frame) =>
              MonadError[F, Throwable].raiseError(
                new WebSocketFrameDecodeFailure(
                  frame,
                  DecodeResult.Error(
                    "Unrecognised frame type",
                    new Exception(s"Unrecognised frame type: ${frame.getClass}")
                  )
                )
              )
          }
          .collect { case (_, Right(d)) => d }
          .unNoneTerminate

        sends.drain.merge(receives)
    }
  }

}

With this patch, everything works as expected. The Akka version appears to have the same issue.

If we only want to address the receive-side fragmentation bug, I can open a PR for it if you're OK with that; I don't have much experience on the Akka side, though.

adamw commented 3 years ago

Sorry for the late response :) Since you have the code ready, a PR that fixes the problem would be great. Do you have a test for it as well? If so, it could guide the Akka implementation.

tom91136 commented 3 years ago

Thanks for getting back! I'll prepare a PR and add some tests.

adamw commented 3 years ago

Fixed in https://github.com/softwaremill/tapir/pull/1034

tom91136 commented 3 years ago

Thanks for getting this through and sorry about the silence, lots of paper deadlines last month.