plokhotnyuk / jsoniter-scala

Scala macros for compile-time generation of safe and ultra-fast JSON codecs + circe booster
MIT License

Parallel Processing with JSON Lines files #1174

Open yyaarix opened 3 months ago

yyaarix commented 3 months ago

Hi,

I'm writing a stream processing application that reads gzipped files from S3, decompresses them, and processes the data. The files are in JSON Lines format, meaning each line in the file is a separate JSON object that I need to parse.

Currently, I decode each line into a string and parse each line in parallel. However, this approach is inefficient in terms of memory (creating many string objects results in significant GC pressure) and CPU usage.

I would like to explore the option of using scanJsonValuesFromStream but I'm unsure how to parallelize the work similarly to splitting the lines after decoding to a string (i.e., parsing each line of the file in parallel).

I've read the thread in this issue, which seems to have a somewhat similar format, but I couldn't figure out how to further parallelize the work.

Additionally, I am uncertain about the appropriate values for preferredBufSize and preferredCharBufSize.

For context, I am using ZIO Streams for stream processing.

Any help would be much appreciated.

Thanks!

plokhotnyuk commented 3 months ago

@yyaarix Hi Yuval!

To suggest the best way to speed up the whole process, or just the parsing, I need to know much more about your particular case.

A great example is 1BRC (the One Billion Row Challenge), where solutions built on knowledge of the possible input far outperformed generic ones.

In your case, I would start by measuring how much time and how many CPU cycles are spent on each phase of your process: reading from S3, unzipping, parsing to in-memory instances, and further processing of them.

As an example, contemporary CPUs and JDKs can parse JSON to case class instances at GB/sec speeds (gigabytes, not gigabits). If downloading, unzipping, or handling the parsed objects is slower than parsing, then parallelizing the parsing will not help you much; please see the Universal Scalability Law.
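
For reference, the Universal Scalability Law is commonly written as below. The symbols follow the usual textbook form and the numbers in the worked case are made up for illustration, not measurements from this thread.

```latex
% C(N): speedup relative to one worker when N workers are used
% \alpha: contention (the serial fraction of the work)
% \beta:  coherency cost (pairwise coordination between workers)
C(N) = \frac{N}{1 + \alpha (N - 1) + \beta N (N - 1)}

% Illustrative case: half of the pipeline is serial (\alpha = 0.5),
% no coherency cost (\beta = 0), eight workers (N = 8):
C(8) = \frac{8}{1 + 0.5 \cdot 7} = \frac{8}{4.5} \approx 1.78
```

With \beta = 0 this reduces to Amdahl's law: if only the parsing phase is parallelized, the overall speedup is capped by the phases that stay sequential.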

yyaarix commented 3 months ago

Thanks @plokhotnyuk

My use case is quite simple as you wrote.

  1. get notifications of new s3 objects
  2. fetch s3 objects
  3. unzip
  4. process json lines

In pseudocode, it looks something like this:

StreamOfS3Notifications
  .mapZioPar(fetchs3Object.via(unzip).via(toUtf8).via(splitLines).collect) //fetchs3Object returns a stream of bytes
  .mapZioPar(parseLinesInPar) //use readFromString

In this case, JSON parsing a file of 100K+ lines takes around 100 ms, while the unzipping + UTF-8 decoding + splitting lines takes ~80 ms.

StreamOfS3Notifications
  .mapZioPar(fetchs3Object.via(unzip).toInputStream) //fetchs3Object returns a stream of bytes which is transformed to an input stream
  .mapZioPar(parseInputStream) //use scanJsonValuesFromStream 

Here, JSON parsing the same file takes ~1 second, while the unzipping only takes <1 ms (so I might be able to get to the JSON parsing stage quickly and thus still exhaust the CPU, as you suggested).

I still need to check the S3 latency, which requires me to change the code quite a bit, as I need to deploy it to a server that runs on AWS. It's cumbersome to separate the file/S3 reading from the unzipping and line splitting, as it is all done on the same stream of bytes, so I figured that running the test on a local file should be good enough for this stage of my tests.

All in all, it seems like the total latency is improved when not UTF-8 decoding and splitting, but I was wondering if I can improve the JSON parsing even further, as I know each line is a separate object and they can all be parsed in parallel.

Thanks!

plokhotnyuk commented 3 months ago

@yyaarix Could you create and publish a small project that unzips, parses, and handles the parsed data structures, using realistic samples that reproduce your challenges?

If not, try async-profiler in CPU cycles and wall-clock modes (-e cpu and -e wall) to spot the source of the problem.

yyaarix commented 3 months ago

@plokhotnyuk I'm not sure there is an issue. It is expected that scanning the file sequentially will be slower than parsing each JSON line in parallel. The question is whether the latter is possible without decoding the bytes to strings.

plokhotnyuk commented 3 months ago

Yes, parsing from arrays of bytes is faster than from strings. Use parsing from strings only for testing or evaluation.
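
One way to sketch this, under stated assumptions: the decompressed file (or a chunk of it that ends on a line boundary) is held in a single byte array, line boundaries are found by scanning for the 0x0A byte, and batches of (from, to) ranges are parsed in parallel without creating intermediate strings. Scanning for 0x0A is safe because UTF-8 never uses that byte inside a multi-byte sequence and JSON requires newlines inside string values to be escaped. The names `JsonLinesSplit`, `lineBounds`, and `parseAllPar` are hypothetical helpers written for this illustration; the `parse` argument is a placeholder where a byte-range parser such as jsoniter-scala's `readFromSubArray(buf, from, to)` would presumably be passed.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper for illustration; not part of jsoniter-scala.
object JsonLinesSplit {
  private val LF: Byte = '\n'.toByte
  private val CR: Byte = '\r'.toByte

  // Returns the (from, to) offsets of every non-empty line in buf; `to` is exclusive.
  def lineBounds(buf: Array[Byte]): Vector[(Int, Int)] = {
    val bounds = Vector.newBuilder[(Int, Int)]
    var from = 0
    var i = 0
    while (i <= buf.length) {
      // The end of the buffer also terminates the last line.
      if (i == buf.length || buf(i) == LF) {
        var to = i
        if (to > from && buf(to - 1) == CR) to -= 1 // tolerate CRLF line endings
        if (to > from) bounds += ((from, to))       // skip blank lines
        from = i + 1
      }
      i += 1
    }
    bounds.result()
  }

  // Splits the lines into at most `chunks` batches and parses each batch in its own Future.
  // `parse` receives the shared buffer plus a byte range, so no per-line copy is made here.
  // Results are returned in the original line order.
  def parseAllPar[A](buf: Array[Byte], chunks: Int)(parse: (Array[Byte], Int, Int) => A)(
      implicit ec: ExecutionContext): Future[Vector[A]] = {
    require(chunks > 0, "chunks must be positive")
    val bounds = lineBounds(buf)
    val batchSize = math.max(1, (bounds.length + chunks - 1) / chunks)
    val batches = bounds.grouped(batchSize).toVector
    Future
      .traverse(batches)(batch => Future(batch.map { case (from, to) => parse(buf, from, to) }))
      .map(_.flatten)
  }
}
```

Batching ranges, rather than scheduling one task per line, keeps the scheduling overhead small when individual lines take only microseconds to parse. The boundary scan itself stays sequential, so it is worth measuring it alongside the unzip step before deciding how many batches to use.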