uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

Make `make_batch_reader` TransformSpec support outputting multi-dimensional array types #504

Closed · WeichenXu123 closed this 4 years ago

WeichenXu123 commented 4 years ago

Make make_batch_reader TransformSpec support outputting multi-dimensional array types.

Why do we need this feature?

This feature supports the project "Simplify data conversion from Spark to TensorFlow": the Spark converter's basic implementation is built on the make_batch_reader API, and users may want to do some preprocessing in the transform function and return tensors (multi-dimensional arrays).

Currently, the make_batch_reader TransformSpec function only allows returning one-dimensional arrays, because of a pyarrow format limitation.
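For illustration, a minimal sketch of that limitation (an assumption based on the pyarrow versions of that era; the failing call is left commented out so the snippet runs):

import numpy as np
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'v': [np.random.rand(2, 5) for _ in range(3)]})

# Converting a pandas column of 2-D ndarrays raises pyarrow.lib.ArrowInvalid
# ("Can only convert 1-dimensional array values"):
#   pa.Table.from_pandas(df)

# Flattening each value first converts cleanly to an Arrow list<double> column:
flat = pd.DataFrame({'v': df['v'].map(lambda a: a.ravel())})
table = pa.Table.from_pandas(flat)
print(table.schema)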

How does the PR address it?

This PR addresses the issue. The approach is:

Flatten the multi-dimensional arrays returned by the TransformSpec function, then, in the ArrowReaderWorkerResultsQueueReader data-loading code, reshape them back to the shape specified in the TransformSpec. Note that numpy's reshape returns a view rather than copying the data, so it does not affect performance.
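Conceptually, the round trip looks like this (a minimal numpy sketch, not the PR's actual code):

import numpy as np

# A batch of 4 rows whose 'v' values were flattened from (2, 5) to (10,):
flat_batch = np.random.rand(4, 10).astype(np.float32)

# Reshape back to the per-row shape declared in the TransformSpec. For a
# contiguous array this returns a view, so no data is copied:
restored = flat_batch.reshape((-1, 2, 5))
assert restored.base is flat_batch  # same underlying buffer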

Manual test code

from petastorm import make_batch_reader
from petastorm.transform import TransformSpec

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

# In a PySpark shell `spark` is predefined; otherwise create a session:
spark = SparkSession.builder.getOrCreate()

# Generate a column of random 1-D float arrays of length 10.
@pandas_udf('array<float>')
def gen_array(v):
    return v.map(lambda x: np.random.rand(10))

df1 = spark.range(6).withColumn('v', gen_array('id'))

data_url = 'file:///tmp/t0001'
df1.repartition(2).write.mode('overwrite') \
    .option("compression", "uncompressed") \
    .option("parquet.block.size", 1024 * 1024) \
    .parquet(data_url)

def preproc_fn(x):
    # Reshape column 'v' to (2, 5) and offset 'id' by 10000.
    return pd.DataFrame({'v': x['v'].map(lambda a: a.reshape((2, 5))),
                         'id': x['id'] + 10000})

# Each edit field is declared as (name, numpy dtype, shape, is_nullable).
spec = TransformSpec(
    preproc_fn,
    edit_fields=[('v', np.float32, (2, 5), False), ('id', np.int64, (), False)]
)

reader = make_batch_reader(data_url, num_epochs=1, transform_spec=spec)
for batch in reader:
    print(batch)
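For reference, since the motivating project feeds TensorFlow, the reader can be wrapped into a tf.data.Dataset via petastorm's tf_utils (a hedged sketch assuming TensorFlow 2.x eager mode, reusing data_url and spec from the test code above):

from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

with make_batch_reader(data_url, num_epochs=1, transform_spec=spec) as reader:
    dataset = make_petastorm_dataset(reader)
    for batch in dataset:
        # 'v' should carry the (2, 5) per-row shape declared in the TransformSpec.
        print(batch.v.shape)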
selitvin commented 4 years ago

Nice feature!

codecov[bot] commented 4 years ago

Codecov Report

Merging #504 into master will increase coverage by 0.23%. The diff coverage is 90.47%.


@@            Coverage Diff             @@
##           master     #504      +/-   ##
==========================================
+ Coverage   85.77%   86.00%   +0.23%     
==========================================
  Files          79       81       +2     
  Lines        4190     4331     +141     
  Branches      665      683      +18     
==========================================
+ Hits         3594     3725     +131     
- Misses        494      500       +6     
- Partials      102      106       +4     
Impacted Files                               Coverage Δ
petastorm/reader.py                          90.82% <ø> (ø)
petastorm/arrow_reader_worker.py             91.72% <90.47%> (-0.28%) ↓
petastorm/spark/__init__.py                  100.00% <0.00%> (ø)
petastorm/spark/spark_dataset_converter.py   93.27% <0.00%> (ø)


WeichenXu123 commented 4 years ago

@selitvin Ready. :)