jcrist opened this issue 2 years ago
cc'ing a few people who may be able to provide answers to some of the above: @rjzamora, @martindurant, @mariusvniekerk, @MrPowers, @hayesgb, @fjetter, @xhochy, @cpcloud, @adbreind. Feel free to ignore/unsubscribe.
Also, if anyone knows people who might be able to provide use case stats for the above, please forward this on to them.
Rick's out this week, but I believe he's back next week. I've also shared this offline with a few folks.
I am far too biased to give details of the "real world" here, but I can give some insight on history and where the pain points have been that have led to the current structure of the code. I'll let users speak first, though, as those pain points may well be out of date.
I use dask to read partitioned parquet files (which I build using pyarrow). I generally work with many small parquet files of roughly the same size (on the order of 100 MB each) and read them from Azure Blob Storage. I use the metadata files and write my own custom metadata. Given the large number of files I have, it is much faster to load the data in parallel.
I can read the files using a dask distributed client as long as they're stored on a local filesystem, but when I try to read them from ADLS I get an exception. It seems to be an issue with pickling/unpickling the data from blob storage when using a distributed client (it works fine without the client, but then it takes a long time to load the data). I'm not sure whether this happens with other cloud filesystems as well. I wasn't able to find an existing issue for this; let me know if you'd like me to create one.
@mitchellwood yes, please! If you can provide a minimal reproducer and (ideally, but not necessary) a public sample dataset, that would be super helpful to try and reproduce the issue!
I can't provide a public URL for my Azure storage account because it's a corporate account, but if you have access to your own Azure account, I can provide you with a sample partitioned dataset that reproduces the issue.
Issue has been raised: #235
I've moved that issue over to dask/distributed#6060 as that is probably the right place for it to be resolved.
_For datasets you encounter, what tools were used to write those files (e.g. dask, pandas, spark, ...)? Did you generate this data yourself, or did some other team/org/person generate it for you (i.e. how much control do you have over the dataset properties)?_
We use pyarrow to write to a dataset (use_legacy_dataset=False).
_Do you generally work with single large parquet files, or multiple files grouped into a dataset?_
Multiple files, grouped by at least 2-3 partition columns.
_Are your files/datasets ever appended to/modified, or are they generated once and then read only?_
Yes, usually we merge daily files into monthly buckets in a consolidated.parquet. Otherwise the files would only be around 1 MB.
_Where do your parquet datasets normally live (e.g. local filesystem, s3, ...)?_
Azure storage v2
_What is the average file size (in bytes) for your parquet files (order of magnitude is fine here)?_
30 MB, zipped parquet
_If using multiple files in a dataset, are all files roughly the same size or is there some skew to the distribution?_
Roughly the same size
If reading multiple files, how many files are common for your datasets?
Not sure what this refers to, but we force all files in a dataset to the same schema.
How many row groups are common per file? How large is a single row group (either number of rows, or some rough approximation for in-memory bytes)? You may find https://arrow.apache.org/docs/python/parquet.html#inspecting-the-parquet-file-metadata useful for determining this information.
These are from our most commonly used datasets. The first one is a consolidated transaction file partitioned on a monthly basis. The second file is a daily snapshot of a database table. Both have quite a few columns.
created_by: parquet-cpp-arrow version 6.0.1
num_columns: 58
num_rows: 237959
num_row_groups: 1
format_version: 1.0
serialized_size: 28912
<pyarrow._parquet.FileMetaData object at 0x0000020FE7CBA360>
created_by: parquet-cpp-arrow version 6.0.1
num_columns: 266
num_rows: 418167
num_row_groups: 1
format_version: 1.0
serialized_size: 149793
_How many columns are common per file?_
50-300 depending on the dataset
_When reading multiple files, do all files usually have the same schema, or do the schemas differ per file?_
Same schema
_Do you knowingly make use of a _metadata or _common_metadata file for your datasets?_
No. Performance was slower with Dask when we tried it with metadata (about a year ago) than when using pyarrow to read a dataset with filters, so today we only use Dask for specific use cases.
Is there anything else about your parquet usage that you think is relevant to us?
Let me know if you want more info; happy to help, since Dask is such a great library. Mostly I use Dask delayed functions paired with pyarrow's dataset reading, which is an awesome combo.
30 MB, zipped parquet
Meaning filename.parquet.zip, or using pyarrow's gzip compression? If the former, is there any particular reason for preferring it over the latter?
Sorry, I meant pyarrow's gzip compression! The files are still named filename.parquet.
I took the 5GB h2o groupby CSV dataset and wrote it out as 5 Parquet files with Dask & PySpark to compare the characteristics of the Parquet files that are generated.
Dask outputs these files:
_common_metadata
_metadata
part.0.parquet
part.1.parquet
...
PySpark outputs these files:
_SUCCESS
part-00000-c0fc14dc-8165-4db1-941e-9df62834f14c-c000.snappy.parquet
part-00001-c0fc14dc-8165-4db1-941e-9df62834f14c-c000.snappy.parquet
...
The Spark-generated files are 406.3 MB, whereas the Dask-generated files are 460.1 MB. Why the difference?
Let's use PyArrow to inspect the contents of a file generated by Dask and a file generated by PySpark to investigate the differences.
Dask-generated Parquet file:
import pyarrow.parquet as pq
parquet_file = pq.ParquetFile('../tmp/mrpowers-h2o/groupby-1e7/parquet-dask/part.0.parquet')
parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x16b19b270>
created_by: parquet-cpp-arrow version 6.0.1
num_columns: 10
num_rows: 20000000
num_row_groups: 1
format_version: 1.0
serialized_size: 5299
parquet_file.metadata.row_group(0)
<pyarrow._parquet.RowGroupMetaData object at 0x16b1ed450>
num_columns: 10
num_rows: 20000000
total_byte_size: 728526800
parquet_file.schema
<pyarrow._parquet.ParquetSchema object at 0x16b1fa900>
required group field_id=-1 schema {
optional binary field_id=-1 id1 (String);
optional binary field_id=-1 id2 (String);
optional binary field_id=-1 id3 (String);
optional int64 field_id=-1 id4;
optional int64 field_id=-1 id5;
optional int64 field_id=-1 id6;
optional int64 field_id=-1 v1;
optional int64 field_id=-1 v2;
optional double field_id=-1 v3;
optional int64 field_id=-1 __null_dask_index__;
}
PySpark generated Parquet file:
parquet_file = pq.ParquetFile('../tmp/mrpowers-h2o/groupby-1e7/parquet-pyspark/part-00000-c0fc14dc-8165-4db1-941e-9df62834f14c-c000.snappy.parquet')
parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x10ca928b0>
created_by: parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d)
num_columns: 9
num_rows: 20000000
num_row_groups: 4
format_version: 1.0
serialized_size: 4926
parquet_file.metadata.row_group(0)
<pyarrow._parquet.RowGroupMetaData object at 0x10c9dd220>
num_columns: 9
num_rows: 6610100
total_byte_size: 196985924
parquet_file.schema
<pyarrow._parquet.ParquetSchema object at 0x16b1bd200>
required group field_id=-1 spark_schema {
optional binary field_id=-1 id1 (String);
optional binary field_id=-1 id2 (String);
optional binary field_id=-1 id3 (String);
optional int32 field_id=-1 id4;
optional int32 field_id=-1 id5;
optional int32 field_id=-1 id6;
optional int32 field_id=-1 v1;
optional int32 field_id=-1 v2;
optional double field_id=-1 v3;
}
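Key differences visible in the metadata above:
- Dask wrote 10 columns, including an extra __null_dask_index__ column for the index; PySpark wrote only the 9 data columns.
- Dask stored the integer columns (id4, id5, id6, v1, v2) as int64, while PySpark stored them as int32.
- Dask wrote a single row group per file, while PySpark split each file into 4 row groups (the first one holds 6,610,100 rows).
- The PySpark files are snappy-compressed, per the .snappy.parquet file names.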
Here's a pull request in case you'd like to replicate these computations on your local machine. I wrote a blog post on fetching Parquet file metadata with PyArrow that you might also find useful.
We host several parquet datasets on the Planetary Computer. All the datasets can be accessed anonymously. They're best accessed from the Azure West Europe region, which is where the data lives. Ping me if an account on the Planetary Computer Hub would be helpful for benchmarking / testing.
Here are a bunch of links to more metadata and examples. Happy to go into specifics as desired.
@o-smirnov, @jskenyon and @bennahugo might have an interesting use case here.
@TomAugspurger Thanks for all that info! A few specific questions:
- If the single file gets above a few hundred MB then I prefer to split it.
Insofar as this is a personal opinion, can you talk about how you arrived at that opinion? What tradeoffs are there in this space that you're aware of? I'm a total n00b with Parquet, so trying to learn what I can!
- We do have some datasets (e.g. GBIF) that are updated monthly. We just write a brand-new dataset rather than try to update in place on blob storage
When you write new datasets, do you have a "delete then write" workflow (like Dask with the overwrite keyword) or "write then move" workflow?
- We have some datasets coming online that will be higher frequency (daily or weekly)
Do you know whether you're planning to continue the previous workflow or switch to appending for these data?
Insofar as this is a personal opinion, can you talk about how you arrived at that opinion? What tradeoffs are there in this space that you're aware of? I'm a total n00b with Parquet, so trying to learn what I can!
I'm also not super familiar with parquet, so that's mainly informed by Dask / pandas (chunks that are too large are bad for pandas, chunks that are too small are bad for Dask; somewhere in the 100 MB - few GB range is generally OK).
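The rule of thumb above can be turned into a partition count for a given total size. A minimal sketch; the 256 MB default target is an arbitrary assumption inside the 100 MB to few GB range, not a Dask default.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def choose_npartitions(total_bytes: int, target_bytes: int = 256 * MB) -> int:
    """Number of partitions needed so each holds at most about target_bytes."""
    if target_bytes <= 0:
        raise ValueError("target_bytes must be positive")
    return max(1, math.ceil(total_bytes / target_bytes))


n = choose_npartitions(5 * GB)  # 5 GB at 256 MB per partition -> 20 partitions
per_partition = 5 * GB / n      # 256 MB each
```

In Dask itself, DataFrame.repartition(partition_size=...) exposes a similar knob.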
When you write new datasets, do you have a "delete then write" workflow (like Dask with the overwrite keyword) or "write then move" workflow?
We write them to a new directory (2022-01-01/, 2022-02-02/, ...).
For the highest frequency data, which will be updated daily, we're investigating how best to do it. I think the plan right now is to only rewrite the last partition.
Simply partitioning the files by day wouldn't be great: that'd lead to too many / too small files for Dask to process efficiently. So we'll roll them up somehow (weekly?, monthly? whatever gives a good partition size). The first day of the new period would write the new partition. Subsequent days would update that partition (either by appending, or by reading and rewriting with the old + new data concatenated). Either way, only the latest partition is being touched (and whatever global metadata we need to update). The older files won't be touched. That's the plan anyway.
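The plan above (roll daily data up into a coarser period and only touch the latest partition) can be sketched as a small planning step. The partition naming scheme and the "create"/"rewrite" labels are hypothetical.

```python
from datetime import date
from typing import Set, Tuple


def period_key(day: date, granularity: str = "month") -> str:
    """Map a daily date to the roll-up partition it belongs to."""
    if granularity == "month":
        return f"{day.year:04d}-{day.month:02d}"
    if granularity == "week":
        iso_year, iso_week, _ = day.isocalendar()
        return f"{iso_year:04d}-W{iso_week:02d}"
    raise ValueError(f"unsupported granularity: {granularity}")


def plan_daily_update(existing: Set[str], new_day: date,
                      granularity: str = "month") -> Tuple[str, str]:
    """Return (partition, action): 'create' on the first day of a period, else 'rewrite'.

    Older partitions never appear in the plan, so they are left untouched.
    """
    key = period_key(new_day, granularity)
    return key, ("rewrite" if key in existing else "create")
```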
@bryanwweber - The small file problem is one of the many common problems of managing a plain vanilla Parquet data lake. I'll write a blog post to fully flesh out the common issues. Ben from NVIDIA also gave a really good overview of some of these issues at the last PyData Global conference.
Small files make reads slower. Compacting small files is problematic in plain vanilla Parquet lakes because there is duplicate data while the compaction process is taking place and downstream readers may incorrectly classify the newly compacted files as "new data". This is particularly common when you create downstream processes to watch for new files and periodically perform incremental updates.
The Delta Lake approach to solving the problem is to use a dataChange=True flag when new data is added to the system and a dataChange=False flag when small files are compacted into larger files. This lets downstream incremental update systems differentiate between newly added data and new files that are just compacted versions of existing data. Ideally we'd just give @TomAugspurger an interface to say "I want 100 MB files, compact the small ones every night", so that's not something that'd have to be managed manually.
This dovetails into the metadata and global_metadata file discussion. Delta Lake is a bunch of Parquet files & a transaction log. The transaction log is similar to the metadata file, but contains other useful information as well. Once you use Delta Lake, you never really want to go back to a plain vanilla Parquet lake. Delta Lake solves all the common issues of managing a plain vanilla Parquet data lake. Delta Lake also solves the problem of scaling the metadata file.
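The transaction-log idea can be illustrated with a toy replay: the current set of data files is whatever remains after applying every add and remove in order, and incremental readers can skip entries marked dataChange=False. This is a sketch over a hypothetical one-JSON-object-per-line log, not Delta Lake's actual on-disk layout (numbered JSON commit files plus periodic Parquet checkpoints).

```python
import json
from typing import Dict, List


def active_files(log_lines: List[str]) -> Dict[str, dict]:
    """Replay add/remove actions in commit order to get the live data files."""
    live: Dict[str, dict] = {}
    for line in log_lines:
        action = json.loads(line)
        if "add" in action:
            live[action["add"]["path"]] = action["add"]
        elif "remove" in action:
            live.pop(action["remove"]["path"], None)
    return live


def new_data_since(log_lines: List[str], start: int) -> List[str]:
    """Paths added with dataChange=True from entry `start` on; compaction output is skipped."""
    paths = []
    for line in log_lines[start:]:
        add = json.loads(line).get("add")
        if add and add.get("dataChange", True):
            paths.append(add["path"])
    return paths


log = [
    '{"add": {"path": "a.parquet", "dataChange": true}}',
    '{"add": {"path": "b.parquet", "dataChange": true}}',
    '{"remove": {"path": "a.parquet", "dataChange": false}}',
    '{"remove": {"path": "b.parquet", "dataChange": false}}',
    '{"add": {"path": "ab.parquet", "dataChange": false}}',
    '{"add": {"path": "c.parquet", "dataChange": true}}',
]
```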
Thanks everyone for your answers so far. Keep 'em coming.
Just to clarify, while having a general discussion about the parquet usage/performance space may be interesting, what we're really interested in is collecting answers to the original 12 questions at the top of this issue. We're aware that people use parquet in all sorts of ways, but determining some common stats will be useful as we tune performance defaults.
For the curious, the conda download numbers are published as parquet files on S3.
My answer is based on my experience in my previous job. We built a specific library for this that helped us handle large scale datasets. It allowed us to index and to mutate entire datasets with an atomic operation (not full ACID since we didn't manage concurrent reads). See https://github.com/data-engineering-collective/plateau (It's similar to delta lake imo)
For datasets you encounter, what tools were used to write those files (e.g. dask, pandas, spark, ...)? Did you generate this data yourself, or did some other team/org/person generate it for you (i.e. how much control do you have over the dataset properties)?
We always generated the datasets and files ourselves. The backend used Dask for job scheduling, but the writer was custom logic + pyarrow. I was part of a team that generated these datasets, and other teams consumed them. That made it challenging to choose the actual properties of the files (partition + file size, indexing, etc.), but overall we optimized them for downstream users as well as we could.
Do you generally work with single large parquet files, or multiple files grouped into a dataset?
Always large scale datasets. I only used single files for micro benchmarking.
Are your files/datasets ever appended to/modified, or are they generated once and then read only?
Files are always WORM (write once read many) in my experience. Parquet itself isn't a mutable data format and if you try to change something within the file, the best approach is to rewrite the entire thing. Most dataset libraries (iceberg, plateau, delta lake, etc.) use soft deletes for this, i.e. write the new file with a new name, reference the new one, dereference the old one.
We started with immutable, read-only datasets that were not allowed to be mutated, but quickly realized that this introduces scalability limits. Apart from scalability, there are also tons of workloads where you would like to append or replace data. Doing this consistently without corrupting your dataset (e.g. leaving dangling zombie files after an exception) is a challenge.
"In-file" mutations were not common, but they did happen (database-style INSERT INTO operations).
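The soft-delete pattern described above (write the new file under a new name, reference it, dereference the old one) can be sketched with a hypothetical manifest.json that lists the dataset's files. The file contents here are placeholder bytes, and os.replace gives an atomic swap only on a local POSIX-style filesystem; object stores generally need a different mechanism, which is part of what formats like Delta Lake and Iceberg provide.

```python
import json
import os
import tempfile
import uuid


def replace_file(dataset_dir: str, old_name: str, new_bytes: bytes) -> str:
    """Soft-replace a file: write a new uniquely named file, then swap the manifest."""
    new_name = f"{uuid.uuid4().hex}.parquet"
    with open(os.path.join(dataset_dir, new_name), "wb") as fh:
        fh.write(new_bytes)

    manifest_path = os.path.join(dataset_dir, "manifest.json")
    with open(manifest_path) as fh:
        manifest = json.load(fh)
    manifest["files"] = [n for n in manifest["files"] if n != old_name] + [new_name]

    # Write the new manifest next to the old one and rename it into place, so readers
    # see either the old or the new file list, never a partially written one.
    tmp_path = manifest_path + ".tmp"
    with open(tmp_path, "w") as fh:
        json.dump(manifest, fh)
    os.replace(tmp_path, manifest_path)
    return new_name  # old_name stays on disk until a separate cleanup step deletes it


dataset_dir = tempfile.mkdtemp()
with open(os.path.join(dataset_dir, "a.parquet"), "wb") as fh:
    fh.write(b"old placeholder bytes")
with open(os.path.join(dataset_dir, "manifest.json"), "w") as fh:
    json.dump({"files": ["a.parquet"]}, fh)
new_name = replace_file(dataset_dir, "a.parquet", b"new placeholder bytes")
```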
Where do your parquet datasets normally live (e.g. local filesystem, s3, ...)?
Azure Blob Store
What is the average file size (in bytes) for your parquet files (order of magnitude is fine here)?
File sizes ranged from 20 MB to 500 MB, depending on how well the compression worked. We tried to aim for 500 MB in-memory partitions on average, but went far larger for really big datasets.
If using multiple files in a dataset, are all files roughly the same size or is there some skew to the distribution?
From my experience, there is always a skew in the distribution for the binary parquet files since the compression algorithms are quite sensitive to the distribution in your data (we achieved all kinds of compression rates from ~1.5 up to 1k).
In-memory size was relatively homogeneous because this was part of our dataset creation logic. I would be surprised if this were "standard", since it requires quite a bit of effort.
If your datasets are composed of multiple parquet files, how many files do you commonly have (order of magnitude is fine here)?
I think we typically truncated datasets after they reached 100k-200k files. "Properly" partitioned datasets rarely reached this size since we used larger partitions for very large datasets, if possible.
How many row groups are common per file? How large is a single row group (either number of rows, or some rough approximation for in-memory bytes)? You may find https://arrow.apache.org/docs/python/parquet.html#inspecting-the-parquet-file-metadata useful for determining this information.
We tried optimizing this a bunch of times and mostly arrived at about 50k rows. Everything below this typically gets killed by overhead.
How many columns are common per file?
Most of our datasets were relatively thin with <50 columns
When reading multiple files, do all files usually have the same schema, or do the schemas differ per file?
Do you knowingly make use of a _metadata or _common_metadata file for your datasets?
https://github.com/data-engineering-collective/plateau implemented a JSON (or msgpack) based "database" which can be thought of as a thinner _metadata. We heavily used the _common_metadata for all sorts of schema validation checks (e.g. are you allowed to append file X to the dataset, or is it incompatible?).
Is there anything else about your parquet usage that you think is relevant to us?
Our datasets used Hive-style partition paths (e.g. ColumnA=Foo/ColumnB=Bar/UUID.parquet), but nowadays I would recommend against this, assuming you have some kind of indexing / table pruning mechanism.
cc @steffen-schroeder-by @crepererum (if either of you no longer works with parquet files, feel free to ignore)
Thanks for the ping. I'm still in the parquet file business (InfluxDB IOx, Apache DataFusion), but out of office this week. I'll have a look and will write up a longer answer next week.
For datasets you encounter, what tools were used to write those files (e.g. dask, pandas, spark, ...)? Did you generate this data yourself, or did some other team/org/person generate it for you (i.e. how much control do you have over the dataset properties)?
My experience is informed by my previous job with data scientists. We were consumers of parquet datasets created by a data engineering team, and usually had minimal control over the properties of the datasets. The data engineering team preferred files written by Spark, usually processed incrementally and periodically optimized with Delta Lake.
Do you generally work with single large parquet files, or multiple files grouped into a dataset?
Usually parquet datasets, processed incrementally.
Are your files/datasets ever appended to/modified, or are they generated once and then read only?
Appends and modifications were common.
Where do your parquet datasets normally live (e.g. local filesystem, s3, ...)?
Cloud storage. Primarily Azure Blob Storage
What is the average file size (in bytes) for your parquet files (order of magnitude is fine here)?
100-400MB
If using multiple files in a dataset, are all files roughly the same size or is there some skew to the distribution?
Very non-uniform, depending if the datasets were being optimized. Files as small as a few KB on disk were not uncommon. Likewise, we occasionally encountered files as large as a few GB.
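A quick way to quantify skew like this on a local copy of a dataset is to summarize the file size distribution. A sketch; the 1 MB "small file" threshold and the placeholder files are hypothetical.

```python
import os
import statistics
import tempfile

MB = 1024 ** 2


def file_size_summary(root: str, small_bytes: int = MB) -> dict:
    """Size distribution of *.parquet files under root (searched recursively)."""
    sizes = []
    for dirpath, _dirs, names in os.walk(root):
        for name in names:
            if name.endswith(".parquet"):
                sizes.append(os.path.getsize(os.path.join(dirpath, name)))
    if not sizes:
        return {"count": 0}
    median = statistics.median(sizes)
    return {
        "count": len(sizes),
        "min": min(sizes),
        "median": median,
        "max": max(sizes),
        "max_over_median": max(sizes) / median if median else float("inf"),
        "fraction_small": sum(s < small_bytes for s in sizes) / len(sizes),
    }


# Placeholder files of 10 B, 20 B and 2 MB.
root = tempfile.mkdtemp()
for name, size in [("a.parquet", 10), ("b.parquet", 20), ("c.parquet", 2 * MB)]:
    with open(os.path.join(root, name), "wb") as fh:
        fh.write(b"\0" * size)
summary = file_size_summary(root)
```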
If your datasets are composed of multiple parquet files, how many files do you commonly have (order of magnitude is fine here)?
10's to 1000's
How many row groups are common per file? How large is a single row group (either number of rows, or some rough approximation for in-memory bytes)? You may find https://arrow.apache.org/docs/python/parquet.html#inspecting-the-parquet-file-metadata useful for determining this information.
1-5
How many columns are common per file?
<50
When reading multiple files, do all files usually have the same schema, or do the schemas differ per file?
Differences were not uncommon
Do you knowingly make use of a _metadata or _common_metadata file for your datasets?
As a consumer,
_metadata
was not available. We wrote them by default, but occasionally had to disable due to dataset sizes.
Is there anything else about your parquet usage that you think is relevant to us?
Like @fjetter, we made use of filtering with Hive partitioning when moving to production deployments to reduce the amount of data loaded. The downside was that this could be very slow as the size of the dataset grew or the depth of the partitioning increased.
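Filtering with Hive partitioning boils down to parsing key=value path segments and dropping files whose values do not match, as in the sketch below (paths and predicate are hypothetical). The pruning itself is cheap, but it can only run after every file has been listed, and that listing is the part that grows with dataset size and partition depth. Real readers also infer value types and URL-decode the values, which this sketch skips.

```python
from typing import Callable, Dict, List


def parse_hive_partitions(path: str) -> Dict[str, str]:
    """Extract key=value directory segments, e.g. from 'ColumnA=Foo/ColumnB=Bar/x.parquet'."""
    parts = {}
    for segment in path.strip("/").split("/")[:-1]:  # the last segment is the file name
        key, sep, value = segment.partition("=")
        if sep:
            parts[key] = value
    return parts


def prune(paths: List[str], predicate: Callable[[Dict[str, str]], bool]) -> List[str]:
    """Keep only files whose partition values satisfy the predicate, without opening them."""
    return [p for p in paths if predicate(parse_hive_partitions(p))]


paths = [
    "ColumnA=Foo/ColumnB=Bar/1.parquet",
    "ColumnA=Foo/ColumnB=Baz/2.parquet",
    "ColumnA=Qux/ColumnB=Bar/3.parquet",
]
selected = prune(paths, lambda p: p.get("ColumnB") == "Bar")
```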
I would really like to use dask more often. Here's my answer.
I am working on InfluxDB IOx. The system is written in Rust, has multiple concurrent writers and readers (per dataset), and uses Apache Parquet to store payload data within some object store (S3, GCP, Azure Blob, file). We use Apache DataFusion for execution but use our own dataset format which uses PostgreSQL as a catalog storage.
At the risk of stating the obvious, and while ignoring quite a bit of complexity, let me quickly define, from my PoV, what tools / parts are available to build a dataset (these are then mostly constrained by standards like Apache Hive, Delta Lake, Apache Iceberg):
_common_metadata, Delta Lake uses a JSON-based transaction log, Iceberg uses a multi-level log-based file structure, and InfluxDB IOx uses a PostgreSQL database. For dataset formats that list files within the catalog, you might have parquet files that exist on disk/S3/... but are NOT part of the dataset.
InfluxDB IOx writes the data; the payload is customer-generated but highly specific to the InfluxDB use case (currently time series data with field-level upsert semantics, to be extended in the future). Due to the upsert semantics, which are partly resolved on read, and due to the PostgreSQL-driven catalog, we don't expect any external party to read the data directly. Instead we'll likely offer some export tool that converts the data to a standard Hive / Delta Lake / whatever format.
Multiple files.
Write-once, never overwrite. We use UUIDs to ensure that. Deletions are allowed but announced via the catalog with a timer, so that we can (with a reasonable certainty) ensure that readers don't use them when we actually prune the parquet file.
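The write-once / delayed-deletion scheme above can be modeled with a toy in-memory catalog: new files get UUID names so nothing is ever overwritten, deletions are only announced at first, and files become prunable once a grace period has passed. The class, its fields and the one-hour default are hypothetical stand-ins for the PostgreSQL catalog.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class Catalog:
    """Toy catalog: live files plus files whose deletion has been announced."""
    grace_seconds: float = 3600.0
    live: Set[str] = field(default_factory=set)
    tombstones: Dict[str, float] = field(default_factory=dict)  # path -> announcement time

    def add_file(self) -> str:
        path = f"{uuid.uuid4()}.parquet"  # unique name, so an existing file is never overwritten
        self.live.add(path)
        return path

    def announce_delete(self, path: str, now: Optional[float] = None) -> None:
        """Hide the file from new readers; the bytes stay until the grace period expires."""
        self.live.discard(path)
        self.tombstones[path] = time.time() if now is None else now

    def prunable(self, now: Optional[float] = None) -> List[str]:
        """Files that can be physically removed because the grace period has passed."""
        now = time.time() if now is None else now
        return sorted(p for p, t in self.tombstones.items() if now - t >= self.grace_seconds)


catalog = Catalog(grace_seconds=60)
a = catalog.add_file()
b = catalog.add_file()
catalog.announce_delete(a, now=1000.0)
```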
Local FS for testing/dev work, S3/GCP/Azure Blob for prod.
We're still tuning that, but we aim for at least 10 MB, ideally 100 MB.
We aim for a rather even distribution (with a few outliers on the lower end, max. size is soft-limited).
Depends on the customer, let's say...10K?
We're still working on that part. I think for 100MB files we'll have at least 10 row groups.
2 at least, maybe 10 for a normal file, but you can also have 100 or even 1000 in extreme cases.
They differ regarding the column set, but not regarding the column types. Absent columns are treated as NULL.
_metadata or _common_metadata file for your datasets?
For the internal dataset, no, because these files are not transaction-safe. For exported datasets we could create them if that makes consumption easier.
See "Datasets" section above.
Since it's 2022 and the community still hasn't settled on a single dataset format (also because software stacks and usage patterns are wildly different), I would always recommend using some adapter/interface approach to datasets, implementing it for a few common formats (like "single file", Hive, Delta Lake), and growing from there. This is also what we'll aim for in Apache DataFusion, where we actually have a very similar issue.
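A minimal sketch of that adapter idea: reader code depends only on a small interface ("which files make up this dataset?"), and each dataset format implements it. All class names are hypothetical; a Delta Lake or Iceberg adapter would implement the same method by reading the format's log or manifests. The manifest-based adapter also illustrates that, for catalog-style formats, files present on disk are not necessarily part of the dataset.

```python
import os
import tempfile
from typing import List, Protocol


class DatasetAdapter(Protocol):
    """Minimal interface a reader needs: the data files that make up the dataset."""

    def data_files(self) -> List[str]: ...


class SingleFile:
    def __init__(self, path: str) -> None:
        self.path = path

    def data_files(self) -> List[str]:
        return [self.path]


class HiveDirectory:
    """No catalog: every *.parquet file under root belongs to the dataset."""

    def __init__(self, root: str) -> None:
        self.root = root

    def data_files(self) -> List[str]:
        found = []
        for dirpath, _dirs, names in os.walk(self.root):
            found.extend(os.path.join(dirpath, n) for n in names if n.endswith(".parquet"))
        return sorted(found)


class ManifestListed:
    """Catalog-style: only files named in the manifest belong to the dataset."""

    def __init__(self, root: str, names: List[str]) -> None:
        self.root, self.names = root, names

    def data_files(self) -> List[str]:
        return [os.path.join(self.root, n) for n in self.names]


def plan_read(dataset: DatasetAdapter) -> List[str]:
    """Reader logic written once against the interface, not a concrete format."""
    return dataset.data_files()


root = tempfile.mkdtemp()
for name in ["a.parquet", "b.parquet"]:  # placeholder files; b is not in the manifest
    with open(os.path.join(root, name), "wb") as fh:
        fh.write(b"placeholder")
```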
We (myself, @ian-r-rose, and @bryanwweber) are planning on doing some general work around dask & parquet in the coming weeks. Robustness, performance, and good default settings are all things we're thinking about.
As part of this effort, it would help us to get a better understanding of what's common for parquet files & datasets used in the real world. I suspect the answers to the following questions differ across tools/orgs/users, but any information people can provide about their real-world experience would be helpful here.
Most of these questions have to do with metrics for various properties of individual files and datasets - even order-of-magnitude answers for these questions would be useful for us.
Questions:
For datasets you encounter, what tools were used to write those files (e.g. dask, pandas, spark, ...)? Did you generate this data yourself, or did some other team/org/person generate it for you (i.e. how much control do you have over the dataset properties)?
Do you generally work with single large parquet files, or multiple files grouped into a dataset?
Are your files/datasets ever appended to/modified, or are they generated once and then read only?
Where do your parquet datasets normally live (e.g. local filesystem, s3, ...)?
What is the average file size (in bytes) for your parquet files (order of magnitude is fine here)?
If using multiple files in a dataset, are all files roughly the same size or is there some skew to the distribution?
If your datasets are composed of multiple parquet files, how many files do you commonly have (order of magnitude is fine here)?
How many row groups are common per file? How large is a single row group (either number of rows, or some rough approximation for in-memory bytes)? You may find https://arrow.apache.org/docs/python/parquet.html#inspecting-the-parquet-file-metadata useful for determining this information.
How many columns are common per file?
When reading multiple files, do all files usually have the same schema, or do the schemas differ per file?
Do you knowingly make use of a _metadata or _common_metadata file for your datasets?
Is there anything else about your parquet usage that you think is relevant to us?