softwaremill / tapir

Rapid development of self-documenting APIs
https://tapir.softwaremill.com
Apache License 2.0
1.34k stars 405 forks source link

[BUG] Connection hangs with no useful logs when response zio stream fails #3540

Open maximskripnik opened 6 months ago

maximskripnik commented 6 months ago

Tapir version: 1.9.10

Scala version: 3.3.1

Connection hangs with no useful logs when response zio stream fails

When using zio streams to serve http response using ZioHttpInterpreter, the server does not properly handle stream failures. The client that sent request hangs, so apparently the server doesn't even close the connection. On top of that, server logs are full of Netty internal exceptions instead of the root cause exception that failed the stream. Additionally, elements from the stream preceding the failure are not sent in the response as well

How to reproduce?

Server code

This example spins up a server with one endpoint /test that returns a simple plain text stream that fails after two elements are emitted

// ThisBuild / scalaVersion := "3.3.1"

// libraryDependencies ++= Seq(
//  "com.softwaremill.sttp.tapir" %% "tapir-zio-http-server" % "1.9.10"
// )

package tapirsample

import sttp.capabilities.zio.ZioStreams
import sttp.tapir.CodecFormat
import sttp.tapir.Schema
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import sttp.tapir.ztapir.*
import zio.*
import zio.http.HttpApp
import zio.http.Server
import zio.stream.ZStream

import java.nio.charset.StandardCharsets

object Sample extends ZIOAppDefault {

  val badEndpoint: ZServerEndpoint[Any, ZioStreams] = endpoint
    .in("test")
    .get
    .out(
      streamBody(ZioStreams)(
        summon[Schema[Chunk[String]]],
        CodecFormat.TextPlain(),
        Some(StandardCharsets.UTF_8)
      )
    )
    .zServerLogic { _ =>
      val stream = ZStream("foo", "bar") ++ ZStream.fail(new RuntimeException("boom"))
      val byteStream = stream.mapConcat(_.getBytes)
      ZIO.succeed(byteStream)
    }

  val routes: HttpApp[Any] =
    ZioHttpInterpreter().toHttp(badEndpoint)

  override def run: ZIO[Any, Any, Any] =
    Server.serve(routes).provide(Server.defaultWithPort(9000))

}

Running the server

sbt run
[info] welcome to sbt 1.8.2 (Eclipse Adoptium Java 17.0.6)
[info] loading settings for project tapir-stream-issue-build-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project/project
[info] loading settings for project tapir-stream-issue-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project
[success] Generated .bloop/tapir-stream-issue-build.json
[success] Total time: 1 s, completed 28 Feb 2024, 11:27:59
[info] loading settings for project tapir-stream-issue from build.sbt ...
[info] set current project to tapir-stream-issue (in build file:/Users/mskripnik/projects/tapir-stream-issue/)
[info] running tapirsample.Sample

Sending the request

curl -v 'http://localhost:9000/test'
*   Trying [::1]:9000...
* Connected to localhost (::1) port 9000
> GET /test HTTP/1.1
> Host: localhost:9000
> User-Agent: curl/8.4.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=UTF-8
< transfer-encoding: chunked
<

You can observe two issues here:

Server logs

[info] running tapirsample.Sample
timestamp=2024-02-28T10:29:01.638818Z level=WARN thread=#zio-fiber-39 message="Fatal exception in Netty" cause="Exception in thread "zio-fiber-" io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
    at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
    at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
    at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
    at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
    Suppressed: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
        at io.netty.handler.codec.http.HttpObjectEncoder.throwUnexpectedMessageTypeEx(HttpObjectEncoder.java:348)
        at io.netty.handler.codec.http.HttpObjectEncoder.encodeFullHttpMessage(HttpObjectEncoder.java:305)
        at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:162)
        at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:97)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
        at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
        at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
        at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)"
Feb 28, 2024 11:29:01 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
    at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
    at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
    at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
    at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
    at io.netty.handler.codec.http.HttpObjectEncoder.throwUnexpectedMessageTypeEx(HttpObjectEncoder.java:348)
    at io.netty.handler.codec.http.HttpObjectEncoder.encodeFullHttpMessage(HttpObjectEncoder.java:305)
    at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:162)
    at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:97)
    ... 22 more

Meanwhile the server just spits this large pile of Netty internals. The big issue here is the lack of information about the throwable that failed the response stream (no 'boom' string seen anywhere in the logs)

Additional information

Expected behavior

The expected behavior in this case is that the server:

Working example

As a side note, this example shows that such behavior did exist in tapir v1.2.3

// ThisBuild / scalaVersion := "3.3.1"

