uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0
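
As context for this PR, reading an existing Petastorm dataset from pure Python typically looks like the following minimal sketch (the dataset URL is a placeholder):

from petastorm import make_reader

# Open a reader on a materialized Petastorm dataset and iterate over its rows.
with make_reader('hdfs://some_hdfs_path/my_dataset') as reader:
    for row in reader:
        print(row)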

Simplify data conversion from Spark to TensorFlow: Spark converter basic implementation. #496

Closed · WeichenXu123 closed this 4 years ago

WeichenXu123 commented 4 years ago

What does this PR do?

This is the first PR for the project: Simplify data conversion from Spark to TensorFlow. It contains the basic implementation, including the core API for the Spark dataset converter.

API

def make_spark_converter(
        df,
        cache_dir_url=None,
        parquet_row_group_size_bytes=DEFAULT_ROW_GROUP_SIZE_BYTES,
        compression_codec=None):
    """
    Convert a spark dataframe into a :class:`SparkDatasetConverter` object.
    It will materialize the spark dataframe to the `cache_dir_url` directory.
    The returned `SparkDatasetConverter` object will hold the materialized
    dataframe, and can be used to make one or more tensorflow datasets or
    torch dataloaders.

    :param df: The :class:`DataFrame` object to be converted.
    :param cache_dir_url: A URL string denoting the parent directory to store
        intermediate files. Default None. If None, it will fall back to the
        Spark config "petastorm.spark.converter.parentCacheDirUrl" (the same
        key set in the example below).
    :param parquet_row_group_size_bytes: An int denoting the number of bytes
        in a parquet row group.
    :param compression_codec: Specify compression codec.
        It can be one of 'uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate'.
        Default None. If None, it will leave the data uncompressed.

    :return: a :class:`SparkDatasetConverter` object that holds the
        materialized dataframe and can be used to make one or more tensorflow
        datasets or torch dataloaders.
    """

class SparkDatasetConverter(object):
    """
    A `SparkDatasetConverter` object holds one materialized spark dataframe and
    can be used to make one or more tensorflow datasets or torch dataloaders.
    The `SparkDatasetConverter` object is picklable and can be used in remote
    processes.
    See `make_spark_converter`
    """

    def __init__(self, cache_dir_url, dataset_size):
        """
        :param cache_dir_url: A URL string denoting the directory that holds
            the materialized dataframe files.
        :param dataset_size: An int denoting the number of rows in the
            dataframe.
        """

    def __len__(self):
        """
        :return: dataset size
        """

    def make_tf_dataset(self):
        """
        Make a tensorflow dataset.

        This method will do the following two steps:
          1) Open a petastorm reader on the materialized dataset dir.
          2) Create a tensorflow dataset based on the reader created in (1).

        :return: a context manager for a `tf.data.Dataset` object.
                 When the returned context manager exits, the reader
                 will be closed.
        """

    def delete(self):
        """
        Delete cache files at self.cache_dir_url.
        """

Example code:

from petastorm import make_spark_converter
import tensorflow as tf

# Specify a cache dir first.
# The dir is used to save the materialized spark dataframe files.
spark.conf.set('petastorm.spark.converter.parentCacheDirUrl', 'hdfs:/...')

df1 = ...  # `df1` is a spark dataframe

# Create a converter from `df1`.
# It will materialize `df1` to the cache dir.
converter1 = make_spark_converter(df1)

# Make a tensorflow dataset from `converter1`.
with converter1.make_tf_dataset() as dataset:
    # `dataset` is a `tf.data.Dataset` object.
    # We can train/evaluate a model on `dataset`.
    # When the `with` context exits, the reader of the dataset will be closed.
    ...

# Delete the cached files of the dataframe.
converter1.delete()
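
To make the training step concrete: assuming the dataset follows petastorm's `make_petastorm_dataset` convention of yielding named tuples of column batches, it can be mapped into `(features, label)` pairs for Keras. The column names `features` and `label` and the toy model below are hypothetical, assuming `df1` has such columns:

# A minimal compiled Keras model, purely for illustration.
model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
model.compile(optimizer='adam', loss='mse')

with converter1.make_tf_dataset() as dataset:
    # Map petastorm named tuples into (features, label) pairs.
    dataset = dataset.map(lambda batch: (batch.features, batch.label))
    # Train the model on the converted dataset.
    model.fit(dataset, steps_per_epoch=100, epochs=5)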

Follow-up PRs

codecov[bot] commented 4 years ago

Codecov Report

Merging #496 into master will increase coverage by 0.21%. The diff coverage is 93.38%.


@@            Coverage Diff             @@
##           master     #496      +/-   ##
==========================================
+ Coverage   85.77%   85.98%   +0.21%     
==========================================
  Files          79       81       +2     
  Lines        4190     4311     +121     
  Branches      665      674       +9     
==========================================
+ Hits         3594     3707     +113     
- Misses        494      499       +5     
- Partials      102      105       +3
Impacted Files                               Coverage Δ
petastorm/spark/__init__.py                  100% <100%> (ø)
petastorm/spark/spark_dataset_converter.py   93.27% <93.27%> (ø)
petastorm/unischema.py                       94.58% <0%> (ø) ↑

Continue to review full report at Codecov.


CLAassistant commented 4 years ago

CLA assistant check
All committers have signed the CLA.

WeichenXu123 commented 4 years ago

@tgaddair @selitvin Addressed all comments. Ready for review. :)

WeichenXu123 commented 4 years ago

@selitvin Ready for review.

WeichenXu123 commented 4 years ago

@selitvin Ready.