apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Implement parquetio for Go SDK #21525

Open damccorm opened 2 years ago

damccorm commented 2 years ago

The naive approach would be to read the whole Parquet file into memory, because processing Parquet files requires an io.Seeker. The alternative is to implement the filesystem.go Interface to return an io.ReadSeekCloser, but that would not be trivial for GCS.

Imported from Jira BEAM-14304. Original Jira may contain additional context. Reported by: nguyennk92.
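For reference, the in-memory fallback described above boils down to draining the object into a byte slice and wrapping it in a bytes.Reader, which already provides Seek. A minimal sketch of that idea (names are illustrative, not SDK code):

```go
package parquetmem

import (
	"bytes"
	"io"
)

// seekableBlob wraps a fully materialized byte slice so it can be handed to a
// Parquet reader that insists on io.Seeker. Simple, but the whole object
// lives in memory.
type seekableBlob struct {
	*bytes.Reader
}

// Close is a no-op; the underlying data is just a byte slice.
func (seekableBlob) Close() error { return nil }

// asReadSeekCloser drains r (e.g. the io.ReadCloser returned by a filesystem
// OpenRead call for GCS, S3, or local files) and returns an io.ReadSeekCloser.
func asReadSeekCloser(r io.ReadCloser) (io.ReadSeekCloser, error) {
	defer r.Close()
	data, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	return seekableBlob{bytes.NewReader(data)}, nil
}
```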

wizardist commented 1 year ago

Is this done, given #17347?

lostluck commented 1 month ago

Not technically. That implementation reads an entire file into memory first, which can be very hard to work with.

This issue talks about working with Parquet and Google Cloud Storage.

https://github.com/googleapis/google-cloud-go/issues/1124

The Go CDK also punted on adding a wrapper itself, pointing out that it's also problematic for both S3 and Azure storage.

https://github.com/google/go-cloud/issues/3142#issuecomment-1174483410

It's interesting that making something compatible doesn't seem that difficult, but each wrapper is very direct. See https://github.com/xitongsys/parquet-go-source, which demonstrates convenience wrappers for each of GCS, AWS, and Azure.

(The existing ParquetIO uses parquet-go/source on a fully read-in blob of bytes.)
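For illustration, the essence of those GCS wrappers is emulating io.ReadSeeker with range reads: re-open a NewRangeReader at the new offset after each Seek instead of buffering the whole object. A rough sketch of the idea (not the library's actual code):

```go
package gcsseek

import (
	"context"
	"errors"
	"io"

	"cloud.google.com/go/storage"
)

// gcsRangeReader provides Read/Seek/Close over a GCS object by reopening a
// range reader whenever the offset moves, rather than buffering the object.
type gcsRangeReader struct {
	ctx    context.Context
	obj    *storage.ObjectHandle
	size   int64
	offset int64
	rc     io.ReadCloser // current range reader; nil until the next Read
}

// newGCSRangeReader looks up the object size once so io.SeekEnd can be supported.
func newGCSRangeReader(ctx context.Context, client *storage.Client, bucket, object string) (*gcsRangeReader, error) {
	obj := client.Bucket(bucket).Object(object)
	attrs, err := obj.Attrs(ctx)
	if err != nil {
		return nil, err
	}
	return &gcsRangeReader{ctx: ctx, obj: obj, size: attrs.Size}, nil
}

func (g *gcsRangeReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		g.offset = offset
	case io.SeekCurrent:
		g.offset += offset
	case io.SeekEnd:
		g.offset = g.size + offset
	default:
		return 0, errors.New("unsupported whence")
	}
	// Drop the current range reader; the next Read reopens at the new offset.
	if g.rc != nil {
		g.rc.Close()
		g.rc = nil
	}
	return g.offset, nil
}

func (g *gcsRangeReader) Read(p []byte) (int, error) {
	if g.rc == nil {
		r, err := g.obj.NewRangeReader(g.ctx, g.offset, -1)
		if err != nil {
			return 0, err
		}
		g.rc = r
	}
	n, err := g.rc.Read(p)
	g.offset += int64(n)
	return n, err
}

func (g *gcsRangeReader) Close() error {
	if g.rc != nil {
		return g.rc.Close()
	}
	return nil
}
```

The obvious trade-off is that every Seek costs another GET when the next Read happens, which is presumably part of why neither the GCS client nor the Go CDK wanted to ship such a wrapper as a general-purpose API.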


Basically, it wouldn't be hard to swap the internals of the existing ParquetIO for the https://github.com/xitongsys/parquet-go-source package, which is Apache 2.0 licensed, which is handy for enabling its use. It does somewhat cut against the division of labour we have, though.


The main issue with the existing ParquetIO is that it's not a splittable source and, like most of the file sinks in the SDK, it's not very compatible with streaming/windowing, etc.

So, it can't naturally split a large file, and it can't gracefully write out windowed data as an output.

wizardist commented 1 month ago

The main issue with the existing ParquetIO is that it's not a splittable source and, like most of the file sinks in the SDK, it's not very compatible with streaming/windowing, etc. So, it can't naturally split a large file, and it can't gracefully write out windowed data as an output.

Hmmm. I still have a branch where I implemented a custom Parquet IO driver with restrictions, which was, IIRC, enough for Dataflow to read the files fairly quickly.

I needed to convert about 1 TiB of Parquet files to CSVs on GCS, and it worked fairly well for the job. That said, it did an fs.OpenRead() and io.ReadAll() on GCS, so it wouldn't satisfy the OP, understandably. 🥲 I didn't put more effort into that because our files were small enough to fit into worker memory – those were AWS Aurora snapshot Parquet files; there are many of them, but they are not large.
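For the curious, a restriction-based reader along those lines might be shaped roughly like this in the Go SDK, splitting on row-group index rather than byte offset. This is only a sketch: countRowGroups and readRowGroup are hypothetical placeholders for the actual Parquet footer/row-group decoding (which, as described above, currently still means materializing the file via fs.OpenRead() and io.ReadAll()), and the exact SDF method signatures may differ from what the SDK expects.

```go
package parquetsdf

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

// Record is a placeholder row type for whatever the pipeline decodes.
type Record struct {
	ID   int64
	Name string
}

// Hypothetical helpers: countRowGroups would run in an upstream ParDo that
// pairs each filename with its row-group count; readRowGroup decodes one
// group (today that still means reading the whole file into memory first).
func countRowGroups(ctx context.Context, filename string) (int64, error)           { return 0, nil }
func readRowGroup(ctx context.Context, filename string, g int64) ([]Record, error) { return nil, nil }

// readFn is a splittable DoFn over KV<filename, numRowGroups> pairs, with one
// restriction unit per row group so runners can split large files.
type readFn struct{}

// CreateInitialRestriction covers every row group in the file.
func (fn *readFn) CreateInitialRestriction(filename string, numGroups int64) offsetrange.Restriction {
	return offsetrange.Restriction{Start: 0, End: numGroups}
}

func (fn *readFn) SplitRestriction(filename string, numGroups int64, rest offsetrange.Restriction) []offsetrange.Restriction {
	return rest.EvenSplits(4) // initial split; runners can split further at runtime
}

func (fn *readFn) RestrictionSize(filename string, numGroups int64, rest offsetrange.Restriction) float64 {
	return rest.Size()
}

func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

// ProcessElement claims row-group indices one at a time and emits their rows.
func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, numGroups int64, emit func(Record)) error {
	rest := rt.GetRestriction().(offsetrange.Restriction)
	for g := rest.Start; rt.TryClaim(g); g++ {
		rows, err := readRowGroup(ctx, filename, g)
		if err != nil {
			return err
		}
		for _, r := range rows {
			emit(r)
		}
	}
	return nil
}
```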

lostluck commented 1 month ago

Oh that's exciting! We do welcome contributions. PRs get more attention due to the automatic reviewer assignment, but do tag me for Go issues that need consultation.

Reading whole files into memory isn't wrong, BTW; it just needs to be documented behavior so users can reason about it and plan for it.