plokhotnyuk / jsoniter-scala

Scala macros for compile-time generation of safe and ultra-fast JSON codecs + circe booster
MIT License

[Request] Supported fs2 integration #296

Open · Daenyth opened this issue 5 years ago

Daenyth commented 5 years ago

I looked at the http4s example, but it appears to read all inputs into memory rather than streaming.

This appears to work so far, but it would be nice to have official support that is maintained over time (and hopefully improved!).

import cats.effect.implicits._
import cats.effect.{ConcurrentEffect, ContextShift, IO, Sync}
import cats.implicits._
import com.github.plokhotnyuk.jsoniter_scala.core.{
  JsonValueCodec,
  scanJsonValuesFromStream,
  writeToArray
}
import com.github.plokhotnyuk.jsoniter_scala.macros.{
  CodecMakerConfig,
  JsonCodecMaker
}
import fs2.concurrent.Queue
import fs2.{Chunk, Pipe, Stream}
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.ExecutionContext

class JsoniterSpec extends FlatSpec with Matchers {
  import ExecutionContext.Implicits.global
  implicit val CS: ContextShift[IO] = IO.contextShift(global)

  behavior of "jsoniter"

  it should "parse streaming values" in {
    case class Foo(name: String, num: Int)
    implicit val codec: JsonValueCodec[Foo] =
      JsonCodecMaker.make[Foo](CodecMakerConfig())

    val input = Stream(Foo("fred", 1), Foo("wilma", 2))
    val bytes = input
      .map(writeToArray(_))
      .flatMap(bs => Stream.chunk(Chunk.bytes(bs)))
      .covary[IO]

    val results = bytes.through(parse[IO, Foo](global))

    results.compile.toList
      .map(result => result shouldEqual input.toList)
      .unsafeRunSync()
  }

  def parse[F[_]: ConcurrentEffect: ContextShift, A: JsonValueCodec](
      blockingExecutionContext: ExecutionContext,
      maxBuffered: Int = 1,
      shouldParseNext: A => Boolean = (_: A) => true
  ): Pipe[F, Byte, A] = { in =>
    // Bounded, None-terminated queue gives back-pressure between the parser and the consumer
    Stream.eval(Queue.boundedNoneTerminated[F, A](maxBuffered)).flatMap { q =>
      // Expose the incoming byte stream as a java.io.InputStream for scanJsonValuesFromStream
      in.through(fs2.io.toInputStream[F]).flatMap { inputStream =>
        // Called for each parsed value; returning false stops the scan early
        def eachA(a: A): Boolean =
          q.enqueue1(Some(a)).as(shouldParseNext(a)).toIO.unsafeRunSync()

        // Run the blocking scan on the dedicated execution context, then terminate the queue
        val parseBytes = ContextShift[F].evalOn(blockingExecutionContext) {
          Sync[F].delay(scanJsonValuesFromStream(inputStream)(eachA))
        } *> q.offer1(None)

        // Emit parsed values downstream while the scan runs concurrently
        val emitValues = q.dequeue
        emitValues concurrently Stream.eval(parseBytes)
      }
    }
  }
}
plokhotnyuk commented 5 years ago

@Daenyth Hi, Gavin!

Thanks for the question!

If you are talking about the example from the TechEmpower benchmark, then the usage of jsoniter-scala there is already overkill: according to the profiler reports, it spends less than 0.5% of CPU time in the writeToArray routine.

Using async-profiler you can attach to the http4s server and see clearly what is happening under different workloads. Possibly, for streaming long lists of JSON values your parse method will be more suitable, but in any case the efficiency should be measured.

Daenyth commented 5 years ago

I'm less concerned with CPU usage and more concerned with memory usage. I'm not necessarily suggesting that the streaming approach should be the only one, but rather that it would be good to have it available for cases where the input stream might be too large to fit into memory. As far as I'm aware, the TechEmpower benchmark doesn't exercise massive streaming response bodies, but in practice (at least for me) they're not uncommon, especially when processing data from non-HTTP sources, like a stored JSON file somewhere.

plokhotnyuk commented 5 years ago

Currently, jsoniter-scala doesn't guarantee bounded memory usage when parsing into an arbitrary data structure.

There are configuration options that can limit the values of bit sets or the sizes of maps, or disallow recursive data structures for derived codecs, but that is not enough to solve the problem in the general case.

The scanJsonValuesFromStream and scanJsonArrayFromStream routines were designed for cases when you need to parse data from trusted sources, while in the other cases (when parsing from a buffer) the user can (and should) control the size of the input.
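
For reference, a minimal sketch of how a scan routine is typically driven (Foo, its implicit JsonValueCodec, process, shouldContinue, and the file name are placeholders, not part of the library): values are decoded one at a time, and returning false from the callback stops the scan early, so how much of the input you consume stays under your control.

import java.io.FileInputStream
import com.github.plokhotnyuk.jsoniter_scala.core._

// Walk whitespace-separated JSON values from a stream, one value in memory at a time.
val in = new FileInputStream("values.json")
try scanJsonValuesFromStream[Foo](in) { foo =>
  process(foo)        // handle the current value
  shouldContinue(foo) // return false to stop scanning early
} finally in.close()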

Daenyth commented 5 years ago

I'm not so concerned about malicious input; I just want to avoid loading the entire input data stream into memory at once on the happy path. Are the InputStream-based methods not appropriate for that?

plokhotnyuk commented 5 years ago

@Daenyth you can use the InputStream-based methods for trusted input.

For better throughput, pass a ReaderConfig/WriterConfig to them with tuned preferred sizes for the internal read/write buffers.
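
For example, something like the following sketch (the buffer sizes are illustrative only, and the exact builder methods can differ between library versions):

import com.github.plokhotnyuk.jsoniter_scala.core._

// Larger preferred buffers mean fewer read/write calls on the underlying streams.
val readerConfig = ReaderConfig.withPreferredBufSize(32768)
val writerConfig = WriterConfig.withPreferredBufSize(32768)

// e.g. scanJsonValuesFromStream(inputStream, readerConfig)(eachA)
//      writeToStream(foo, outputStream, writerConfig)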

steven-lai commented 3 years ago

I think I have a similar use case. I have a file (in the GB range) where each line contains a complete JSON object of the same type (e.g. the same case class), but each line can be of arbitrary length (e.g. it contains a Map). I'd like to read from the file lazily, but it doesn't seem like readFromStream supports this use case, and scanJsonValuesFromStream is meant to read the entire source in one pass.

EDIT: I think we're meant to create more involved consumers for scanJsonValuesFromStream to support the use case I described (see the sketch after the snippet below)?

readFromStream(inputStream, ReaderConfig.withCheckForEndOfInput(false))
readFromStream(inputStream, ReaderConfig.withCheckForEndOfInput(false)) // Doesn't read the "2nd line"
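
A consumer along the lines of that EDIT could look like the sketch below (MyRecord, its implicit codec, and the file name are placeholders): since the objects in the file are separated only by line feeds, scanJsonValuesFromStream can decode them one at a time, so only the record currently being parsed is held in memory even for a multi-GB file.

import java.io.{BufferedInputStream, FileInputStream}
import com.github.plokhotnyuk.jsoniter_scala.core._

// One pass over a newline-delimited file of MyRecord values with bounded memory;
// the per-record work here is just a running count.
var count = 0L
val in = new BufferedInputStream(new FileInputStream("huge.jsonl"))
try scanJsonValuesFromStream[MyRecord](in) { record =>
  count += 1 // replace with real per-record processing of `record`
  true       // returning false would abort the scan before the end of the file
} finally in.close()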
plokhotnyuk commented 3 years ago

@steven-lai Hi Steven! Please open a separate issue if your case is not related to fs2 integration.

I'm happy to help you in finding the best solution. A good starting point would be samples of your input and examples of data structures that you use to handle them after parsing.

As an example, we can define a manually written codec that returns scala.Unit but accepts some callback function in its constructor to redirect parsed data structures to it. That would allow handling repetitive JSON structures that are nested in different ways (not just line-feed-separated values or JSON arrays of values).
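
A rough sketch of that idea (CallbackCodec, onValue, and process are illustrative names, not library API; the inner codec for the element type is assumed to be derived elsewhere):

import com.github.plokhotnyuk.jsoniter_scala.core._

// A hand-written codec that decodes values of A with an existing codec and
// forwards each decoded value to a callback instead of building a collection.
final class CallbackCodec[A](onValue: A => Unit)(implicit aCodec: JsonValueCodec[A])
    extends JsonValueCodec[Unit] {
  def decodeValue(in: JsonReader, default: Unit): Unit =
    onValue(aCodec.decodeValue(in, aCodec.nullValue)) // decode one value and hand it over

  def encodeValue(x: Unit, out: JsonWriter): Unit =
    out.encodeError("write is not supported by this codec")

  val nullValue: Unit = ()
}

// Usage idea: pass it explicitly to one of the scan routines, e.g.
// scanJsonValuesFromStream[Unit](inputStream)(_ => true)(new CallbackCodec[Foo](process))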