astronomy-commons / hipscat-import

HiPSCat import - generate HiPSCat-partitioned catalogs
https://hipscat-import.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Complex arrow columns cause an error during the reducing stage #351

Open hombit opened 1 month ago

hombit commented 1 month ago

Bug report

I implemented a custom reader for HSC PDR2 data that converts a FITS boolean-array column to pa.list_(pa.bool_(), 274).

During the reducing stage this ends with an error in the pyarrow-pandas conversion layer, originating at hipscat_import/catalog/map_reduce.py:287 in reduce_pixel_shards(), where the merged pyarrow table is converted to pandas.

I tried to reproduce the problem with a simple pyarrow table containing a fixed-size-list column, but this code works:

```python
import pandas as pd
import pyarrow as pa

a = pa.array([1, 2, 3])
# A fixed-size-list column, analogous to the 'flags' column in the reader below
b = pa.array([[True, False, True]] * 3, type=pa.list_(pa.bool_(), 3))
table = pa.Table.from_arrays([a, b], names=['a', 'b'])
df = table.to_pandas(types_mapper=pd.ArrowDtype)
```

Note the types_mapper argument here: the conversion works even without it, but then produces object dtype. I believe we should also use types_mapper in hipscat-import, though that may be a separate issue. I also tried modifying reduce_pixel_shards() to pass this argument, and it produces the same error.
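
My best guess at the difference: Table.from_arrays above carries no pandas metadata, while the intermediate shard files are written from pandas DataFrames, so their parquet schema embeds b'pandas' metadata that records the flags dtype as a string, and it is the parsing of that string back into a dtype that fails. Here is a sketch that goes through the same metadata round-trip and, if my reading of the traceback is right, hits the same NotImplementedError:

```python
import pandas as pd
import pyarrow as pa

b = pa.array([[True, False, True]] * 3, type=pa.list_(pa.bool_(), 3))
df = pd.DataFrame({"b": pd.Series(b, dtype=pd.ArrowDtype(b.type))})

# Unlike Table.from_arrays, from_pandas embeds b'pandas' schema metadata,
# which stores the column dtype as its string repr
table = pa.Table.from_pandas(df)

# to_pandas parses that string back into a dtype; pandas cannot parse
# parameterized pyarrow types such as a fixed-size list, so this raises
df_roundtrip = table.to_pandas()
```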

Details are under the spoilers:

Reader implementation

```python
import numpy as np
import pandas as pd
import pyarrow as pa
from astropy.table import Table

from hipscat_import.catalog.file_readers import FitsReader


class HSCFitsReader(FitsReader):
    """FITS reader that converts ra and dec from radians to degrees"""

    def __init__(self, *args, ra_column, dec_column, **kwargs):
        super().__init__(*args, **kwargs)
        self.ra_column = ra_column
        self.dec_column = dec_column

    def read(self, input_file, read_columns=None):
        # Mostly copy-pasted from the hipscat-import implementation
        self.regular_file_exists(input_file, **self.kwargs)
        table = Table.read(input_file, memmap=True, **self.kwargs)
        if read_columns:
            table.keep_columns(read_columns)
        elif self.column_names:
            table.keep_columns(self.column_names)
        elif self.skip_column_names:
            table.remove_columns(self.skip_column_names)

        total_rows = len(table)
        read_rows = 0
        while read_rows < total_rows:
            chunk = table[read_rows : read_rows + self.chunksize]
            df_chunk = self.astropy_table_to_df(chunk)
            yield df_chunk
            read_rows += self.chunksize

    def astropy_table_to_df(self, table):
        """Data conversion, spoils the input table"""
        # Flags won't convert to pandas due to astropy's implementation
        # limitations, so we convert them manually and remove the column
        flags_pyarrow = None
        if 'flags' in table.columns:
            flags = table['flags']
            flags_length = flags.shape[1]
            flags_pyarrow = pa.array(flags.tolist(), type=pa.list_(pa.bool_(), flags_length))
            table.remove_column('flags')
        df = table.to_pandas()
        # Convert coords from radians to degrees
        df[self.ra_column] = np.degrees(df[self.ra_column])
        df[self.dec_column] = np.degrees(df[self.dec_column])
        # Assign flags; check the saved array rather than the table,
        # since the column was removed above
        if flags_pyarrow is not None:
            df['flags'] = pd.Series(flags_pyarrow, dtype=pd.ArrowDtype(flags_pyarrow.type))
        return df
```
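
For reference, the reader is wired into the pipeline roughly like this (all paths and column names here are placeholders, not the real PDR2 configuration):

```python
from hipscat_import.catalog.arguments import ImportArguments

# Placeholder paths and column names, just to show how the reader plugs in
args = ImportArguments(
    output_artifact_name="hsc_pdr2",
    input_file_list=["data/hsc_pdr2_patch0.fits"],
    file_reader=HSCFitsReader(ra_column="ra", dec_column="dec"),
    ra_column="ra",
    dec_column="dec",
    output_path="catalogs/",
)
```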
Traceback

```
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5284, in pyarrow.lib.type_for_alias()
   5283 try:
-> 5284     alias = _type_aliases[name]
   5285 except KeyError:

KeyError: 'fixed_size_list[274]'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2251, in construct_from_string()
   2250 try:
-> 2251     pa_dtype = pa.type_for_alias(base_type)
   2252 except ValueError as err:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5286, in pyarrow.lib.type_for_alias()
   5285 except KeyError:
-> 5286     raise ValueError('No type alias for {0}'.format(name))
   5287

ValueError: No type alias for fixed_size_list[274]

The above exception was the direct cause of the following exception:

NotImplementedError                       Traceback (most recent call last)
Cell In[3], line 29
     25 with Client(n_workers=64) as client:
     26     # import dask
     27     # with dask.config.set(scheduler='single-threaded'), Client(processes=False, threads_per_worker=1, n_workers=1) as client:
     28     display(client)
---> 29     pipeline_with_client(args, client)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:60, in pipeline_with_client(args, client)
     58     if args.completion_email_address:
     59         _send_failure_email(args, exception)
---> 60     raise exception

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:44, in pipeline_with_client(args, client)
     41     raise ValueError("args is required and should be subclass of RuntimeArguments")
     43 if isinstance(args, ImportArguments):
---> 44     catalog_runner.run(args, client)
     45 elif isinstance(args, IndexArguments):
     46     index_runner.run(args, client)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/run_import.py:111, in run(args, client)
     85 for (
     86     destination_pixel,
     87     source_pixel_count,
     88     destination_pixel_key,
     89 ) in args.resume_plan.get_reduce_items():
     90     futures.append(
     91         client.submit(
     92             mr.reduce_pixel_shards,
    (...)
    108         )
    109     )
--> 111 args.resume_plan.wait_for_reducing(futures)
    113 # All done - write out the metadata
    114 with args.resume_plan.print_progress(total=5, stage_name="Finishing") as step_progress:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/resume_plan.py:298, in ResumePlan.wait_for_reducing(self, futures)
    296 def wait_for_reducing(self, futures):
    297     """Wait for reducing futures to complete."""
--> 298     self.wait_for_futures(futures, self.REDUCING_STAGE, fail_fast=True)
    299     remaining_reduce_items = self.get_reduce_items()
    300     if len(remaining_reduce_items) > 0:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline_resume_plan.py:143, in PipelineResumePlan.wait_for_futures(self, futures, stage_name, fail_fast)
    141     some_error = True
    142     if fail_fast:
--> 143         raise future.exception()
    145 if some_error:
    146     raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/distributed/worker.py:3000, in apply_function_simple()
   2995 with (
   2996     context_meter.meter("thread-noncpu", func=time) as m,
   2997     context_meter.meter("thread-cpu", func=thread_time),
   2998 ):
   2999     try:
-> 3000         result = function(*args, **kwargs)
   3001     except (SystemExit, KeyboardInterrupt):
   3002         # Special-case these, just like asyncio does all over the place. They will
   3003         # pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually
   (...)
   3006         # Any other `BaseException` types would ultimately be ignored by asyncio if
   3007         # raised here, after messing up the worker state machine along their way.
   3008         raise

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:331, in reduce_pixel_shards()
    326 except Exception as exception:  # pylint: disable=broad-exception-caught
    327     print_task_failure(
    328         f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
    329         exception,
    330     )
--> 331     raise exception

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards()
    280 if rows_written != destination_pixel_size:
    281     raise ValueError(
    282         "Unexpected number of objects at pixel "
    283         f"({healpix_pixel})."
    284         f" Expected {destination_pixel_size}, wrote {rows_written}"
    285     )
--> 287 dataframe = merged_table.to_pandas()
    288 if sort_columns:
    289     dataframe = dataframe.sort_values(sort_columns.split(","))

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/array.pxi:885, in pyarrow.lib._PandasConvertible.to_pandas()
    883     coerce_temporal_nanoseconds=coerce_temporal_nanoseconds
    884 )
--> 885 return self._to_pandas(options, categories=categories,
    886                        ignore_metadata=ignore_metadata,
    887                        types_mapper=types_mapper)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/table.pxi:5002, in pyarrow.lib.Table._to_pandas()
   5000     types_mapper=None):
   5001     from pyarrow.pandas_compat import table_to_dataframe
-> 5002     df = table_to_dataframe(
   5003         options, self, categories,
   5004         ignore_metadata=ignore_metadata,

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:774, in table_to_dataframe()
    771     table = _add_any_metadata(table, pandas_metadata)
    772     table, index = _reconstruct_index(table, index_descriptors,
    773                                       all_columns, types_mapper)
--> 774     ext_columns_dtypes = _get_extension_dtypes(
    775         table, all_columns, types_mapper)
    776 else:
    777     index = _pandas_api.pd.RangeIndex(table.num_rows)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:853, in _get_extension_dtypes()
    848     dtype = col_meta['numpy_type']
    850     if dtype not in _pandas_supported_numpy_types:
    851         # pandas_dtype is expensive, so avoid doing this for types
    852         # that are certainly numpy dtypes
--> 853         pandas_dtype = _pandas_api.pandas_dtype(dtype)
    854         if isinstance(pandas_dtype, _pandas_api.extension_dtype):
    855             if hasattr(pandas_dtype, "__from_arrow__"):

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:148, in pyarrow.lib._PandasAPIShim.pandas_dtype()
    146     return self._pd.lib.infer_dtype(obj)
    147
--> 148 cpdef pandas_dtype(self, dtype):
    149     self._check_import()
    150     try:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:151, in pyarrow.lib._PandasAPIShim.pandas_dtype()
    149     self._check_import()
    150     try:
--> 151         return self._types_api.pandas_dtype(dtype)
    152     except AttributeError:
    153         return None

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/common.py:1624, in pandas_dtype()
   1621     return dtype
   1623 # registered extension types
-> 1624 result = registry.find(dtype)
   1625 if result is not None:
   1626     if isinstance(result, type):
   1627         # GH 31356, GH 54592

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/base.py:576, in find()
    574 for dtype_type in self.dtypes:
    575     try:
--> 576         return dtype_type.construct_from_string(dtype)
    577     except TypeError:
    578         pass

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2262, in construct_from_string()
   2258     except (NotImplementedError, ValueError):
   2259         # Fall through to raise with nice exception message below
   2260         pass
-> 2262 raise NotImplementedError(
   2263     "Passing pyarrow type specific parameters "
   2264     f"({has_parameters.group()}) in the string is not supported. "
   2265     "Please construct an ArrowDtype object with a pyarrow_dtype "
   2266     "instance with specific parameters."
   2267 ) from err
   2268 raise TypeError(f"'{base_type}' is not a valid pyarrow data type.") from err
   2269 return cls(pa_dtype)

NotImplementedError: Passing pyarrow type specific parameters ([274]) in the string is not supported. Please construct an ArrowDtype object with a pyarrow_dtype instance with specific parameters.
```
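
The error message itself suggests constructing the ArrowDtype from a pyarrow type instance rather than from its string form. One possible workaround in reduce_pixel_shards() could be to ignore the stored pandas metadata and let types_mapper build the dtypes from the schema. A sketch only, since ignore_metadata also skips index reconstruction, which may matter here:

```python
import pandas as pd

# Sketch: skip the b'pandas' metadata whose stringified dtypes fail to parse,
# and derive ArrowDtype columns directly from the Arrow schema instead.
# Caveat: ignore_metadata also disables index reconstruction, so any index
# column stored in the shards would need to be restored manually afterwards.
dataframe = merged_table.to_pandas(types_mapper=pd.ArrowDtype, ignore_metadata=True)
```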


delucchi-cmu commented 1 week ago

Filed https://github.com/pandas-dev/pandas/issues/59738
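
If I read the traceback right, the upstream problem boils down to ArrowDtype's string repr not round-tripping for parameterized pyarrow types; a minimal sketch:

```python
import pandas as pd
import pyarrow as pa

dtype = pd.ArrowDtype(pa.list_(pa.bool_(), 274))
# Parsing the dtype's own string form back raises NotImplementedError
pd.api.types.pandas_dtype(dtype.name)
```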