astronomy-commons / hats

Hierarchical Progressive Survey Catalog
https://hats.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Partitioning column dtypes conflict with Pyarrow's handling of Hive partitioning #367

Open troyraen opened 2 weeks ago

troyraen commented 2 weeks ago

Bug report

Expected behavior

A call to pandas.read_parquet works without the user having to explicitly specify the partitioning. I expect this to work because read_parquet uses partitioning='hive' by default and hipscat/hats appears to use Hive partitioning.

Actual behavior

That call throws an error.

Minimal reproducible examples

import pandas as pd

# assuming we're in the hipscat-import root directory
small_sky_object_catalog = "tests/hipscat_import/data/small_sky_object_catalog"
ignore_prefixes = [".", "_", "catalog_info.json", "partition_info.csv", "point_map.fits", "provenance_info.json"]

# simplest call that I hope will work
pd.read_parquet(small_sky_object_catalog, ignore_prefixes=ignore_prefixes)

The above throws:

ArrowTypeError: Unable to merge: Field Norder has incompatible types: uint8 vs dictionary<values=int32, indices=int32, ordered=0>

The simplest way to make that call work without throwing an error is to tell it to ignore the partitioning:

pd.read_parquet(small_sky_object_catalog, ignore_prefixes=ignore_prefixes, partitioning=None)

The above is fine when users do not need or want to add a filter to the read. But when filters are wanted, which is likely to be necessary for large catalogs, the calls are much more efficient when they include a filter on one or more recognized partition columns (a filtered example follows the next snippet).

The simplest call that results in pyarrow (which is used under the hood) actually recognizing the partitions is:

import pyarrow
import pyarrow.dataset

# NB: Npix will not be recognized as a partition even if it is included here. See issue #368 for details.
partitioning_fields = [
    pyarrow.field(name="Norder", type=pyarrow.uint8()), pyarrow.field(name="Dir", type=pyarrow.uint64())
]
partitioning = pyarrow.dataset.partitioning(schema=pyarrow.schema(partitioning_fields), flavor="hive")

pd.read_parquet(small_sky_object_catalog, ignore_prefixes=ignore_prefixes, partitioning=partitioning)
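
For completeness, here is what a filtered read looks like once the partitioning is recognized; the filter values are just an illustration and assume the small sky catalog's single Norder=0/Dir=0 partition.

# filters on recognized partition columns are pushed down to directory pruning
pd.read_parquet(
    small_sky_object_catalog,
    ignore_prefixes=ignore_prefixes,
    partitioning=partitioning,
    filters=[("Norder", "==", 0), ("Dir", "==", 0)],
)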

The efficiency gain is hard to demonstrate with small_sky_object_catalog, so I did a test with the PanSTARRS catalog hosted in S3. The call using partitioning=None took about 55 times longer. Here is a screenshot of the results:

[Screenshot of timing results, 2024-10-02]

To reproduce that, use:

import pyarrow.fs

panstarrs_basedir = "stpubdata/panstarrs/ps1/public/hipscat/otmo"
fs = pyarrow.fs.S3FileSystem(region="us-east-1", anonymous=True)
filters = [("Norder", "==", 5), ("Dir", "==", 10000), ("Npix", "==", 10013)]
# plus ignore_prefixes and partitioning from above
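
Putting those pieces together, a full partition-aware read might look like the following sketch (it assumes a pandas/pyarrow combination that forwards the filesystem, filters, and partitioning arguments through to pyarrow, as the earlier calls do):

import pandas as pd
import pyarrow.fs

panstarrs_basedir = "stpubdata/panstarrs/ps1/public/hipscat/otmo"
fs = pyarrow.fs.S3FileSystem(region="us-east-1", anonymous=True)
filters = [("Norder", "==", 5), ("Dir", "==", 10000), ("Npix", "==", 10013)]

# ignore_prefixes and partitioning as defined in the earlier snippets
df = pd.read_parquet(
    panstarrs_basedir,
    filesystem=fs,
    filters=filters,
    ignore_prefixes=ignore_prefixes,
    partitioning=partitioning,
)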

Why is this happening?

I think what's happening under the hood is:

  1. Pyarrow doesn't expect a Hive-partitioned dataset to contain the partitioning columns in the files themselves. Upon read, it reconstructs those columns by parsing the file paths.
  2. When it finds the partitioning columns in the files themselves, it tries to merge them with the reconstructed columns but trips over the data types and throws the error: it uses dictionary types for the reconstructed columns (see Reading from Partitioned Datasets in the pyarrow docs) while the files store plain integer types (see the snippet below).
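
The uint8 side of that conflict can be confirmed by reading a leaf file's schema directly; the dictionary<values=int32, indices=int32> side comes from pyarrow's path-based reconstruction, as the error message above shows. The leaf path below is illustrative of the usual Norder=/Dir=/Npix= layout.

import pyarrow.parquet

# the partition columns are physically stored in the leaf files with plain integer types
# (any leaf file under the catalog shows the same thing)
leaf_file = f"{small_sky_object_catalog}/Norder=0/Dir=0/Npix=11.parquet"
file_schema = pyarrow.parquet.read_schema(leaf_file)
print(file_schema.field("Norder").type)  # uint8
print(file_schema.field("Dir").type)     # uint64, matching the partitioning spec above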

Possible solutions

  1. Use the expected dictionary data types for the partitioning columns when writing the files.
  2. Don't store the partitioning columns in the files themselves.
  3. Ask Apache Arrow to support merging with non-dictionary types when that's how the columns are defined in the files.

Option 1 seems simplest because I'm guessing option 2 would get significant pushback from folks who want the files to be able to stand alone. A drawback of either is that, after the data is loaded, users who want to perform operations that require numeric types (+, -, etc.) on those columns would have to convert them first. To me, that would still be preferable to the current situation because the user intervention would be both easier (just df.astype({'Norder': int}) rather than a full specification of the partitioning) and required far less often.
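
To make option 1 concrete, here is a minimal sketch of what the write-side change could look like; the helper name is hypothetical, and the target type (dictionary<values=int32, indices=int32>) is simply the one pyarrow reconstructs, per the error message above.

import pyarrow as pa
import pyarrow.compute as pc

def dictionary_encode_partition_columns(table: pa.Table) -> pa.Table:
    """Hypothetical helper: re-encode partition columns as dictionary<int32, int32>
    before each leaf file is written, so they merge cleanly with pyarrow's
    path-reconstructed columns."""
    for name in ("Norder", "Dir", "Npix"):
        idx = table.schema.get_field_index(name)
        if idx == -1:
            continue
        # int32 matches pyarrow's default inference for integer partition keys
        encoded = pc.dictionary_encode(pc.cast(table.column(name), pa.int32()))
        table = table.set_column(idx, name, encoded)
    return table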

