zio / zio-http

A next-generation Scala framework for building scalable, correct, and efficient HTTP clients and servers
https://zio.dev/zio-http
Apache License 2.0
785 stars 396 forks source link

Endpoint API: Java ClassCastException when Using ZIO HTTP Streaming #2752

Closed khajavi closed 5 months ago

khajavi commented 6 months ago

Describe the bug Encountered a ClassCastException in ZIO HTTP's streaming endpoint

To Reproduce Steps to reproduce the behaviour:

  1. Run the following HTTP app:
import zio._
import zio.http._
import zio.http.endpoint.Endpoint
import zio.http.endpoint.EndpointMiddleware.None
import zio.schema.DeriveSchema
import zio.schema.codec.JsonCodec.schemaBasedBinaryCodec
import zio.stream.ZStream

object EndpointWithStreamingOutput extends ZIOAppDefault {
  case class Event(msg: String)

  object Event {
    implicit val schema = DeriveSchema.gen[Event]
  }

  val endpoint: Endpoint[Unit, Unit, ZNothing, ZStream[Any, Nothing, Event], None] =
    Endpoint(RoutePattern.GET / "events")
      .outStream[Event]

  val streamingHandler: Handler[Any, ZNothing, Unit, ZStream[Any, Nothing, Event]] =
    Handler.succeed(
      ZStream.from(Chunk(1, 2, 3).map(_.toString).map(Event(_)))
    )

  val app = endpoint.implement(streamingHandler).toHttpApp @@ Middleware.debug

  def run = Server.serve(app).provide(Server.default, Scope.default)

}
  1. run this curl command: curl -X 'GET' 'http://127.0.0.1:8080/events'

Expected behaviour I expect the curl method to return events properly

Current Behaviour The app throws the following exception:

timestamp=2024-03-29T11:06:33.973217Z level=INFO thread=#zio-fiber-4 message="Starting the server..." location=example.endpoint.EndpointWithStreamingOutput.run file=EndpointWithStreamingOutput.scala line=29
timestamp=2024-03-29T11:06:33.984464Z level=INFO thread=#zio-fiber-4 message="Server started" location=example.endpoint.EndpointWithStreamingOutput.run file=EndpointWithStreamingOutput.scala line=29
200 GET /events 80ms
Mar 29, 2024 3:36:38 PM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.ClassCastException: class example.endpoint.EndpointWithStreamingOutput$Event cannot be cast to class java.lang.Byte (example.endpoint.EndpointWithStreamingOutput$Event is in unnamed module of loader 'app'; java.lang.Byte is in module java.base of loader 'bootstrap')
    at scala.runtime.BoxesRunTime.unboxToByte(BoxesRunTime.java:91)
    at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:80)
    at scala.Array$.slowcopy(Array.scala:86)
    at scala.Array$.copy(Array.scala:112)
    at zio.Chunk$Arr.toArray(Chunk.scala:1727)
    at zio.Chunk.toArray(Chunk.scala:1002)
    at zio.Chunk.toArray(Chunk.scala:873)
    at zio.http.netty.NettyBodyWriter$.$anonfun$writeAndFlush$14(NettyBodyWriter.scala:123)
    at zio.http.netty.NettyFutureExecutor$.$anonfun$make$1(NettyFutureExecutor.scala:67)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:904)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:948)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:381)
    at zio.internal.FiberRuntime.evaluateMessageWhileSuspended(FiberRuntime.scala:504)
    at zio.internal.FiberRuntime.drainQueueOnCurrentThread(FiberRuntime.scala:220)
    at zio.internal.FiberRuntime.run(FiberRuntime.scala:139)
    at zio.internal.ZScheduler$$anon$4.run(ZScheduler.scala:478)
guersam commented 5 months ago

Something similar happens with RC6 for Server Sent Events. (scastie)

guersam commented 5 months ago

I think there's a room for improvement in stream body encoding.

For example, maybe SSE deserves a dedicated HttpContentCodec rather than the hidden Schema comparison.

guersam commented 5 months ago

I think I've spotted the cause:

  1. Endpoint#outStream[A] summons an implicit HttpContentCodec (code)
  2. New HttpContentCodec is created using the existing schema, with application/json as the first candidate (code)
  3. The JSON codec is picked up
  4. The encoder checks if the default media type is binary (code)
  5. JSON is defined as a binary codec (code)
  6. The encoder unsafely casts the stream as byte-emitting without properly encoding it (code)
  7. NettyWriter crashes as Body.StreamBody assumes it contains a byte stream
guersam commented 5 months ago

application/json has been treated as binary since 2022, unlike the other JSON variants. I think it's just a mistake.

guersam commented 5 months ago

For SSE, I took a workaround with a custom codec that might be added to the upstream once there is a proper decoder implementation.

val binaryCodec: BinaryCodec[ServerSentEvent] =
  new BinaryCodec[ServerSentEvent] {
    override def encode(value: ServerSentEvent): Chunk[Byte] =
      Chunk.fromArray(value.encode.getBytes)

    override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent, Byte] =
      ZPipeline.fromFunction(_.mapChunks(_.flatMap(encode)))

    override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent] =
      Left(DecodeError.UnsupportedSchema(ServerSentEvent.schema, "ServerSendEvent"))

    override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent] =
      ZPipeline.fromSink(ZSink.fail(DecodeError.UnsupportedSchema(ServerSentEvent.schema, "ServerSendEvent")))
  }

val httpContentCodec: HttpContentCodec[ServerSentEvent] =
  HttpContentCodec(
    ListMap(
      MediaType.text.`event-stream` -> binaryCodec
    ),
    ServerSentEvent.schema
  )

val httpCodec: HttpCodec[HttpCodecType.Content, ZStream[Any, Nothing, ServerSentEvent]] =
  HttpCodec.contentStream(httpContentCodec)