uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.

"Currently do not support resetting a reader while in the middle of iteration." #610

Open tadas-subonis opened 3 years ago

tadas-subonis commented 3 years ago

I have this piece of code:

batch_size = 8
with converter_train.make_torch_dataloader(transform_spec=transform, batch_size=batch_size, num_epochs=1) as dataloader_train, \
  converter_test.make_torch_dataloader(transform_spec=transform, batch_size=batch_size, num_epochs=1) as dataloader_test:
      trainer.fit(
        model, 
        dataloader_train,
        dataloader_test,
      )

When it gets to the validation step, it throws the previously mentioned error:

Currently do not support resetting a reader while in the middle of iteration.

I am using PyTorch Lightning.

The stack trace looks like this:

---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<command-128477> in <module>
     65         model,
     66         dataloader_train,
---> 67         dataloader_test,
     68       )

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/states.py in wrapped_fn(self, *args, **kwargs)
     46             if entering is not None:
     47                 self.state = entering
---> 48             result = fn(self, *args, **kwargs)
     49 
     50             # The INTERRUPTED state can be set inside the run function. To indicate that run was interrupted

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/trainer.py in fit(self, model, train_dataloader, val_dataloaders, datamodule)
   1071             self.accelerator_backend = GPUBackend(self)
   1072             model = self.accelerator_backend.setup(model)
-> 1073             results = self.accelerator_backend.train(model)
   1074 
   1075         elif self.use_tpu:

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/accelerators/gpu_backend.py in train(self, model)
     49 
     50     def train(self, model):
---> 51         results = self.trainer.run_pretrain_routine(model)
     52         return results
     53 

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/trainer.py in run_pretrain_routine(self, model)
   1237 
   1238         # CORE TRAINING LOOP
-> 1239         self.train()
   1240 
   1241     def _run_sanity_check(self, ref_model, model):

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/training_loop.py in train(self)
    392                 # RUN TNG EPOCH
    393                 # -----------------
--> 394                 self.run_training_epoch()
    395 
    396                 if self.max_steps and self.max_steps <= self.global_step:

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/training_loop.py in run_training_epoch(self)
    514             should_check_val = self.should_check_val(batch_idx, is_last_batch)
    515             if should_check_val:
--> 516                 self.run_evaluation(test_mode=False)
    517 
    518             # -----------------------------------------

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/evaluation_loop.py in run_evaluation(self, test_mode)
    580 
    581         # run evaluation (val_step + val_step_end + val_epoch_end)
--> 582         eval_results = self._evaluate(self.model, dataloaders, max_batches, test_mode)
    583 
    584         # log the final eval loop metrics

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/evaluation_loop.py in _evaluate(self, model, dataloaders, max_batches, test_mode)
    303             dl_max_batches = max_batches[dataloader_idx]
    304 
--> 305             for batch_idx, batch in enumerate(dataloader):
    306                 if batch is None:
    307                     continue

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/petastorm/pytorch.py in __iter__(self)
    113             raise RuntimeError(_PARALLEL_ITER_ERROR)
    114         if self._in_iter is not None:
--> 115             self.reader.reset()
    116             logger.warning('Start a new pass of Petastorm DataLoader, reset underlying Petastorm reader to position 0.')
    117         self._in_iter = True

/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/petastorm/reader.py in reset(self)
    456             # drop these in-flight samples? Or just ignore it? What would happen if we have two concurrent ventilators
    457             # that are emitting load requests at the same time?
--> 458             raise NotImplementedError('Currently do not support resetting a reader while in the middle of iteration. '
    459                                       'You can call reset only after all samples were consumed.')
    460         self.last_row_consumed = False

NotImplementedError: Currently do not support resetting a reader while in the middle of iteration. You can call reset only after all samples were consumed.

Lightning trainer config:

trainer = Trainer(
  max_epochs=50, 
  gpus=1, 
  logger=logger,
  check_val_every_n_epoch=5,
  progress_bar_refresh_rate=250,
)

Is there something that I am doing wrong?

selitvin commented 3 years ago

The error indicates that we were trying to reset() a reader before the number of epochs specified during its creation was consumed. It's hard to say what went wrong without seeing how you create the reader/dataloader. Can you please post that code? Also, please post the version of pytorch_lightning that you use.
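
For context, a minimal sketch of when Reader.reset() is and is not allowed; the dataset URL and the direct use of make_reader here are illustrative, not taken from the reporter's code:

from petastorm import make_reader

# Hypothetical dataset URL, for illustration only.
url = 'file:///tmp/my_dataset'

with make_reader(url, num_epochs=1) as reader:
    next(iter(reader))       # consume only part of the single epoch
    try:
        reader.reset()       # mid-iteration: raises NotImplementedError
    except NotImplementedError as err:
        print(err)

with make_reader(url, num_epochs=1) as reader:
    for _ in reader:         # consume every sample of the epoch first
        pass
    reader.reset()           # allowed once all samples were consumed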

tadas-subonis commented 3 years ago
petastorm[torch]  (0.9.5)
pytorch-lightning==0.9.0

Creation code (a bit simplified):

from pyspark.sql import Row
from petastorm.spark import make_spark_converter

indexed_dataset = some_data \
  .zipWithUniqueId()

def is_validation(x):
  return (x[1] % 100) >= 80

def not_(f):
  def step(x):
    return not f(x)
  return step

validation_dataset = indexed_dataset \
  .filter(is_validation) \
  .map(lambda x: x[0])

training_dataset = indexed_dataset \
  .filter(not_(is_validation)) \
  .map(lambda x: x[0])

def rdd_to_df(rdd):
  rdd = rdd.map(lambda x: Row(**(x)))
  return spark.createDataFrame(rdd)

training_dataset_rdd = training_dataset.map(vectorize).map(create_pytorch_dict_with_numpy_bytes)
validation_dataset_rdd = validation_dataset.map(vectorize).map(create_pytorch_dict_with_numpy_bytes)

converter_train = make_spark_converter(rdd_to_df(training_dataset_rdd))
converter_test = make_spark_converter(rdd_to_df(validation_dataset_rdd))
tadas-subonis commented 3 years ago

For now, I've loaded the validation dataset into memory and used the regular DataLoader and it seems to work fine.
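
A minimal sketch of that workaround: the field names 'features' and 'label' are hypothetical stand-ins for whatever create_pytorch_dict_with_numpy_bytes produces, and it assumes the validation split fits in driver memory:

import torch
from torch.utils.data import DataLoader, TensorDataset

# Collect the (small) validation split to the driver and wrap it in a plain
# in-memory PyTorch DataLoader instead of a Petastorm one.
rows = validation_dataset_rdd.collect()
features = torch.stack([torch.as_tensor(row['features']) for row in rows])
labels = torch.as_tensor([row['label'] for row in rows])
dataloader_test = DataLoader(TensorDataset(features, labels), batch_size=8)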

But I've also noticed some strange behavior recently: if I start training from scratch, the process trains as it is supposed to, going from epoch 1 to 50. However, if I reuse the same session and rerun the pipeline without modifications, it runs just the last epoch and terminates.

@selitvin do you have any ideas why that could happen? :)

Thanks

selitvin commented 3 years ago

Hmm, I am not sure. I am not familiar with pytorch-lightning and how it interacts with the DataLoader.

LeonardoEmili commented 3 years ago

I am facing the same issue using a custom petastorm DataLoader (actually I only provide a custom collate function, not changing the DataLoader at all) and feeding it to the pytorch-lightning Trainer. @tadas-subonis were you able to fix it, or should we rely on the default DataLoader when using pytorch-lightning? By chance, did you experiment with plain PyTorch to see if it works as expected in that case?

darwinharianto commented 3 years ago

Is there any progress on this?

LeonardoEmili commented 3 years ago

Is there any progress on this?

I had to switch to plain PyTorch; I was not able to get it working with PyTorch Lightning.

maxisawesome commented 2 years ago

Also getting this issue. At first I was using "limit_train_batches" in my PyTorch Lightning trainer, but I removed it and still ended up with this error. PTL runs two validation steps at the beginning of training (the sanity check), then returns to the validation dataloader when it finishes a training epoch. I assume that, upon beginning the full validation run, it resets the val dataloader. If this is what's happening, it sounds like PyTorch Lightning is not supported. If so, that should be made clear somewhere.

Unfortunately, I cannot copy my stack trace due to some VPN restrictions, but the error occurs at line 116 in petastorm/pytorch.py and then at line 490 in petastorm/reader.py.

maxisawesome commented 2 years ago

I solved this with num_sanity_val_steps=0 in my PyTorch Lightning Trainer! It seems my intuition above was correct: after turning the sanity check off, the val reader doesn't need to be reset, and no errors occur. Nice!
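
A sketch of that fix applied to the Trainer config posted earlier in the thread, assuming model and the dataloaders are defined as above:

from pytorch_lightning import Trainer

trainer = Trainer(
    max_epochs=50,
    gpus=1,
    num_sanity_val_steps=0,  # skip the pre-training sanity pass that resets the val reader mid-iteration
)
trainer.fit(model, dataloader_train, dataloader_test)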

wbeardall commented 1 year ago

As an alternative fix to this, there's a relatively simple workaround that involves subclassing the Petastorm DataLoader, such that instead of being initialized with a Reader object, it is initialized with a zero-argument function which returns a Reader, so that the loader can re-create its reader if need be. A possible implementation of this is given below:

import logging
from petastorm.pytorch import _PARALLEL_ITER_ERROR, DataLoader, decimal_friendly_collate

logger = logging.getLogger(__name__)

class SafeResetLoader(DataLoader):
    """
    A data loader adaptor for ``torch.utils.data.DataLoader``.
    This class iterates and returns items from the Reader in batches.
    Behaves the same as the Petastorm dataloader, except it takes a function which generates a reader,
    so that it can reset itself at any time.
    This loader can be used as an iterator and will terminate when the reader used in the construction of the class
    runs out of samples.
    """

    def __init__(self, reader_fn, batch_size=1, collate_fn=decimal_friendly_collate,
                 shuffling_queue_capacity=0):
        """
        Initializes a data loader object, with a default collate.
        Number of epochs is defined by the configuration of the reader returned by ``reader_fn``.
        An optional shuffling queue is created if shuffling_queue_capacity is greater than 0. No samples will be
        returned to a user by the ``DataLoader`` until the queue is full. After that, batches of `batch_size`
        will be created by uniformly sampling the shuffling queue. Once no more samples are available from the data
        reader, the shuffling queue is allowed to be consumed till no further samples are available.
        Note that the last returned batch could have fewer than ``batch_size`` samples.
        NOTE: ``make_batch_reader`` has its own ``shuffle_row_groups`` argument. It randomizes the order in
        which parquet row-groups are loaded and has no effect on the order of rows within each row-group. To achieve
        row-level shuffling you should set shuffling_queue_capacity to a non-zero value.
        :param reader_fn: a zero-argument callable that returns a petastorm Reader instance
        :param batch_size: the number of items to return per batch; factored into the len() of this reader
        :param collate_fn: an optional callable to merge a list of samples to form a mini-batch.
        :param shuffling_queue_capacity: Queue capacity is passed to the underlying :class:`tf.RandomShuffleQueue`
          instance. If set to 0, no shuffling will be done.
        """
        super().__init__(reader_fn(), batch_size=batch_size, collate_fn=collate_fn,
                         shuffling_queue_capacity=shuffling_queue_capacity)
        self._reader_fn = reader_fn

    def __iter__(self):
        if self._error is not None:
            raise RuntimeError('Cannot start a new iteration because last time iteration failed with error {err}.'
                               .format(err=repr(self._error)))
        if self._in_iter is not None and self._in_iter == True:  # noqa: E712
            raise RuntimeError(_PARALLEL_ITER_ERROR)
        if self._in_iter is not None:
            try:
                self.reader.reset()
            except NotImplementedError:
                # Called reset in the middle of an iteration.
                self.reader.stop()
                self.reader.join()
                self.reader = self._reader_fn()
            logger.warning('Start a new pass of Petastorm DataLoader, reset underlying Petastorm reader to position 0.')
        self._in_iter = True

        try:
            for batch in self._iter_impl():
                yield batch
        except Exception as e:
            self._error = e
            logger.error('Iteration on Petastorm DataLoader raise error: %s', repr(e))
            raise
        finally:
            self._in_iter = False

    def terminate(self):
        self.reader.stop()
        self.reader.join()

This loader can then be called in the following manner:

import functools

from petastorm import make_reader

# Fill in the make_reader arguments (dataset URL, etc.) as needed.
make_reader_fn = functools.partial(make_reader, ...)
loader = SafeResetLoader(make_reader_fn, collate_fn=..., batch_size=...)

elbamos commented 1 month ago

Is there any progress on this issue? It's a pretty big gap if petastorm isn't compatible with PyTorch Lightning.

selitvin commented 1 month ago

@elbamos : I am not actively maintaining petastorm any longer. I am not aware of any active contributors at this stage.