// libraryDependencies ++= Seq(
//   "com.softwaremill.sttp.tapir" %% "tapir-zio-http-server" % "1.2.3"
// )

package tapirsample

import sttp.capabilities.zio.ZioStreams
import sttp.tapir.CodecFormat
import sttp.tapir.Schema
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import sttp.tapir.ztapir.*
import zio.*
import zio.http.HttpApp
import zio.http.Server
import zio.http.ServerConfig
import zio.stream.ZStream

import java.nio.charset.StandardCharsets

object Sample extends ZIOAppDefault {

  val badEndpoint: ZServerEndpoint[Any, ZioStreams] = endpoint
    .in("test")
    .get
    .out(
      streamBody(ZioStreams)(
        summon[Schema[Chunk[String]]],
        CodecFormat.TextPlain(),
        Some(StandardCharsets.UTF_8)
      )
    )
    .zServerLogic { _ =>
      val stream = ZStream("foo", "bar") ++ ZStream.fail(new RuntimeException("boom"))
      val byteStream = stream.mapConcat(_.getBytes)
      ZIO.succeed(byteStream)
    }

  val routes: HttpApp[Any, Throwable] =
    ZioHttpInterpreter().toHttp(badEndpoint)

  override def run: ZIO[Any, Any, Any] =
    Server.serve(routes).provide(Server.live, ZLayer.succeed(ServerConfig.default.port(9000)))

}
sbt run
[info] welcome to sbt 1.8.2 (Eclipse Adoptium Java 17.0.6)
[info] loading settings for project tapir-stream-issue-build-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project/project
[info] loading settings for project tapir-stream-issue-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project
[success] Generated .bloop/tapir-stream-issue-build.json
[success] Total time: 1 s, completed 28 Feb 2024, 11:40:27
[info] loading settings for project tapir-stream-issue from build.sbt ...
[info] set current project to tapir-stream-issue (in build file:/Users/mskripnik/projects/tapir-stream-issue/)
[info] compiling 1 Scala source to /Users/mskripnik/projects/tapir-stream-issue/target/scala-3.3.1/classes ...
[info] running tapirsample.Sample
curl -v 'http://localhost:9000/test'
*   Trying [::1]:9000...
* Connected to localhost (::1) port 9000
> GET /test HTTP/1.1
> Host: localhost:9000
> User-Agent: curl/8.4.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=UTF-8
< transfer-encoding: chunked
<
* transfer closed with outstanding read data remaining
* Closing connection
curl: (18) transfer closed with outstanding read data remaining
foobar%
[info] running tapirsample.Sample
[ERROR] KQueueEventLoopGroup-2-2 NettyRuntime HttpRuntimeException:Exception in thread "zio-fiber-" java.lang.RuntimeException: java.lang.RuntimeException: boom
Feb 28, 2024 11:40:36 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.RuntimeException: boom
    at tapirsample.Sample$.$anonfun$1$$anonfun$1(Sample.scala:29)
    at zio.ZIO$.fail$$anonfun$1(ZIO.scala:3083)
    at zio.ZIO$.failCause$$anonfun$1(ZIO.scala:3089)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1126)
    at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:384)
    at zio.internal.FiberRuntime.start(FiberRuntime.scala:1380)
    at zio.Runtime$UnsafeAPIV1.fork(Runtime.scala:155)
    at zio.Runtime$UnsafeAPIV1.fork(Runtime.scala:138)
    at zio.http.netty.NettyRuntime.run(NettyRuntime.scala:48)
    at zio.http.netty.NettyRuntime.run$(NettyRuntime.scala:11)
    at zio.http.netty.NettyRuntime$$anon$4.run(NettyRuntime.scala:112)
    at zio.http.netty.server.ServerInboundHandler.channelRead0(ServerInboundHandler.scala:200)
    at zio.http.netty.server.ServerInboundHandler.channelRead0(ServerInboundHandler.scala:32)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
    at io.netty.handler.codec.http.HttpServerKeepAliveHandler.channelRead(HttpServerKeepAliveHandler.java:64)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
    at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:383)
    at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:213)
    at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:291)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

The logs above contain the information about the root cause exception, even though there is also not a very useful warning from Netty

maximskripnik commented 6 months ago

It seems like the issue is with zio-http, caused by this: https://github.com/zio/zio-http/issues/2584

It's already fixed with https://github.com/zio/zio-http/pull/2599, so I guess we just need to wait for the next zio-http release and update it in tapir

TrustNoOne commented 5 months ago

zio-http was released https://github.com/zio/zio-http/releases/tag/v3.0.0-RC5

adamw commented 5 months ago

