softwaremill / tapir

Rapid development of self-documenting APIs
https://tapir.softwaremill.com
Apache License 2.0

How to handle an error with fs2.Stream based endpoint #3940

Open jenwirth opened 1 month ago

jenwirth commented 1 month ago

Hi,

I am struggling to add an error output to a stream-based endpoint. I want to return a 404 when the Stream fails with an UnknownAvatar error (inside getAvatar I am adapting an IOException to UnknownAvatar).

Here's the endpoint definition:

def avatar[F[_]] =
  endpoint.get
    .in("user" / path[UserId].name("user-id").example(UserId(UUID.fromString("596cf534-9af8-47f5-ab7d-69f70e5e76a0"))) / "avatar.png")
    .errorOut(
      oneOf[AvatarError](
        oneOfVariant(
          statusCode(StatusCode.NotFound)
            .and(jsonBody[UnknownAvatar.type].description("no avatar found for user"))
        )
      )
    )
    .out(streamBinaryBody(Fs2Streams[F])(new CodecFormat {
      override def mediaType: MediaType = MediaType.ImagePng
    }))

The endpoint implementation:

val avatarSEP = UserProfileEndpoint.avatar[F].serverLogic { id =>
  val value: fs2.Stream[F, Byte] = profileManager.getAvatar(id)

  value
    .onError { case t: Throwable =>
      fs2.Stream.eval(Logger[F].debug(s"error encountered (1) ${t.getClass.getName}"))
    }
    .pure[F]
    .onError { case t: Throwable => Logger[F].debug(s"error encountered  (2) ${t.getClass.getName}") }
    .attemptNarrow[AvatarError]
}

This does not work and a left value is still returned to the client, resulting in a "Connection prematurely closed DURING response" error. The only debug message I see is the first one.

Thanks so much for any help!

adamw commented 1 month ago

I think the problem is that the error is only raised as part of the stream. You are always returning an fs2.Stream, which means that you always return the Right-hand side as the result of the server logic. Tapir interprets this as a "successful response" and uses the successful output (here: a streaming one). Now, as part of generating the stream, there might be an error - which happens in your case - causing the stream to become broken.

However, it's already too late to change how the response is generated, as we've already chosen the successful output. Note that in theory, such a stream error might occur also when e.g. half of the avatar is already transmitted.
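To make the timing concrete, here is a toy model in plain Scala 3. It is not tapir or fs2: Response, lazyAvatar, handle and send are hypothetical names, and an Iterator stands in for the stream. It only illustrates that the status is fixed when the handler returns, while a failure hidden in a lazy body surfaces later, when the body is drained.

```scala
// Toy model (not tapir): the status is chosen when the handler returns,
// the body is only produced when the "server" later drains it.
final case class Response(status: Int, body: Iterator[Byte])

case object UnknownAvatar extends Exception("unknown avatar")

// Mirrors the original getAvatar: the failure is hidden inside the lazy body.
def lazyAvatar(id: String): Iterator[Byte] =
  if id == "ok" then "picture".getBytes.iterator
  else Iterator.continually[Byte](throw UnknownAvatar)

// The handler returns a Right as soon as it has a body in hand,
// so it never gets the chance to produce a Left.
def handle(id: String): Either[Int, Response] =
  Right(Response(200, lazyAvatar(id)))

// The "server" commits the status first, then drains the body.
// Returns the committed status and either the failure or the byte count.
def send(result: Either[Int, Response]): (Int, Either[Throwable, Int]) =
  result match
    case Left(status) => (status, Right(0))
    case Right(resp)  => (resp.status, scala.util.Try(resp.body.size).toEither)
```

In this model, send(handle("missing")) has already committed status 200 by the time UnknownAvatar is thrown, which corresponds to the broken connection seen by the client.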

The proper solution would be to change the signature of the profileManager.getAvatar method. It should return e.g. an F[fs2.Stream[F, Byte]], where the outer F would represent the effect of looking up the avatar. Then you could recover from a failure of that effect, and use the error output. The streaming output would only be used if the avatar has been found.
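As a rough sketch of what such a getAvatar could look like, assuming (this is not stated in the issue) that avatars are stored as one PNG file per user under a base directory: the lookup happens in the outer F, and the stream is only created once the file is known to exist. The baseDir parameter and the file layout are hypothetical; Files[F].exists and Files[F].readAll come from fs2.io.file in fs2 3.x.

```scala
import cats.MonadThrow
import cats.syntax.all.*
import fs2.Stream
import fs2.io.file.{Files, Path}

sealed class AvatarError extends Exception
case object UnknownAvatar extends AvatarError

// Hypothetical storage layout: <baseDir>/<id>.png
def getAvatar[F[_]: Files: MonadThrow](baseDir: Path, id: String): F[Stream[F, Byte]] =
  val file = baseDir / s"$id.png"
  // The lookup runs in the outer effect, before any stream is handed out.
  Files[F].exists(file).flatMap {
    // Found: return the (still lazy) stream of bytes.
    case true => Files[F].readAll(file).pure[F]
    // Not found: fail the outer effect, so the server logic can map it to a Left.
    case false => MonadThrow[F].raiseError[Stream[F, Byte]](UnknownAvatar)
  }
```

Note that the file could still disappear between the existence check and the read, so a failure inside the stream remains possible; this only moves the common "not found" case to a point where the error output can still be chosen.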

adamw commented 1 month ago

Here's a working example, using IO instead of F[_]. Note that the successful result of getAvatar is mapped to a Right, and the errors are recovered using recoverWith:

import cats.effect.*
import cats.syntax.all.*
import io.circe.generic.auto.*
import org.http4s.HttpRoutes
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.server.Router
import sttp.capabilities.fs2.Fs2Streams
import sttp.model.{MediaType, StatusCode}
import sttp.tapir.*
import sttp.tapir.generic.auto.*
import sttp.tapir.json.circe.jsonBody
import sttp.tapir.server.http4s.Http4sServerInterpreter

import scala.concurrent.ExecutionContext

object HelloWorldHttp4sServer extends IOApp:
  sealed class AvatarError extends Exception
  case object UnknownAvatar extends AvatarError

  def avatar[F[_]] =
    endpoint.get
      .in("user" / path[String] / "avatar.png")
      .errorOut(
        oneOf[AvatarError](
          oneOfVariant(
            statusCode(StatusCode.NotFound)
              .and(jsonBody[UnknownAvatar.type].description("no avatar found for user"))
          )
        )
      )
      .out(streamBinaryBody(Fs2Streams[F])(new CodecFormat {
        override def mediaType: MediaType = MediaType.ImagePng
      }))

  def getAvatar(id: String): IO[fs2.Stream[IO, Byte]] =
    if id == "ok" then IO.pure(fs2.Stream.fromIterator[IO]("picture".getBytes.iterator, 16)) else IO.raiseError(UnknownAvatar)

  def avatarSEP = avatar.serverLogic { id =>
    val value = getAvatar(id)

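    value
      // avatar found: use the successful (streaming) output
      .map(stream => Right(stream))
      // lookup failed: map the error to the 404 error output
      .recoverWith { case UnknownAvatar =>
        IO(println("error encountered")).map(_ => Left(UnknownAvatar))
      }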
  }

  val helloWorldRoutes: HttpRoutes[IO] = Http4sServerInterpreter[IO]().toRoutes(avatarSEP)

  implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

  override def run(args: List[String]): IO[ExitCode] =
    // starting the server
    BlazeServerBuilder[IO]
      .withExecutionContext(ec)
      .bindHttp(8080, "localhost")
      .withHttpApp(Router("/" -> helloWorldRoutes).orNotFound)
      .resource
      .use { _ => IO.never }
      .as(ExitCode.Success)