mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

[Question] get a listing of parquet files? #341

Closed: calvinlfer closed this 6 months ago

calvinlfer commented 9 months ago

Hello, it’s me again! I was wondering if it’s possible to get a listing of all parquet files (and their partition info) in a partitioned read?

To add some context: I have a use case for $work where I'm taking unstructured data from S3 that's stored in partitioned Parquet files and sinking it into Kafka topics after parsing the unstructured data into structured data. The input data itself does not lend itself well to identifying duplicates that could occur during failure scenarios (when machines go down while producing this data to Kafka). So I'm trying to use a combination of the partition information, the Parquet file, and the line number within the file (using zipWithIndex), and attach all of this to each record so that downstream consumers can recognize this scheme, detect whether duplicate data is present, and do something about it.

If I had this capability, I could take each file along with its context, turn it into an fs2 stream, attach the relevant context (partition info, file info, and line info) to each record, and produce that into Kafka.
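
Roughly what I have in mind, as a sketch; `PartitionInfo` and `Record` are hypothetical placeholder types, and the `records` stream would come from reading a single file:

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical context types, for illustration only.
final case class PartitionInfo(name: String, value: String)
final case class Record(payload: String)

// A record enriched with enough context to deduplicate downstream: which
// partition it came from, which file, and its position within that file.
final case class Keyed(
    partitions: List[PartitionInfo],
    file: String,
    lineNo: Long,
    record: Record
)

// Given the records of a single Parquet file as a stream, attach the
// partition info, the file path, and the line number (via zipWithIndex).
def withContext(
    partitions: List[PartitionInfo],
    file: String,
    records: Stream[IO, Record]
): Stream[IO, Keyed] =
  records.zipWithIndex.map { case (record, idx) =>
    Keyed(partitions, file, idx, record)
  }
```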

Is this something we can support? I would love to hear your thoughts, and whether there's a better way to solve this.

mjakubowski84 commented 9 months ago

Hi! I would rather not add support for such a use case directly to the library. I just do not want the library to swell with functionality that would be rarely used but would require maintenance. However, maybe I could give you some hints as the topic sounds pretty familiar to me.

  1. The easiest solution, if you have control over the data stored in S3, is to add a UUID column to each unstructured record.
  2. Dynamically assign a unique ID based on the content of the file, e.g. the partition information plus the content of the unstructured data. I usually use a hash digest for deduplication of unstructured docs; check https://solr.apache.org/__root/docs.solr.apache.org/docs/9_4_1/solrj/org/apache/solr/common/util/Hash.html or similar. (See the first sketch after this list.)
  3. Use the io API of Parquet4s to build your own solution. First, call https://github.com/mjakubowski84/parquet4s/blob/master/fs2/src/main/scala/com/github/mjakubowski84/parquet4s/parquet/io.scala#L55 to obtain paths and partitions, and then use https://github.com/mjakubowski84/parquet4s/blob/master/fs2/src/main/scala/com/github/mjakubowski84/parquet4s/parquet/reader.scala#L147 to iterate over the directory and read individual files. Check https://github.com/mjakubowski84/parquet4s/blob/master/fs2/src/main/scala/com/github/mjakubowski84/parquet4s/parquet/reader.scala#L307 for reference. (See the second sketch after this list.)
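
For hint 2, a minimal sketch of deriving a deterministic ID from the partition information plus record content. It uses `MurmurHash3` from the Scala standard library rather than the Solr `Hash` class linked above; any stable hash works the same way, and the parameter names here are illustrative only:

```scala
import scala.util.hashing.MurmurHash3

// Derive a stable, content-based ID for a record. A record that is written
// twice (e.g. after a producer crash and retry) hashes to the same ID, so
// downstream consumers can detect and drop the duplicate.
def contentId(partitions: Map[String, String], rawRecord: String): Int = {
  // Sort the partition entries so the ID does not depend on map ordering.
  val partitionKey =
    partitions.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString("/")
  MurmurHash3.stringHash(partitionKey + "|" + rawRecord)
}
```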
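For hint 3, a sketch under two assumptions: a plain Hadoop `FileSystem` listing stands in for the linked io helpers (which additionally extract partition values for you), and the single-file read uses the public fs2 API of Parquet4s 2.x; `Data` is a placeholder record type:

```scala
import cats.effect.IO
import com.github.mjakubowski84.parquet4s.Path
import com.github.mjakubowski84.parquet4s.parquet.fromParquet
import fs2.Stream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path => HadoopPath}

final case class Data(payload: String)

// List all .parquet files under a root directory, recursively. The linked
// io helpers do this natively; a raw Hadoop listing is shown as a stand-in.
def listParquetFiles(root: HadoopPath, conf: Configuration): IO[List[HadoopPath]] =
  IO.blocking {
    val fs  = root.getFileSystem(conf)
    val it  = fs.listFiles(root, true) // true = recurse into partition dirs
    val buf = List.newBuilder[HadoopPath]
    while (it.hasNext) {
      val status = it.next()
      if (status.getPath.getName.endsWith(".parquet")) buf += status.getPath
    }
    buf.result()
  }

// Read a single file as an fs2 stream and tag every record with the file
// path and its index within that file (zipWithIndex).
def readWithContext(file: HadoopPath): Stream[IO, (String, Long, Data)] =
  fromParquet[IO]
    .as[Data]
    .read(Path(file.toString))
    .zipWithIndex
    .map { case (data, idx) => (file.toString, idx, data) }
```

Note that when you read partitioned data file by file, the partition columns live in the directory names (key=value segments) rather than in the files themselves, so they must be parsed from the path; that is exactly what the linked io functions take care of.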
calvinlfer commented 9 months ago

Thanks @mjakubowski84, that makes sense! It seems like you have already provided all the building blocks to do this so I will be able to make progress. Thank you so much 🙏

mjakubowski84 commented 9 months ago

@calvinlfer I have a feeling that, as you are reading partitions, you might be interested in the latest release: https://github.com/mjakubowski84/parquet4s/releases/tag/v2.17.0. The internals and API of the io package have changed. The old functions are deprecated, and I recommend using the new ones to benefit from better performance.

calvinlfer commented 9 months ago

This is great! I am reading partitions and this will help a lot, thank you so much 🙏 You are really doing phenomenal work!