@TrustNoOne yes, but it cointains snapshot transitive dependencies, so we cannot use it

adamw commented 5 months ago

See https://github.com/softwaremill/tapir/pull/3596

dacr commented 4 months ago

I've just checked, https://github.com/zio/zio-http/issues/2584 hang issues I had previously and coming from the ZIO side, are fixed (zio 2.0.22, tapir 1.10.5, zhttp-3.0.0-RC6), but now I have something wrong when tapir try to process a stream which returns an error.

I get 200 OK with no content :

< HTTP/1.1 200 OK
< Content-Type: application/json-seq
< transfer-encoding: chunked
< 
* transfer closed with outstanding read data remaining
* Closing connection 0

instead of getting (with the same inputs) :

< HTTP/1.1 400 Bad Request
< content-length: 305
< Content-Type: application/json
< 
* Connection #0 to host 127.0.0.1 left intact
{"querySyntaxError"...

I had two similar endpoints with the same entries, the first one returns a stream and the second one the result of a backend consumed stream (the same one).

If needed I will be able to write some snippets to reproduce the issue but not before 3 weeks unfortunately.

adamw commented 4 months ago

@dacr please do, I'm not aware of any similar issues

dacr commented 3 months ago

Consider the stream logic failingGreetingStream3 bound to helloEndPoint, it summaries the issue I have, and once written like this it also gives me some hints on what's going on. I try to build a ZIO Stream from a java stream which can fail, and while reading this I realized the issue is coming from my side and the way I'm building the stream :(

  val failingGreetingStream3: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream.fromJavaStreamZIO(
      ZIO.fail(Exception("Can not build the stream"))
    )
  )

  val helloEndPoint =
    endpoint
      .description("Returns greeting")
      .get
      .in("hello")
      .out(streamBody(ZioStreams)(Schema.derived[Greeting], JsonSeqCodecFormat()))
      .out(statusCode(StatusCode.Ok).description("query success"))
      .errorOutVariantPrepend(oneOfVariant(StatusCode.InternalServerError, plainBody[String]))

  val helloRoute = helloEndPoint.zServerLogic[Any](_ => failingGreetingStream3)

The full script :

// ---------------------
//> using scala "3.4.1"
//> using dep "com.softwaremill.sttp.tapir::tapir-zio:1.10.7"
//> using dep "com.softwaremill.sttp.tapir::tapir-zio-http-server:1.10.7"
//> using dep "com.softwaremill.sttp.tapir::tapir-json-zio:1.10.7"
// ---------------------

import sttp.tapir.ztapir.*
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import zio.*
import zio.stream.*
import zio.json.*
import zio.http.Server
import sttp.capabilities.zio.ZioStreams
import sttp.model.{MediaType, StatusCode}
import sttp.tapir.{CodecFormat, Schema}
import sttp.tapir.generic.auto.*
import sttp.tapir.json.zio.*
import sttp.tapir.ztapir.*

case class Greeting(message:String) derives JsonCodec

case class JsonSeqCodecFormat() extends CodecFormat {
  override val mediaType: MediaType = MediaType.unsafeApply("application", "json-seq")
}

object WebApp extends ZIOAppDefault {

  // --------------------------------------------------
  val greetingStream: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream
      .repeat(Greeting("Hello world"))
      .schedule(Schedule.spaced(1.second))
      .flatMap(greeting => ZStream.fromIterable( (greeting.toJson+"\n").getBytes ))
  )

  val failingGreetingStream1: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.fail(
    "Can not build the stream"
  )

  val failingGreetingStream2: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream.fail(Exception("Can not build the stream"))
  )

  val failingGreetingStream3: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream.fromJavaStreamZIO(
      ZIO.fail(Exception("Can not build the stream"))
    )
  )

  val helloEndPoint =
    endpoint
      .description("Returns greeting")
      .get
      .in("hello")
      .out(streamBody(ZioStreams)(Schema.derived[Greeting], JsonSeqCodecFormat()))
      .out(statusCode(StatusCode.Ok).description("query success"))
      .errorOutVariantPrepend(oneOfVariant(StatusCode.InternalServerError, plainBody[String]))

  //val helloRoute = helloEndPoint.zServerLogic[Any](_ => greetingStream)
  val helloRoute = helloEndPoint.zServerLogic[Any](_ => failingGreetingStream3)

  val routes = ZioHttpInterpreter().toHttp(List(helloRoute))
  override def run = Server.serve(routes).provide(Server.default)
}

WebApp.main(Array.empty)
adamw commented 3 months ago

@dacr does this work when using zio-http directly, without tapir?