uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.

Use of transform_spec in make_batch_reader leads to tensorflow error when column is missing values #744

Open · oby1 opened this issue 2 years ago

oby1 commented 2 years ago

The conversion from pandas back to Arrow in ArrowReaderWorker._load_rows() loses type information when all rows in the loaded row group are missing values for a given column. From the pyarrow.Table.from_pandas documentation:

Be aware that Series of the object dtype don’t carry enough information to always lead to a meaningful Arrow type. In the case that we cannot infer a type, e.g. because the DataFrame is of length 0 or the Series only contains None/nan objects, the type is set to null. This behavior can be avoided by constructing an explicit schema and passing it to this function.
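
This is easy to reproduce with pyarrow alone, outside petastorm. A minimal sketch (illustrative, not petastorm code):

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'s': pd.Series([None, None], dtype=object)})

# Without a schema, pyarrow cannot infer a type for the all-None column.
print(pa.Table.from_pandas(df, preserve_index=False).schema)  # s: null

# An explicit schema preserves the intended string type (with null values).
schema = pa.schema([pa.field('s', pa.string())])
print(pa.Table.from_pandas(df, schema=schema, preserve_index=False).schema)  # s: string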

The result is the following error when reading the corresponding batch in the TensorFlow Dataset:

InternalError: Unsupported object type NoneType
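
Roughly the same conversion failure can be reproduced directly (the exact exception type and wording vary with the TensorFlow version and whether the conversion happens inside a PyFunc):

import numpy as np
import tensorflow as tf

# TensorFlow cannot convert an object array of Nones to a string tensor;
# inside a tf.data PyFunc this surfaces as the InternalError above.
tf.constant(np.array([None, None], dtype=object))  # "Unsupported object type NoneType"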

Example

from pathlib import Path
import numpy as np
import pandas as pd
import petastorm
from petastorm.unischema import UnischemaField
from petastorm import tf_utils
from petastorm.transform import TransformSpec

# Create parquet dataset
data_path = Path('/path/to/data.parquet')
data_pd = pd.DataFrame({'s': ['A', None, None, None]})
data_pd.to_parquet(data_path, row_group_size=2)

# Read with petastorm
noop_transform_spec = TransformSpec(
    lambda x: x,
    edit_fields=[UnischemaField('s', np.str_, (), nullable=True)])

reader = petastorm.make_batch_reader(data_path.as_uri(),
                                     workers_count=1,
                                     shuffle_row_groups=False,
                                     num_epochs=2,
                                     transform_spec=noop_transform_spec)

reader.next() # output: inferred_schema_transformed_view(s=array(['A', 'None'], dtype='<U4'))
reader.next() # output: inferred_schema_transformed_view(s=array([None, None], dtype=object)) <-- type is missing

# Read with tensorflow
dataset = tf_utils.make_petastorm_dataset(reader)
dataset_itr = dataset.as_numpy_iterator()

dataset_itr.next() # output: inferred_schema_transformed_view(s=array([b'A', b'None'], dtype=object))
dataset_itr.next() # InternalError: Unsupported object type NoneType

Workaround

Modify every TransformSpec func to replace string columns that are missing all of their values with the literal string 'None':

def transform_spec_with_workaround(rows_pd: pd.DataFrame) -> pd.DataFrame:
    ...  # custom transformation

    # Replace all-null object columns with a sentinel string so pyarrow
    # can still infer a string type on the round trip back to Arrow.
    for c in rows_pd.columns:
        if rows_pd[c].dtype == object and rows_pd[c].isnull().all():
            rows_pd[c] = ['None'] * len(rows_pd)

    return rows_pd
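
With the workaround in place, the reader from the example above could be constructed as follows (safe_transform_spec is a name made up here):

safe_transform_spec = TransformSpec(
    transform_spec_with_workaround,
    edit_fields=[UnischemaField('s', np.str_, (), nullable=True)])

reader = petastorm.make_batch_reader(data_path.as_uri(),
                                     workers_count=1,
                                     shuffle_row_groups=False,
                                     num_epochs=2,
                                     transform_spec=safe_transform_spec)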

Full Trace

---------------------------------------------------------------------------
InternalError                             Traceback (most recent call last)
~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/eager/context.py in execution_mode(mode)
   2101       ctx.executor = executor_new
-> 2102       yield
   2103     finally:

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/data/ops/iterator_ops.py in _next_internal(self)
    757             output_types=self._flat_output_types,
--> 758             output_shapes=self._flat_output_shapes)
    759 

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/ops/gen_dataset_ops.py in iterator_get_next(iterator, output_types, output_shapes, name)
   2609     except _core._NotOkStatusException as e:
-> 2610       _ops.raise_from_not_ok_status(e, name)
   2611     except _core._FallbackException:

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/framework/ops.py in raise_from_not_ok_status(e, name)
   6842   # pylint: disable=protected-access
-> 6843   six.raise_from(core._status_to_exception(e.code, message), None)
   6844   # pylint: enable=protected-access

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/six.py in raise_from(value, from_value)

InternalError: Unsupported object type NoneType
     [[{{node PyFunc}}]] [Op:IteratorGetNext]

During handling of the above exception, another exception occurred:

InternalError                             Traceback (most recent call last)
<ipython-input-45-55cc7ab782db> in <module>
----> 1 dataset_itr.next()

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/data/ops/dataset_ops.py in next(self)
   3778 
   3779   def next(self):
-> 3780     return nest.map_structure(lambda x: x.numpy(), next(self._iterator))
   3781 
   3782   def __next__(self):

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/data/ops/iterator_ops.py in __next__(self)
    734 
    735   def __next__(self):  # For Python 3 compatibility
--> 736     return self.next()
    737 
    738   def _next_internal(self):

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/data/ops/iterator_ops.py in next(self)
    770   def next(self):
    771     try:
--> 772       return self._next_internal()
    773     except errors.OutOfRangeError:
    774       raise StopIteration

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/data/ops/iterator_ops.py in _next_internal(self)
    762         return self._element_spec._from_compatible_tensor_list(ret)  # pylint: disable=protected-access
    763       except AttributeError:
--> 764         return structure.from_compatible_tensor_list(self._element_spec, ret)
    765 
    766   @property

~/conda/udrgpu2_86_0/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
     97                 value = type()
     98             try:
---> 99                 self.gen.throw(type, value, traceback)
    100             except StopIteration as exc:
    101                 # Suppress StopIteration *unless* it's the same exception that

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/eager/context.py in execution_mode(mode)
   2103     finally:
   2104       ctx.executor = executor_old
-> 2105       executor_new.wait()
   2106 
   2107 

~/conda/udrgpu2_86_0/lib/python3.6/site-packages/tensorflow/python/eager/executor.py in wait(self)
     65   def wait(self):
     66     """Waits for ops dispatched in this executor to finish."""
---> 67     pywrap_tfe.TFE_ExecutorWaitForAllPendingNodes(self._handle)
     68 
     69   def clear_error(self):

InternalError: Unsupported object type NoneType
     [[{{node PyFunc}}]]
selitvin commented 2 years ago

Thanks for bringing up the issue.

I tried forcing a strict schema when converting from pandas back to a pyarrow table here. Unfortunately, the round trip to pandas and back is not transparent. One type that ended up being tricky is pa.timestamp: I was not able to make it work without implementing some awkward conversion code that I am not sure would be robust enough.
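
For illustration, the schema-forcing idea amounts to something like the sketch below (not petastorm's actual code; the helper name pandas_to_arrow_with_schema is invented). It handles the all-None string case, but timestamp columns are the tricky part, because pandas has already converted them on the way in:

import pyarrow as pa

def pandas_to_arrow_with_schema(transformed_pd, original_schema):
    # Reuse the row group's original Arrow types for the surviving columns
    # instead of letting pyarrow re-infer them from pandas object columns.
    fields = [original_schema.field(name) for name in transformed_pd.columns]
    return pa.Table.from_pandas(transformed_pd,
                                schema=pa.schema(fields),
                                preserve_index=False)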

Another approach I tried was passing a pyarrow.Table (instead of a pandas DataFrame) as the argument to the TransformSpec function. However, working with a pyarrow.Table in the transform spec function turns out to be inconvenient: pa.Table is immutable, and the pandas API is much better suited to implementing transformations.
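
To illustrate the immutability point: there is no in-place assignment on a pa.Table, so every edit allocates a new table.

import pyarrow as pa

t = pa.table({'s': ['A', None]})
# set_column does not mutate t; it returns a new table with 's' replaced.
t2 = t.set_column(0, 's', pa.array(['A', 'None']))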

So after trying all this, I would suggest sticking with the current implementation. While it's not perfect, I was not able to find a better alternative that would not require potentially non-robust code.

Would appreciate your thoughts and suggestions on this matter.

oby1 commented 2 years ago

Thanks for looking into this! What was the issue with pa.timestamp? I'm not seeing the timestamp-specific conversion code in https://github.com/uber/petastorm/pull/750.

For our purposes, using the proposed workaround is not a big deal as we only use a single TransformSpec to implement nested array support as described here. The nested array support via TransformSpec is itself a bit of a hack. Has any thought been given to natively supporting nested arrays?

selitvin commented 2 years ago

The issue with timestamps I ran into was the automatic conversion of the timestamp into a Python datetime object: it would not be automatically converted back into pa.timestamp. However, I just noticed that there is a date_as_object=False option (*.to_pandas(date_as_object=False)) that lets me keep dates from being converted to Python objects. Reopened #750; let's see if I can get all tests to pass now.
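
For reference, the behavior in question can be seen with pyarrow alone (this is not the PR's code): the default to_pandas turns date columns into Python date objects in an object column, while date_as_object=False keeps them as datetime64, which round-trips back to Arrow cleanly.

import datetime
import pyarrow as pa

t = pa.table({'d': pa.array([datetime.date(2024, 1, 1)], type=pa.date32())})

# Default: dates come back as Python date objects (object dtype).
print(t.to_pandas().dtypes)                      # d: object
# date_as_object=False keeps them as datetime64[ns] instead.
print(t.to_pandas(date_as_object=False).dtypes)  # d: datetime64[ns]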