mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License
283 stars 66 forks

Feature request: Expose partitions as a `Stream[F, Stream[F, Record]]` for FS2 #336

Closed calvinlfer closed 6 months ago

calvinlfer commented 7 months ago

Hello there,

Thank you for this immensely useful library; it saves us from having to reach for Spark. I would like to request exposing partitions as a stream of streams so that one could run transformations on each partition in parallel. I see that you already have this capability via readPartitioned in reader.scala, but the read function sequentially flatMaps over each partition, so we lose the ability to parallelize per partition. Is it possible/feasible to offer a variant where each partition is exposed as an inner stream? That would allow us to exploit parallelism in a finer-grained way.
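
To make the request concrete, here is a minimal sketch of the difference, assuming fs2 3.x and cats-effect 3 on the classpath. `Record`, `partitions` and `transform` are hypothetical stand-ins and not part of the parquet4s API.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // only needed to run the streams
import fs2.Stream

// Hypothetical stand-in for a decoded Parquet row; not a parquet4s type.
final case class Record(partition: String, value: Int)

object PartitionStreams {
  // Hypothetical outer stream: one inner stream per partition.
  val partitions: Stream[IO, Stream[IO, Record]] =
    Stream
      .emits(List("a", "b", "c"))
      .map(p => Stream.emits(List(1, 2, 3)).map(v => Record(p, v)).covary[IO])
      .covary[IO]

  // Hypothetical per-record work we would like to run inside each partition.
  def transform(r: Record): IO[Record] = IO.pure(r.copy(value = r.value * 10))

  // Sequential: flatMap drains one partition completely before the next starts.
  val sequential: Stream[IO, Record] =
    partitions.flatMap(inner => inner).evalMap(transform)

  // Parallel: every inner stream is transformed and run concurrently.
  // The output order across partitions is no longer deterministic.
  val parallel: Stream[IO, Record] =
    partitions.map(_.evalMap(transform)).parJoinUnbounded
}
```

Both variants emit the same records; only the interleaving differs, which is why the comparison should be on sets rather than on ordered lists.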

Update: We have done some experiments by adding this feature to parquet4s and have noticed an immense speedup on our control dataset. We went from 1m 52s with the current (sequential) version to 24s with the parallel version. I'll submit a PR with the changes.

Thank you!

mjakubowski84 commented 6 months ago

Hi @calvinlfer,

Thank you for the contribution and a donation! Sorry for not getting back to you sooner, but you have caught me on vacation.

As you have already noticed, the code has undergone some refactoring, enabling parallel reading of individual files or partitions, as well as other improvements I started working on but chronically have no time to finish.

The parallel reading that you proposed is on my list, and I am happy to see your PR. However, I have a suggestion, and I would like to know your opinion. Parquet4s also contains the Akka/Pekko module, and I would like to keep the API consistent and maintain feature parity between all modules. Therefore, I imagined adding a new optional parameter, parallelism(N) (defaulting to 1), and implementing parallel reading of up to N files. Internally, in the case of FS2, that would be code such as yours: a stream of streams, but par-joined with an upper bound. In the case of Akka/Pekko, that would be parallel substreams. Besides providing a consistent abstraction across library modules, such an approach would prevent too many parallel connections from being open at once, which can lead to performance degradation or resource exhaustion.
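
The upper bound can be sketched in FS2 as follows, assuming fs2 3.x and cats-effect 3. This is only an illustration of `parJoin(n)`, not the code that ended up in parquet4s; `file`, `readPar` and `run` are hypothetical helpers, and the counters exist only to observe how many simulated files are open at once.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global // only needed to run the program
import fs2.Stream
import scala.concurrent.duration._

object BoundedRead {
  // Hypothetical: simulates reading one file while tracking how many are open.
  def file(id: Int, open: Ref[IO, Int], peak: Ref[IO, Int]): Stream[IO, Int] =
    Stream
      .bracket(open.updateAndGet(_ + 1).flatMap(n => peak.update(_ max n)))(_ =>
        open.update(_ - 1)
      )
      .flatMap(_ => Stream.emits(List(id * 10, id * 10 + 1)).covary[IO])
      .evalTap(_ => IO.sleep(5.millis)) // pretend each row takes time to read

  // Hypothetical readPar: at most `parallelism` inner streams run at once.
  def readPar[A](files: Stream[IO, Stream[IO, A]], parallelism: Int = 1): Stream[IO, A] =
    files.parJoin(parallelism)

  // Returns the emitted values and the peak number of simultaneously open files.
  def run(parallelism: Int): IO[(List[Int], Int)] =
    for {
      open <- Ref.of[IO, Int](0)
      peak <- Ref.of[IO, Int](0)
      files = Stream.emits(List.range(0, 8)).covary[IO].map(file(_, open, peak))
      out  <- readPar(files, parallelism).compile.toList
      p    <- peak.get
    } yield (out, p)
}
```

Running `BoundedRead.run(3).unsafeRunSync()` should report a peak of at most 3 open files, however many files the outer stream contains. With the default of 1 the files are read one after another, matching the current sequential behaviour.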

calvinlfer commented 6 months ago

Hey @mjakubowski84! Thank you for this amazing library, and I'm glad you are taking some well-earned vacation.

I definitely agree with your suggestion: we should have feature parity across both Akka/Pekko and FS2. I like the approach of parallelism(N) in the builder (which uses parJoin(N) for FS2 under the hood and the equivalent in Akka/Pekko land) as a high-level API where read is an alias for readPar(1).

I would also advocate for exposing the Stream of Streams abstraction in FS2 (and the equivalent in Akka) for more advanced users who want a handle on each file as an inner stream, since the parJoin variants merge these and thereby lose fine-grained control.
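
As an example of what the merged variant cannot express, here is a minimal sketch of a per-file aggregate computed directly on the outer stream, assuming fs2 3.x and cats-effect 3. `partitions` and `sums` are hypothetical names, and plain `Int` values stand in for records.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // only needed to run the program
import fs2.Stream

object PerPartition {
  // Hypothetical outer stream: each inner stream stands for one file or partition.
  val partitions: Stream[IO, Stream[IO, Int]] =
    Stream
      .emits(List(List(1, 2, 3), List(10, 20), List(100)))
      .map(rows => Stream.emits(rows).covary[IO])
      .covary[IO]

  // One sum per partition, with up to 2 partitions evaluated concurrently.
  // parEvalMap emits results in the order of the outer stream.
  val sums: IO[List[Int]] =
    partitions
      .parEvalMap(2)(inner => inner.compile.fold(0)(_ + _))
      .compile
      .toList
}

// PerPartition.sums.unsafeRunSync() == List(6, 30, 100)
```

Once the inner streams have been joined with parJoin, the boundary between files is gone, so an aggregate like this would need the partition key carried on every record instead.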

mjakubowski84 commented 6 months ago

Released in 2.16.0

mjakubowski84 commented 6 months ago

@calvinlfer I forgot to ask... I mentioned you as a sponsor in the README and on the project page, using your GitHub username and linking to your GitHub profile. Would you prefer a different name and a link to a different page?

calvinlfer commented 6 months ago

Thank you for adding me! What you have is perfect 😄