Weston Pace / @westonpace: Another potential fix could be to modify pyarrow.parquet.write_metadata. The function currently takes the table schema (which will have the partition columns) and the collected metadata (which do not). So it could add the columns from the table schema to the collected metadata.
Weston Pace / @westonpace: One question to consider also is whether we need to fix the behavior when use_legacy=True or require users to migrate away from the legacy path if they want this fix. Right now both fail, but I think it would be easier to fix the use_legacy=False path than the older code.
Ben Kietzman / @bkietz: arrow::dataset::FileWriter should probably have a partition_expression property (containing the partition information with which the file was written), which would probably help here (though not in the case of use_legacy=True)
Joris Van den Bossche / @jorisvandenbossche: I think the metadata object should reflect the file that is written. So if the file doesn't contain the partition columns, then also the metadata shouldn't contain them?
Joris Van den Bossche / @jorisvandenbossche:
It took me a while to understand the actual issue with partition columns not being included in the metadata, so to make it concrete: we currently have an example in the docs about how you can write a `_metadata` file using the `metadata_collector` and `write_metadata` utilities (https://arrow.apache.org/docs/python/parquet.html#writing-metadata-and-common-medata-files):
```python
# Write a dataset and collect metadata information of all written files
metadata_collector = []
pq.write_to_dataset(table, root_path, metadata_collector=metadata_collector)

# Write the ``_common_metadata`` parquet file without row groups statistics
pq.write_metadata(table.schema, root_path / '_common_metadata')

# Write the ``_metadata`` parquet file with row groups statistics of all files
pq.write_metadata(
    table.schema, root_path / '_metadata',
    metadata_collector=metadata_collector
)
```
Now, this example doesn't actually use a partitioned dataset. And once you add a `partition_cols` argument to the `write_to_dataset` call, the snippet above will fail (so it's not a very useful example ...).
For example:
```python
import pathlib

import pyarrow as pa
import pyarrow.parquet as pq

root_path = pathlib.Path.cwd() / "test_metadata"
table = pa.table({'Month': [5, 5, 5, 5, 6, 6, 6, 6],
                  'Day': [1, 1, 2, 2, 1, 1, 2, 2],
                  'Temp': range(8)})

metadata_collector = []
pq.write_to_dataset(table, root_path, partition_cols=["Month", "Day"],
                    metadata_collector=metadata_collector)

# Write the ``_metadata`` parquet file with row groups statistics of all files
pq.write_metadata(
    table.schema, root_path / '_metadata',
    metadata_collector=metadata_collector
)
```
gives:
```
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-18-f3324e50d348> in <module>
      6
      7 # Write the ``_metadata`` parquet file with row groups statistics of all files
----> 8 pq.write_metadata(
      9     table.schema, root_path / '_metadata',
     10     metadata_collector=metadata_collector

~/scipy/repos/arrow/python/pyarrow/parquet.py in write_metadata(schema, where, metadata_collector, **kwargs)
   2082     if metadata_collector is not None:
   2083         # ParquetWriter doesn't expose the metadata until it's written. Write
-> 2084         # it and read it again.
   2085         metadata = read_metadata(where)
   2086         for m in metadata_collector:

~/scipy/repos/arrow/python/pyarrow/_parquet.pyx in pyarrow._parquet.FileMetaData.append_row_groups()

RuntimeError: AppendRowGroups requires equal schemas.
```
Because indeed the `table.schema` passed to `write_metadata` does not match the schemas of the collected metadata (and thus those can't be appended).
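To make the mismatch concrete, here is a small check one could run after the failing snippet above (a sketch only; the column names come from that example and the output shown in the comments is what I'd expect, not verified here):

```python
# The table schema still has the partition columns ...
print(table.schema.names)          # expected: ['Month', 'Day', 'Temp']

# ... but the per-file metadata collected by write_to_dataset only covers the
# columns that were actually written to the files (partition columns dropped).
print(metadata_collector[0].schema.to_arrow_schema().names)   # expected: ['Temp']
```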
Joris Van den Bossche / @jorisvandenbossche:
But so the question is: do we actually need to include the partition columns in the metadata, or do we need to update the example to work with a partitioned dataset by removing the partition columns from the `table.schema` passed to `write_metadata`?
Comparing with what dask does, using the above example:
```python
import dask.dataframe as dd

ddf = dd.from_pandas(table.to_pandas(), npartitions=1)

root_path2 = pathlib.Path.cwd() / "test_metadata_dask"
ddf.to_parquet(root_path2, partition_on=["Month", "Day"], engine="pyarrow")

meta = pq.read_metadata(root_path2 / "_metadata")
```
Here, the metadata also doesn't include the partition columns:
```python
>>> meta = pq.read_metadata(root_path2 / "_metadata")
>>> meta.schema
<pyarrow._parquet.ParquetSchema object at 0x7f1459573ac0>
required group field_id=-1 schema {
  optional int64 field_id=-1 Temp;
  optional int64 field_id=-1 __null_dask_index__;
}
```
Joris Van den Bossche / @jorisvandenbossche: Sidenote: there is also the question whether we should drop partition columns at all from the written files for a partitioned dataset. Based on a previous conversation on the mailing list, it seems there are other systems that don't exclude those columns. At the time I opened an issue to check that we can read such datasets (with duplicate information between partitioning and file columns) -> ARROW-10347. But we should maybe also consider if we want to be able to write such datasets.
Weston Pace / @westonpace: If the intent is for the metadata object to only contain the columns written then that does make things simpler. I suppose the same question holds for common_metadata. Should common_metadata contain only the columns written? Or should it also contain the partition columns? Also, I think we need to support individual files having different metadata? That should probably be tested as part of this feature as well.
Joris Van den Bossche / @jorisvandenbossche:
> I suppose the same question holds for common_metadata. Should common_metadata contain only the columns written? Or should it also contain the partition columns?
The `_common_metadata` file typically doesn't hold any information about specific files (no row group info), just the schema, so the intent here is to reflect the schema of the "full" dataset (i.e. including the partitioning columns).
At least, that's what dask does. Continuing the example from above (reading the file written by dask):
```python
>>> meta = pq.read_metadata(root_path2 / "_common_metadata")
>>> meta
<pyarrow._parquet.FileMetaData object at 0x7f1449286720>
  created_by: parquet-cpp-arrow version 5.0.0-SNAPSHOT
  num_columns: 4
  num_rows: 0
  num_row_groups: 0
  format_version: 1.0
  serialized_size: 2433
>>> meta.schema
<pyarrow._parquet.ParquetSchema object at 0x7f14368d5c80>
required group field_id=-1 schema {
  optional int64 field_id=-1 Month;
  optional int64 field_id=-1 Day;
  optional int64 field_id=-1 Temp;
  optional int64 field_id=-1 __null_dask_index__;
}
```
And to be clear, all this `_metadata` and `_common_metadata` business is "implementation-defined": there is no standard description or specification of what these files are expected to contain, just some implementations that use such files (Spark in the past, dask still right now).
(There is some discussion about this in e.g. ARROW-1983.)
> Also, I think we need to support individual files having different metadata? That should probably be tested as part of this feature as well.
Currently, if you combine multiple Parquet FileMetaData objects into a single one (appending the row groups), we require matching schemas. There was some discussion about that when this restriction was introduced (the change was made in https://github.com/apache/arrow/pull/7180 (ARROW-8062); some discussion of it on the dask side: https://github.com/dask/dask/issues/6243).
Personally, I think it could be fine to have the restriction that if you want to use the `_metadata` approach you can't have schema evolution (there are other "data lake management" tools that are more advanced than a metadata file).
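As a side note, the combination step that hits this restriction can be sketched directly on the collected metadata (a rough illustration only, not exactly what `write_metadata` does internally; note that `append_row_groups` modifies the first object in place):

```python
# Combine the per-file metadata into a single FileMetaData by appending the
# row groups of each file; this raises
# "RuntimeError: AppendRowGroups requires equal schemas." as soon as one of
# the collected schemas differs from the first one.
combined = metadata_collector[0]
for file_metadata in metadata_collector[1:]:
    combined.append_row_groups(file_metadata)

# Write the combined metadata out as the ``_metadata`` file.
combined.write_metadata_file(str(root_path / "_metadata"))
```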
Joris Van den Bossche / @jorisvandenbossche:
@westonpace I don't fully understand your updated title. In the doc example, the `_common_metadata` and `_metadata` files are written with two separate `pq.write_metadata` calls, and so you can pass a different schema to each (e.g. `table.schema` for `_common_metadata` and `table.select([.. all columns except partitioning columns ..]).schema` for `_metadata`).
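To spell that out, the updated docs example could look roughly like this (a sketch based on the partitioned example above; filtering the column names is just one way to drop the partition columns):

```python
partition_cols = ["Month", "Day"]
data_cols = [name for name in table.schema.names if name not in partition_cols]

# ``_common_metadata``: schema of the full dataset, including partition columns
pq.write_metadata(table.schema, root_path / '_common_metadata')

# ``_metadata``: schema of the files as actually written (without the partition
# columns), plus the collected row group statistics of all files
pq.write_metadata(
    table.select(data_cols).schema, root_path / '_metadata',
    metadata_collector=metadata_collector
)
```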
Weston Pace / @westonpace:
I see now. It's all part of the single call that creates the `_metadata` file. So it is a matter of updating the example and not a problem with the call. I'll fix the title. Thanks for helping me understand.
If partition columns are specified, then the writers will only write the non-partition columns, and thus the written files (and their collected metadata) will not contain the fields used for partitioning.
Reporter: Weston Pace / @westonpace
Related issues:
- _metadata file (is related to)

PRs and other links:
Note: This issue was originally created as ARROW-13269. Please see the migration documentation for further details.