uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

Simplify data conversion from Spark to TensorFlow: support tensorflow dataset advanced arguments #506

Closed WeichenXu123 closed 4 years ago

WeichenXu123 commented 4 years ago

What changes are proposed in this PR?

Add converter.make_tf_dataset() with advanced params.

API

class SparkDatasetConverter(object):
    """
    A `SparkDatasetConverter` object holds one materialized spark dataframe and
    can be used to make one or more tensorflow datasets or torch dataloaders.
    The `SparkDatasetConverter` object is picklable and can be used in remote
    processes.
    See `make_spark_converter`
    """

    def make_tf_dataset(
            self,
            batch_size=32,
            prefetch=None,
            num_epochs=None,
            workers_count=4,
            **petastorm_reader_kwargs
    ):
        """
        Make a tensorflow dataset.

        This method will do the following two steps:
          1) Open a petastorm reader on the materialized dataset dir.
          2) Create a tensorflow dataset based on the reader created in (1)

        :param batch_size: The number of items to return per batch.
        :param prefetch: Prefetch size for the tensorflow dataset. If ``None``,
            the tensorflow autotune size is used. Note this is only available
            on tensorflow>=1.14.
        :param num_epochs: An epoch is a single pass over all rows in the dataset.
            Setting ``num_epochs`` to ``None`` will result in an infinite number
            of epochs.
        :param workers_count: An int for the number of workers to use in the
            reader pool. This is only used for the thread or process pool.
            Defaults to 4.
        :param petastorm_reader_kwargs: Arguments passed through to
            `petastorm.make_batch_reader()`, excluding these arguments:
            "dataset_url", "num_epochs", "workers_count".

        :return: A context manager for a `tf.data.Dataset` object. When the
                 returned context manager exits, the underlying reader will
                 be closed.
        """

Example code

from petastorm.spark import SparkDatasetConverter, make_spark_converter
import horovod.tensorflow as hvd  # used for sharding below

# Specify a cache dir first.
# The dir is used to save materialized spark dataframe files.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'hdfs:/...')

df1 = ...  # `df1` is a spark dataframe

# Create a converter from `df1`;
# it will materialize `df1` to the cache dir.
converter1 = make_spark_converter(df1)

# Make a tensorflow dataset from `converter1`.
with converter1.make_tf_dataset(
        batch_size=128, prefetch=2, num_epochs=1, workers_count=1,
        cur_shard=hvd.rank(),
        shard_count=hvd.size()) as dataset:
    # `dataset` is a `tf.data.Dataset` object;
    # we can train/evaluate a model on `dataset`.
    # When the with context exits, the underlying petastorm reader
    # of the dataset will be closed.
    ...

# Delete the cached files of the dataframe.
converter1.delete()
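
To make the `...` placeholder concrete, here is a hedged sketch of what training a Keras model inside the with block might look like; the columns feature1, feature2, and label are hypothetical, and a TF 2.x / tf.keras environment is assumed:

import tensorflow as tf

# A toy model whose input shape matches the two hypothetical feature columns.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(16, activation='relu', input_shape=(2,)),
    tf.keras.layers.Dense(1),
])
model.compile(optimizer='adam', loss='mse')

with converter1.make_tf_dataset(batch_size=128, num_epochs=1) as dataset:
    # Each element is a named tuple of batched columns; map it into the
    # (features, label) pairs that `model.fit` expects.
    dataset = dataset.map(
        lambda x: (tf.stack([x.feature1, x.feature2], axis=-1), x.label))
    model.fit(dataset)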
codecov[bot] commented 4 years ago

Codecov Report

Merging #506 into master will increase coverage by 0.03%. The diff coverage is 86.00%.


@@            Coverage Diff             @@
##           master     #506      +/-   ##
==========================================
+ Coverage   85.98%   86.02%   +0.03%     
==========================================
  Files          81       81              
  Lines        4311     4402      +91     
  Branches      674      704      +30     
==========================================
+ Hits         3707     3787      +80     
- Misses        499      504       +5     
- Partials      105      111       +6     
Impacted Files                                 Coverage Δ
petastorm/spark/spark_dataset_converter.py     90.62% <86.00%> (-2.66%) ↓
petastorm/codecs.py                            77.77% <0.00%> (-1.17%) ↓
petastorm/arrow_reader_worker.py               92.00% <0.00%> (ø)
petastorm/etl/dataset_metadata.py              88.88% <0.00%> (ø)
petastorm/reader.py                            90.99% <0.00%> (+0.17%) ↑
petastorm/fs_utils.py                          91.01% <0.00%> (+2.27%) ↑

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data.

WeichenXu123 commented 4 years ago

@selitvin Ready. Note: unittest.mock has a bug on some Python 3 versions that makes CI raise an error, so I try to import the standalone mock package first and fall back to unittest.mock if that fails:

try:
    from mock import mock
except ImportError:
    from unittest import mock
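
For illustration only, a minimal sketch of how the shared mock name can then be used in a test; it patches a stdlib function rather than any real petastorm target:

import os

try:
    from mock import mock  # standalone `mock` package, if installed
except ImportError:
    from unittest import mock  # built-in fallback on Python 3

def test_patch_works_with_either_import():
    # Both import paths expose the same mock.patch API.
    with mock.patch('os.getcwd', return_value='/tmp'):
        assert os.getcwd() == '/tmp'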