uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

[WIP] Auto infer schema (including fields shape) from the first row #512

Open WeichenXu123 opened 4 years ago

WeichenXu123 commented 4 years ago

What issues does the PR address?

There are two issues in make_batch_reader: one is critical, and the other is less critical but still a pain point.

(Critical) Schema inference in make_batch_reader cannot infer the fields' shape information

Because there is no shape information, errors may occur when a TensorFlow dataset is built from the reader and dataset operations such as unroll, batch, or reshaping a field are applied to it. TensorFlow graph operators depend heavily on field shape information.

(Pain point) TransformSpec requires edited/removed fields to be specified manually

Ideally, the user would provide only a transform function, and petastorm would automatically infer the resulting schema from the pandas DataFrame that the transform function returns.
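One way to picture this is to compare field descriptions before and after the transform runs on a sample. The sketch below is only an illustration of that idea, not the code in this PR: `diff_fields`, the `(type name, shape)` tuples, and the `tmp` column are all hypothetical, and plain dicts stand in for the real schema objects.

```python
from typing import Dict, List, Tuple

# (type name, shape) for one field. Illustrative only, not a petastorm type.
FieldInfo = Tuple[str, Tuple[int, ...]]


def diff_fields(before: Dict[str, FieldInfo],
                after: Dict[str, FieldInfo]) -> Tuple[Dict[str, FieldInfo], List[str]]:
    """Compare field descriptions before and after a transform.

    Returns (edited, removed): fields that are new or whose description
    changed, and the names of fields that no longer exist.
    """
    edited = {name: info for name, info in after.items()
              if before.get(name) != info}
    removed = sorted(name for name in before if name not in after)
    return edited, removed


# Hypothetical input schema and the schema observed on the transform output.
before = {'id': ('int64', ()), 'v': ('float32', (10,)), 'tmp': ('int64', ())}
after = {'id': ('int64', ()), 'v': ('float32', (2, 5))}

edited, removed = diff_fields(before, after)
print(edited, removed)
```

In this sketch, `edited` and `removed` play the role of the information that currently has to be written by hand when constructing a TransformSpec.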

The approach in the PR

Add a method, ArrowReaderWorker.infer_schema_from_first_row, that reads one row first and infers the schema from it, so that accurate shape information can be inferred. Add a parameter, infer_schema_from_first_row, to make_batch_reader (off by default, so existing API behavior does not change).
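As a rough, dependency-free sketch of the idea of inferring shapes from a single row (not the implementation in this PR), the snippet below walks nested lists to recover a type name and a shape. `describe_value`, `infer_fields`, and the `(type name, shape)` tuples are hypothetical names chosen for illustration; real code would more likely read the dtype and shape from NumPy arrays.

```python
from typing import Any, Dict, Tuple

# (type name, shape) for one field. Illustrative only, not a petastorm type.
FieldInfo = Tuple[str, Tuple[int, ...]]


def describe_value(value: Any) -> FieldInfo:
    """Return the leaf type name and shape of a scalar or rectangular nested list."""
    shape = []
    while isinstance(value, (list, tuple)):
        shape.append(len(value))
        if not value:
            # Empty sequence: the leaf type cannot be determined.
            return ('unknown', tuple(shape))
        # Assumption: all siblings have the same shape as the first element.
        value = value[0]
    return (type(value).__name__, tuple(shape))


def infer_fields(first_row: Dict[str, Any]) -> Dict[str, FieldInfo]:
    """Infer a field description for every column from a single row."""
    return {name: describe_value(value) for name, value in first_row.items()}


# A row shaped like the example data: a scalar id and a length-10 array.
first_row = {'id': 0, 'v': [0.5] * 10}
fields = infer_fields(first_row)
print(fields)
```

Note that this sketch trusts the first row completely: it assumes every later row has the same shape.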

Limitations:

Test

Unit tests are still to be added, but the PR is ready for a first review.

Example code

import os
import pandas as pd
import sys
import numpy as np
from pyspark.sql.functions import pandas_udf
import tensorflow as tf

from petastorm import make_batch_reader
from petastorm.transform import TransformSpec
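from petastorm.spark import make_spark_converter

# `spark` is assumed to be an existing SparkSession (for example, in a notebook or the pyspark shell).
spark.conf.set('petastorm.spark.converter.parentCacheDirUrl', 'file:/tmp/converter')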

data_url = 'file:/tmp/0001'
data_path = '/tmp/t0001'

@pandas_udf('array<float>')
def gen_array(v):
    # Produce a random length-10 float array for every input row.
    return v.map(lambda x: np.random.rand(10))

df1 = spark.range(10).withColumn('v', gen_array('id')).repartition(2)
cv1 = make_spark_converter(df1)

# The shape of a one-dimensional array column is inferred automatically.
with cv1.make_tf_dataset(batch_size=4, num_epochs=1) as dataset:
    # Named `iterator` to avoid shadowing the built-in `iter`.
    iterator = dataset.make_one_shot_iterator()
    next_op = iterator.get_next()
    with tf.Session() as sess:
        for i in range(3):
            batch = sess.run(next_op)
            print(batch)

def preproc_fn(x):
    # Reshape each array in column 'v' to shape (2, 5) and offset 'id' by 10000.
    return pd.DataFrame({
        'v': x['v'].map(lambda arr: arr.reshape((2, 5))),
        'id': x['id'] + 10000,
    })

# The shape of a multi-dimensional array column is now inferred automatically as well.
with cv1.make_tf_dataset(batch_size=4, preprocess_fn=preproc_fn, num_epochs=1) as dataset:
    # Named `iterator` to avoid shadowing the built-in `iter`.
    iterator = dataset.make_one_shot_iterator()
    next_op = iterator.get_next()
    with tf.Session() as sess:
        for i in range(3):
            batch = sess.run(next_op)
            print(batch)

codecov[bot] commented 4 years ago

Codecov Report

Merging #512 into master will decrease coverage by 0.16%. The diff coverage is 72.91%.

@@            Coverage Diff             @@
##           master     #512      +/-   ##
==========================================
- Coverage   86.02%   85.86%   -0.17%     
==========================================
  Files          81       81              
  Lines        4402     4442      +40     
  Branches      704      713       +9     
==========================================
+ Hits         3787     3814      +27     
- Misses        504      511       +7     
- Partials      111      117       +6
| Impacted Files | Coverage Δ |
|---|---|
| petastorm/tf_utils.py | 80.91% <ø> (ø) |
| petastorm/spark/spark_dataset_converter.py | 87.5% <25%> (-3.13%) |
| petastorm/reader.py | 90.32% <77.77%> (-0.68%) |
| petastorm/arrow_reader_worker.py | 90.34% <83.87%> (-1.66%) |
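The overall percentages can be re-derived from the Hits and Lines rows of the coverage diff. The sketch below assumes coverage is simply hits divided by lines. The report does not state how Codecov rounds, so the helper name and the rounding shown are illustrative only.

```python
def coverage_percent(hits: int, lines: int) -> float:
    """Coverage as a percentage, assuming coverage = hits / lines."""
    return 100.0 * hits / lines


# Figures taken from the coverage diff: (hits, lines) for master and for #512.
before = coverage_percent(3787, 4402)
after = coverage_percent(3814, 4442)
delta = after - before

print(f"master: {before:.4f}%  #512: {after:.4f}%  change: {delta:.4f}")
```

Depending on whether the change is rounded or truncated to two decimals, it comes out as 0.17 or 0.16 percentage points, which may be why both figures appear in the report.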

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update 0b70510...529cb83.

WeichenXu123 commented 4 years ago

I created a simple PR to address issue 1: https://github.com/uber/petastorm/pull/517. We can merge that one first; this PR can be longer-term work.