pytorch / torcharrow

High performance model preprocessing library on PyTorch
https://pytorch.org/torcharrow/beta/index.html
BSD 3-Clause "New" or "Revised" License
642 stars 78 forks source link

Support for arrays in torcharrow.from_arrow #500

Open grapefroot opened 1 year ago

grapefroot commented 1 year ago

Hi guys! When trying to use ParquetDataFrameLoader I ran across a problem when trying to load parquet file if it has an array field. It looks like it comes down to torcharrow.from_arrow not supporting array columns. But it seems that torcharrow already has support for array columns. Are there any plans to implement it when loading from parquet files or are there any problems which stop this from being implemented?

The error basically looks like this:

NotImplementedError                       Traceback (most recent call last)
Input In [25], in <cell line: 1>()
----> 1 next(iter(datapipe))

File /opt/conda/lib/python3.8/site-packages/torch/utils/data/datapipes/_typing.py:514, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
    512         response = gen.send(None)
    513 else:
--> 514     response = gen.send(None)
    516 while True:
    517     request = yield response

File /opt/conda/lib/python3.8/site-packages/torch/utils/data/datapipes/iter/combinatorics.py:127, in ShufflerIterDataPipe.__iter__(self)
    125 self._rng.seed(self._seed)
    126 self._seed = None
--> 127 for x in self.datapipe:
    128     if len(self._buffer) == self.buffer_size:
    129         idx = self._rng.randint(0, len(self._buffer) - 1)

File /opt/conda/lib/python3.8/site-packages/torch/utils/data/datapipes/_typing.py:514, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
    512         response = gen.send(None)
    513 else:
--> 514     response = gen.send(None)
    516 while True:
    517     request = yield response

File /opt/conda/lib/python3.8/site-packages/torchdata/datapipes/iter/util/dataframemaker.py:138, in ParquetDFLoaderIterDataPipe.__iter__(self)
    135 for i in range(num_row_groups):
    136     # TODO: More fine-grain control over the number of rows or row group per DataFrame
    137     row_group = parquet_file.read_row_group(i, columns=self.columns, use_threads=self.use_threads)
--> 138     yield torcharrow.from_arrow(row_group, dtype=self.dtype)

File /opt/conda/lib/python3.8/site-packages/torcharrow/interop.py:32, in from_arrow(data, dtype, device)
     30     return _from_arrow_array(data, dtype, device=device)
     31 elif isinstance(data, pa.Table):
---> 32     return _from_arrow_table(data, dtype, device=device)
     33 else:
     34     raise ValueError

File /opt/conda/lib/python3.8/site-packages/torcharrow/interop_arrow.py:86, in _from_arrow_table(table, dtype, device)
     83     field = table.schema.field(i)
     85     assert len(table[i].chunks) == 1
---> 86     df_data[field.name] = _from_arrow_array(
     87         table[i].chunk(0),
     88         dtype=(
     89             # pyre-fixme[16]: `DType` has no attribute `get`.
     90             dtype.get(field.name)
     91             if dtype is not None
     92             else _arrowtype_to_dtype(field.type, field.nullable)
     93         ),
     94         device=device,
     95     )
     97 return dataframe(df_data, device=device)

File /opt/conda/lib/python3.8/site-packages/torcharrow/interop_arrow.py:37, in _from_arrow_array(array, dtype, device)
     28 assert isinstance(array, pa.Array)
     30 # Using the most narrow type we can, we (i) don't restrict in any
     31 # way where it can be used (since we can pass a narrower typed
     32 # non-null column to a function expecting a nullable type, but not
   (...)
     35 # increase the amount of places we can use the from_arrow result
     36 # pyre-fixme[16]: `Array` has no attribute `type`.
---> 37 dtype_from_arrowtype = _arrowtype_to_dtype(array.type, array.null_count > 0)
     38 if dtype and (
     39     dt.get_underlying_dtype(dtype) != dt.get_underlying_dtype(dtype_from_arrowtype)
     40 ):
     41     raise NotImplementedError("Type casting is not supported")

File /opt/conda/lib/python3.8/site-packages/torcharrow/_interop.py:205, in _arrowtype_to_dtype(t, nullable)
    199 if pa.types.is_struct(t):
    200     return dt.Struct(
    201         # pyre-fixme[16]: `DataType` has no attribute `__iter__`.
    202         [dt.Field(f.name, _arrowtype_to_dtype(f.type, f.nullable)) for f in t],
    203         nullable,
    204     )
--> 205 raise NotImplementedError(f"Unsupported Arrow type: {str(t)}")

NotImplementedError: Unsupported Arrow type: list<element: float>
This exception is thrown by __iter__ of ParquetDFLoaderIterDataPipe()
myzha0 commented 1 year ago

+1, this would be super useful!

Is this something that's planned soon? If not, do you all have any pointers on how to implement this?

Seems like a easy hack would be to just make a wrapper to ListColumnCpu that wraps _from_pysequence here. I assume this requires a data copy though to a native list (e.g., call array.to_pylist() and pass to_from_pysequence)? Would you have suggestions otherwise?

wenleix commented 1 year ago

thanks @myzha0

Seems like a easy hack would be to just make a wrapper to ListColumnCpu that wraps _from_pysequence here.

Yeah that requires two copies (arrow -> Python objects -> Velox memory buffer). For that approach to unblock, can just do ta.column(array.to_pylist(), _arrowtype_to_dtype(...)) right?