tensorflow / datasets

TFDS is a collection of datasets ready to use with TensorFlow, Jax, ...
https://www.tensorflow.org/datasets
Apache License 2.0

Adding Beam Dataset, Out of Disk Space on Dataflow using GCS #1689

Open · Ouwen opened this issue 4 years ago

Ouwen commented 4 years ago

What I need help with / What I was wondering
I would like to add a dataset for later contribution: https://github.com/Ouwen/datasets/tree/tfds

What I've tried so far
Small slices of data work fine both locally and on Google Cloud Dataflow. I run the following script:

DATASET_NAME=duke_ultranet/dynamic_rx_beamformed
GCP_PROJECT=my-project
GCS_BUCKET=bucket-location
echo "git+git://github.com/ouwen/datasets@tfds" > /tmp/beam_requirements.txt

python3 -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=duke-ultranet-dynamic-rx-beamformed-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt,region=us-east1,"\
"autoscaling_algorithm=NONE,num_workers=20,"\
"machine_type=n1-highmem-16,experiments=shuffle_mode=service"

I get the following errors using Data Shuffle on the full dataset (3.85 TB).

IOError: [Errno 28] No space left on device
  at copyfile (/usr/lib64/python2.7/shutil.py:84)
  at copy (/usr/lib64/python2.7/shutil.py:119)
  at SetConfiguredUsers (/usr/lib64/python2.7/site-packages/google_compute_engine/accounts/accounts_utils.py:293)
  at HandleAccounts (/usr/lib64/python2.7/site-packages/google_compute_engine/accounts/accounts_daemon.py:263)
  at WatchMetadata (/usr/lib64/python2.7/site-packages/google_compute_engine/metadata_watcher.py:196)

RuntimeError: tensorflow.python.framework.errors_impl.InternalError: Could not write to the internal temporary file. [while running 'train/WriteFinalShards']
  at __exit__ (/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/errors_impl.py:554)
  at close (/usr/local/lib/python3.7/site-packages/tensorflow_core/python/lib/io/tf_record.py:246)
  at __exit__ (/usr/local/lib/python3.7/site-packages/tensorflow_core/python/lib/io/tf_record.py:227)
  at _write_tfrecord (/usr/local/lib/python3.7/site-packages/tensorflow_datasets/core/tfrecords_writer.py:121)
  at _write_final_shard (/home/jupyter/datasets/tensorflow_datasets/core/tfrecords_writer.py:351)
  at <lambda> (/opt/conda/lib/python3.7/site-packages/apache_beam/transforms/core.py:1437)
  at apache_beam.runners.common.SimpleInvoker.invoke_process (common.py:498)
  at apache_beam.runners.common.DoFnRunner.process (common.py:883)
  at raise_with_traceback (/usr/local/lib/python3.7/site-packages/future/utils/__init__.py:421)
  at apache_beam.runners.common.DoFnRunner._reraise_augmented (common.py:956)
  at apache_beam.runners.common.DoFnRunner.process (common.py:885)
  at apache_beam.runners.common.DoFnRunner.receive (common.py:878)
  at apache_beam.runners.worker.operations.DoOperation.process (operations.py:658)
  at apache_beam.runners.worker.operations.DoOperation.process (operations.py:657)
  at apache_beam.runners.worker.operations.SingletonConsumerSet.receive (operations.py:178)
  at apache_beam.runners.worker.operations.Operation.output (operations.py:304)
  at dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process (shuffle_operations.py:268)
  at dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process (shuffle_operations.py:261)
  at apache_beam.runners.worker.operations.SingletonConsumerSet.receive (operations.py:178)
  at apache_beam.runners.worker.operations.Operation.output (operations.py:304)
  at dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start (shuffle_operations.py:84)
  at dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start (shuffle_operations.py:80)
  at dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start (shuffle_operations.py:79)
  at dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start (shuffle_operations.py:64)
  at dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start (shuffle_operations.py:63)
  at execute (/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py:176)
  at do_work (/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py:648)

If I run without Dataflow Shuffle but with 500 GB disks on 10 workers (5 TB total for a 2.5 TB dataset), I get:

    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/tensorflow_datasets/core/dataset_builder.py", line 210, in __setstate__
    self.__init__(**state)
  File "/usr/local/lib/python3.7/site-packages/tensorflow_datasets/core/dataset_builder.py", line 1116, in __init__
    super(BeamBasedBuilder, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/tensorflow_datasets/core/api_utils.py", line 53, in disallow_positional_args_dec
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/tensorflow_datasets/core/dataset_builder.py", line 204, in __init__
    self.info.initialize_from_bucket()
  File "/usr/local/lib/python3.7/site-packages/tensorflow_datasets/core/dataset_info.py", line 428, in initialize_from_bucket
    tmp_dir = tempfile.mkdtemp("tfds")
  File "/usr/local/lib/python3.7/tempfile.py", line 366, in mkdtemp
    _os.mkdir(file, 0o700)
OSError: [Errno 28] No space left on device: '/tmp/tmpereoz1ljtfds'

It seems to process everything, but only has trouble writing to the final GCS directory. Is the temp GCS bucket location being respected?


Environment information

Eshan-Agarwal commented 4 years ago

@Ouwen please have a look at #1345. I think the temp location is not accessible to all workers.

Ouwen commented 4 years ago

@Eshan-Agarwal what would cause the dataset to work on smaller slices with the same number of workers?

Eshan-Agarwal commented 4 years ago

@Ouwen Okay, I think the problem is not the issue I mentioned, sorry about that. If the data works in smaller slices, then maybe the problem is with tfrecords_writer.py. I am looking into it and will let you know if I find anything. What is the size of the small slices? Also, can you provide the code for the dataset?

Ouwen commented 4 years ago

The code is here: https://github.com/Ouwen/datasets/tree/tfds

Edit: the smaller slices are around 500 examples. If I increase the local disk of the Dataflow workers, the dataset completes successfully. However, I am wondering whether the behavior should be to stream into GCS rather than write to a local tmp file.

Ouwen commented 4 years ago

@Conchylicultor sorry to bother you with this. I'm just wondering if Beam datasets will write to a local tmp directory before writing to a GCS path.

Conchylicultor commented 4 years ago

The issue seems to be that we pre-load the metadata locally by creating a temporary folder: https://github.com/tensorflow/datasets/blob/e4aa686a4d623464cc5861464c6fbee6f3152e17/tensorflow_datasets/core/dataset_info.py#L428

From the comment, it seems that was done because of Colab. However, it seems that Dataflow workers do not have a temporary disk.

The solution I see would be to update the code to not copy to a temp file.

Could you try replacing the initialize_from_bucket function with:

def initialize_from_bucket(self):
  """Initialize DatasetInfo from GCS bucket info files."""
  info_path = 'gs://tfds-data/dataset_info/' + self.full_name
  # Read the metadata files directly from GCS instead of copying them
  # into a local temporary directory first.
  if not tf.io.gfile.exists(info_path):
    return
  logging.info("Loading info from GCS for %s", self.full_name)
  self.read_from_directory(info_path)

It will avoid creating the tmp dir. If this works, we can then think about making this work for Colab too.

Edit: To be honest, I'm not sure why an error is raised here. The tmp dir size should be minimal (and here, as this is not an official dataset, the content should be empty). Or is it because the tmp folder is never deleted?

Conchylicultor commented 4 years ago

To answer your question, Dataflow may create intermediate artefacts to checkpoint the different steps of the Beam pipeline, or when the pipeline computation does not fit in memory. We do not have control over this.

However, from the TFDS side, the final data is written directly to the data_dir path, which should be a GCS path.

To be complete, the data is generated in a temp directory (data_dir plus a tmp suffix), which is renamed if the generation is successful. However, this tmp dir is located on the same disk as the final destination: https://github.com/tensorflow/datasets/blob/master/tensorflow_datasets/core/dataset_builder.py#L329
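
To illustrate that write-then-rename pattern, a minimal sketch (the names below are illustrative, not the actual TFDS internals): the staging directory is derived from data_dir, so with a GCS data_dir the staged shards also land in GCS rather than on a worker's local disk.

import os
import tensorflow as tf

def promote_staged_output(data_dir, write_shards):
  """Illustrative only: stage shards next to the final destination, then promote them."""
  tmp_dir = data_dir + ".incomplete"  # placeholder suffix; same bucket/disk as data_dir
  tf.io.gfile.makedirs(tmp_dir)
  try:
    write_shards(tmp_dir)  # e.g. the Beam pipeline writes its shards here
    tf.io.gfile.makedirs(data_dir)
    for fname in tf.io.gfile.listdir(tmp_dir):
      tf.io.gfile.rename(os.path.join(tmp_dir, fname), os.path.join(data_dir, fname))
  finally:
    if tf.io.gfile.exists(tmp_dir):
      tf.io.gfile.rmtree(tmp_dir)  # drop the staging dir (and any partial output on failure)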

Conchylicultor commented 4 years ago

Also, what is this duke_ultranet/dynamic_rx_beamformed dataset? It doesn't seem to be an official dataset, is it? Could it be that the implementation writes to a temp dir?

Ouwen commented 4 years ago

duke_ultranet/dynamic_rx_beamformed is a new dataset I'm in the process of creating for a manuscript. The current implementation reads from a GCS bucket and writes to a GCS bucket using Google Cloud Dataflow (their Apache Beam implementation).

When I run Dataflow with Google's Dataflow Shuffle feature, an out-of-disk error can occur. However, my understanding is that Dataflow Shuffle should automatically handle the disk needs. When I increase the disk size for Dataflow Shuffle, this error goes away (however, I don't have visibility into what disk requirement is actually needed). For now, I'm fine with the successful run from increasing disk space. This may be a Dataflow Shuffle error.

If I turn off Dataflow Shuffle, the /tmp error appears, which seems related to #1345.

Ouwen commented 4 years ago

One last question for you: once I create the dataset, each shard is about 700 MB. However, when I load the dataset it seems to take up over 20 GB of RAM on my machine. Does tfds perform implicit prefetching that we can turn off?

Each individual record should only be about 107.28 MB.

Conchylicultor commented 4 years ago

If I turn off Dataflow Shuffle, the /tmp error appears, which seems related to #1345.

From the stack trace you copied above, this seems unrelated. The error is about the metadata loading when the builder gets unpickled. If you're using BeamMetadataDict in your dataset, you may encounter #1345, but it is difficult to know without the source code.

Does tfds perform implicit prefetching that we can turn off?

Currently it does in: https://github.com/tensorflow/datasets/blob/master/tensorflow_datasets/core/dataset_builder.py#L566

You could add an option to https://github.com/tensorflow/datasets/blob/master/tensorflow_datasets/core/utils/read_config.py to disable automatic pre-fetching. However, prefetching may not be the cause, so please check that this is indeed the issue.

Ouwen commented 4 years ago

Thanks for the help. The ReadConfig interleave parallel and block length options, which default to 16, were the source of the greedy RAM usage. Simply decreasing these params solved the RAM issues.
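
For anyone landing here later, a sketch of what that ReadConfig tweak might look like; the field names are taken from recent TFDS and may differ in older releases:

import tensorflow_datasets as tfds

# Field names assumed from recent TFDS; older releases expose similar knobs
# under slightly different names.
read_config = tfds.ReadConfig(
    interleave_cycle_length=2,  # read fewer files in parallel -> smaller in-memory buffers
    interleave_block_length=1,  # pull fewer consecutive records from each file
)
ds = tfds.load(
    "duke_ultranet/dynamic_rx_beamformed",
    split="train",
    read_config=read_config,
)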