gnieh / fs2-data

streaming data parsing and transformation library
https://fs2-data.gnieh.org
Apache License 2.0
152 stars 27 forks source link

Streaming over http seems to be broken in v1.11 #624

Open thomasl8 opened 1 month ago

thomasl8 commented 1 month ago

Hello,

We use this library (the circe version) to stream a large JSON payload over HTTP.

It has been working great with version 1.10, however when we upgrade to version 1.11 the server seems to want to collect the entire payload before returning any data to the UI (so basically not streaming). It could be that we are using the library incorrectly for our purposes. If it makes a difference, we are using tapir as our HTTP library.

This is our code (some stuff renamed). Maybe we are doing something wrong? Any help will be much appreciated! Thanks.

          myService
            .getBigFs2Stream
            .through(ast.tokenize)
            .through(wrap.asTopLevelArray)
            .through(render.compact)
            .through(text.utf8.encode)
            .pure[IO]
satabin commented 1 month ago

Hey, thanks for reporting. This sounds like a real regression indeed. In 1.11.0, we changed the way rendering is done, and I may have introduced this regression.

Can you try to replace the rendering pipe line with

.through(fs2.data.text.render.pretty(0)(fs2.data.json.Token.compact))

And let me know if it fixes it?

satabin commented 1 month ago

I am investigating the issue, and added a test (see #627). It looks like the compact pipe emits chunk really early (only singletons for now, even), even if the input is a single big chunk.

Can I see how the resulting stream is used in tapir? Can you confirm that you still get several chunks right after the .through(render.compact)?

ybasket commented 1 month ago

I wrote a little test program that tries to come as close as possible to the original report, but without tAPIr:

import cats.effect.*
import fs2.*
import fs2.data.json.*
import io.circe.Json

import scala.concurrent.duration.*

object EndlessMain extends IOApp.Simple {

  override def run: IO[Unit] =
    countingJson
      .meteredStartImmediately(0.001.seconds)
      .through(ast.tokenize)
      .through(wrap.asTopLevelArray)
      .through(render.compact)
      // .through(text.utf8.encode)
      .evalMap(IO.println(_))
      .compile
      .drain

  def countingJson: Stream[IO, Json] =
    Stream.iterate(0)(_ + 1).map(Json.fromInt)
}

When run, it outputs immediately, like this:

[
0
,
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8

And as the Stream is infinite by construction, it's guaranteed nothing is fully loaded into memory before output. So as @satabin pointed out, it seems worth looking into the tAPIr integration and ensure chunking is preserved.

satabin commented 5 days ago

Hi @thomasl8, did you have a chance to look into this issue?