uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Non-deterministic failure during model training #547

Closed: sonNeturo closed this issue 4 years ago

sonNeturo commented 4 years ago

When training a model, it randomly fails with the following error message:

Traceback (most recent call last):
  File "/tmp/3e264757a0e444669399e145e55ba71d/training.py", line 201, in <module>
    test_freq=1,
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/offline_audience/lib/utils.py", line 205, in fit_with_petastorm
    test_epoch(model, test_path, fields_to_parse, load_batch, **kwargs)
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/offline_audience/lib/utils.py", line 258, in test_epoch
    make_batch_reader(test_path, schema_fields=fields_to_parse, num_epochs=1),
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/petastorm/reader.py", line 289, in make_batch_reader
    is_batched_reader=True)
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/petastorm/reader.py", line 394, in __init__
    row_groups = dataset_metadata.load_row_groups(self.dataset)
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/petastorm/etl/dataset_metadata.py", line 244, in load_row_groups
    return _split_row_groups_from_footers(dataset)
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/petastorm/etl/dataset_metadata.py", line 334, in _split_row_groups_from_footers
    result = [item for f in futures_list for item in f.result()]
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/petastorm/etl/dataset_metadata.py", line 334, in <listcomp>
    result = [item for f in futures_list for item in f.result()]
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/petastorm/etl/dataset_metadata.py", line 316, in _split_piece
    metadata = compat_get_metadata(piece, fs_open)
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/petastorm/compat.py", line 31, in compat_get_metadata
    arrow_metadata = piece.get_metadata()
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in get_metadata
    f = self.open()
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/pyarrow/parquet.py", line 520, in open
    reader = self.open_file_func(self.path)
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/pyarrow/parquet.py", line 1062, in open_file
    common_metadata=self.common_metadata)
  File "/opt/conda/miniconda3/envs/myenv/lib/python3.6/site-packages/pyarrow/parquet.py", line 130, in __init__
    self.reader.open(source, use_memory_map=memory_map, metadata=metadata)
  File "pyarrow/_parquet.pyx", line 655, in pyarrow._parquet.ParquetReader.open
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Arrow error: IOError: b''

The error happens randomly: sometimes an epoch finishes without an issue and then training fails in the next one. My code is pretty simple:

    from petastorm import make_batch_reader
    from petastorm.pytorch import DataLoader
    from tqdm import tqdm

    loss_epoch = 0.0  # running sum of the training loss for this epoch
    with DataLoader(
        make_batch_reader(train_path, schema_fields=fields_to_parse, num_epochs=1),
        batch_size=kwargs["train_batch_size"],
    ) as train_loader:
        device = kwargs["device"]
        for batch_data in tqdm(train_loader, total=n_batches):
            x, y = load_batch(batch_data)
            x = x.to(device)
            y = y.to(device)
            y_pred = model(x).squeeze()

            model.optim.zero_grad()
            loss = model.loss_func(y_pred, y.squeeze(), reduction="sum")

            loss_epoch += loss.item()
            loss.backward()
            model.optim.step()

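For reference, load_batch is my own helper and isn't shown above. A purely hypothetical sketch of what it does, assuming each batch comes out of the DataLoader as a dict of torch tensors keyed by column name ("features" and "label" below are placeholder names, not my real columns):

    # Hypothetical sketch only; my actual load_batch differs. Assumes the
    # petastorm DataLoader yields a dict of torch tensors keyed by column name,
    # with "features" and "label" used here as placeholder column names.
    def load_batch(batch_data):
        x = batch_data["features"].float()  # shape: [batch_size, n_features]
        y = batch_data["label"].float()     # shape: [batch_size]
        return x, y
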
Could this error be due to the distributed setup? I'm using a Dataproc cluster with 20 workers and a GPU.

versions:

Thanks for the help.

selitvin commented 4 years ago

Based on your call stack, the failure occurs inside the make_batch_reader call, i.e. before you start iterating over the data. The crash happens when pyarrow tries to open one of the parquet files in the dataset. This is done on a thread pool, i.e. multiple files are being opened in parallel. The immediate suspects are thread-safety issues in pyarrow or in the underlying hdfs driver (assuming you are using hdfs).
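To make that concrete, the failing path in your stack trace follows roughly the pattern sketched below. This is a simplified illustration, not petastorm's actual implementation; pieces and max_workers are placeholders.

    # Simplified sketch of the code path in the stack trace (not petastorm's
    # actual implementation): every parquet piece's footer is fetched on a
    # worker thread, so many file opens hit the filesystem driver concurrently.
    from concurrent.futures import ThreadPoolExecutor

    def load_row_groups_sketch(pieces, max_workers=10):
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = [pool.submit(piece.get_metadata) for piece in pieces]
            # A transient IO failure in any single open is re-raised here and
            # surfaces as the ArrowIOError above, failing the whole
            # make_batch_reader call.
            return [f.result() for f in futures]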

I'd try the following:

sonNeturo commented 4 years ago

Thanks @selitvin for your answer.

sshrdp commented 4 years ago

Does petastorm read from a single file in multiple threads? I'm trying to understand where the race condition might be.

vlasenkoalexey commented 4 years ago

@sonNeturo could you please provide more information:

I ran the MNIST sample multiple times and everything works fine. Could you try running the following and see if the issue can be reproduced for this dataset: python3 -m examples.mnist.pytorch_example --dataset-url=gs://alekseyv-scalableai-dev/petastorm_mnist or python3 -m examples.mnist.tf_example --dataset-url=gs://alekseyv-scalableai-dev/petastorm_mnist

sonNeturo commented 4 years ago

I haven't had the issue for more than a week now, for a job that runs three times a day... I haven't changed anything in my code. I'm closing the issue for now, unless I re-encounter the problem and have more details to share. To answer @vlasenkoalexey: