softwaremill / sttp

The Scala HTTP client you always wanted!
https://sttp.softwaremill.com
Apache License 2.0

Add Ox integration: SSE & WebSockets? #2158

Closed adamw closed 5 months ago

adamw commented 6 months ago

The default backend is already fully usable for direct style Scala, along with streaming (via InputStream bodies), and web sockets (using the WebSocket's blocking .send() and .receive() methods).

However, what we are missing is integration with SSE, streaming web sockets (sometimes it's more convenient to work this way) and high-level streaming (mapping over byte chunks or lines, for example).
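To make "mapping over byte chunks or lines" concrete, here is a minimal stdlib-only sketch of that kind of operation on top of a plain InputStream. The helper names `chunks` and `lines` are hypothetical (they are not part of sttp or Ox), and `Array[Byte]` is used as the chunk type purely for simplicity:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

// Hypothetical helper: lazily reads an InputStream as a sequence of byte chunks.
// Assumes chunkSize > 0; the stream is read only as the iterator is consumed.
def chunks(is: InputStream, chunkSize: Int = 1024): Iterator[Array[Byte]] =
  Iterator
    .continually {
      val buf = new Array[Byte](chunkSize)
      val read = is.read(buf)
      if read == -1 then None else Some(buf.take(read))
    }
    .takeWhile(_.isDefined)
    .collect { case Some(chunk) => chunk }

// Hypothetical helper: re-frames byte chunks as UTF-8 lines split on '\n'.
// Bytes after the last newline are kept in `pending` until the next chunk arrives,
// so a line (or a multi-byte character) spanning two chunks is decoded in one piece.
def lines(chunks: Iterator[Array[Byte]]): Iterator[String] =
  var pending = Array.empty[Byte]
  val complete = chunks.flatMap { chunk =>
    val data = pending ++ chunk
    val lastNl = data.lastIndexOf('\n'.toByte)
    if lastNl == -1 then
      pending = data
      Nil
    else
      pending = data.drop(lastNl + 1)
      // limit -1 keeps empty strings, so blank lines are preserved
      new String(data, 0, lastNl, StandardCharsets.UTF_8).split("\n", -1).toList
  }
  // Iterator's ++ takes its argument by name, so `pending` is inspected only
  // after all chunks have been consumed; a trailing unterminated line is emitted.
  complete ++ (if pending.isEmpty then Iterator.empty
               else Iterator(new String(pending, StandardCharsets.UTF_8)))
```

With these in place, something like `lines(chunks(is)).map(_.toUpperCase)` or `chunks(is).map(_.length).sum` works on any InputStream body, but without the back-pressure, error propagation and concurrency that a channel-based Source would provide.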

That's why it might be reasonable to have an Ox integration module. In fact, this is a joint Ox+sttp-client issue, as it requires changes in both. Going from the end:

  1. to support high-level streaming operations (on the byte-chunk or line level), we could add some I/O capabilities to Ox's Source, such as two-way conversions between an InputStream and a Source[byte chunk]; between an InputStream and a Source[String] (lines); and finally writing such sources to files/reading from files. What remains to be determined here is what's a good representation of a byte chunk. A simple Array[Byte]? ByteBuffer? Pekko's ByteString?
  2. for web sockets, we could provide a fairly simple conversion between a WebSocket and a (Source[WebSocketFrame], Sink[WebSocketFrame])
  3. finally, for SSE, I do have some code, but it might need polishing. And it would be great to include it in the integration module:
def parseSse(is: InputStream)(using Ox): Source[ServerSentEvent] =
  val chunks = StageCapacity.newChannel[Array[Byte]]
  fork {
    try
      repeatWhile {
        val a = new Array[Byte](1024)
        val r = is.read(a)
        if r == -1 then
          chunks.done()
          false
        else
          chunks.send(a.take(r))
          true
      }
    catch case t: Throwable => chunks.errorSafe(t)
  }

  chunks
    .mapStatefulConcat(() => Array.empty[Byte]) { case (buffer, nextChunk) =>
      @tailrec
      def splitChunksAtNewLine(buf: Array[Byte], chunk: Array[Byte], acc: Vector[Array[Byte]])
          : (Array[Byte], Vector[Array[Byte]]) =
        val newlineIdx = chunk.indexOf('\n')
        if newlineIdx == -1 then (buf ++ chunk, acc)
        else
          val (chunk1, chunk2) = chunk.splitAt(newlineIdx + 1)
          // prepend `buf` (not the outer `buffer`), so leftover bytes are only added to the first line
          splitChunksAtNewLine(Array.empty[Byte], chunk2, acc :+ (buf ++ chunk1))

      val (newBuffer, toEmit) = splitChunksAtNewLine(buffer, nextChunk, Vector.empty)

      (newBuffer, toEmit)
    }
    .mapAsView(new String(_))
    .mapStatefulConcat(() => Vector.empty[String]) { case (acc, el) =>
      if el.isBlank then (Vector.empty, Some(acc)) else (acc :+ el.dropRight(1), Nil)
    }
    .map(lines => ServerSentEvent.parse(lines.asInstanceOf[Vector[String]].toList))

Example usage:

@main def sseClient(): Unit =
  supervised {
    basicRequest
      .post(uri"http://localhost:51823/sse/echo3")
      .body("1234567890")
      .response(asInputStreamAlways { is =>
        parseSse(is).foreach(el => println(s"Got: $el"))
        ()
      })
      .send()
      .body
  }
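
For reference, the line-grouping and field-parsing steps that the pipeline above performs can be sketched with the standard library only, without Ox channels. This is a simplified illustration, not sttp's implementation: `SseEvent`, `parseEvent`, `events` and `sseEvents` are hypothetical names, and `SseEvent` is only a stand-in for sttp's ServerSentEvent. Unlike the code above, comment lines (starting with `:`) are skipped and consecutive blank lines do not produce empty events.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for sttp's ServerSentEvent; field names are illustrative only.
final case class SseEvent(
    data: Option[String] = None,
    eventType: Option[String] = None,
    id: Option[String] = None,
    retry: Option[Int] = None
)

// Parses the "field: value" lines of a single event.
// Multiple `data` lines are joined with '\n'; unknown fields are ignored.
def parseEvent(lines: List[String]): SseEvent =
  lines.foldLeft(SseEvent()) { (ev, line) =>
    val (field, rawValue) = line.indexOf(':') match
      case -1 => (line, "")
      case i  => (line.take(i), line.drop(i + 1))
    val value = rawValue.stripPrefix(" ") // at most one leading space is removed
    field match
      case "data"  => ev.copy(data = Some(ev.data.fold(value)(_ + "\n" + value)))
      case "event" => ev.copy(eventType = Some(value))
      case "id"    => ev.copy(id = Some(value))
      case "retry" => ev.copy(retry = value.toIntOption)
      case _       => ev
  }

// Groups lines into events: a blank line terminates the current event.
// Lines of an event that is not terminated before the input ends are dropped.
def events(lines: Iterator[String]): Iterator[SseEvent] =
  var acc = Vector.empty[String]
  lines.flatMap { line =>
    if line.isEmpty then
      val complete = acc
      acc = Vector.empty
      if complete.isEmpty then Nil else List(parseEvent(complete.toList))
    else if line.startsWith(":") then Nil // comment line, e.g. a keep-alive
    else
      acc = acc :+ line
      Nil
  }

// Reads lines lazily from the stream; readLine() returns null at end of input.
def sseEvents(is: InputStream): Iterator[SseEvent] =
  val reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
  events(Iterator.continually(reader.readLine()).takeWhile(_ != null))
```

Because this version is a plain Iterator, it blocks the calling thread on every read, which is fine in direct style but gives none of the buffering between stages that the channel-based version has.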
lbialy commented 6 months ago

Wouldn't that be an asSSEAlways?

adamw commented 6 months ago

For all other streaming approaches we simply offer an SSE-parsing stream stage, e.g.: https://sttp.softwaremill.com/en/stable/backends/akka.html#server-sent-events

asSSEAlways is hard to do generically, as you don't know what the underlying stream type is. So it could be done, but it would need to be backend-specific.