uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

make_spark_converter returns Numpy in binary serialized format. #582

Open apatsekin opened 4 years ago

apatsekin commented 4 years ago

Hey guys! If I combine two examples from the tutorial:

  1. create a Spark DataFrame with a Petastorm schema
  2. then convert it to a TF Dataset

The NumPy array field, during iteration of the TF Dataset, is returned as something like b"\x93NUMPY\x01\x00v\x00{'descr': '<i4', 'fortran_order': False, 'shape': (5, 5), }, which looks like the serialized binary NPY format.

How do I decode it properly?

import numpy as np
from petastorm.codecs import CompressedImageCodec, NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# Unischema with a scalar 'id' and a 5x5 int32 'test_arr' stored via NdarrayCodec
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('test_arr', np.int32, (5, 5), NdarrayCodec(), False),
])

def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'test_arr': np.random.randint(1, 9, size=(5, 5), dtype=np.int32)}

def generate_hello_world_dataset(output_url='file:///tmp/hello_world_dataset'):
    rows_count = 10
    rowgroup_size_mb = 256
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count)) \
            .map(row_generator) \
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        return spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema())

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sparkDf = generate_hello_world_dataset('file:///tmp/test_dataset.spark')
# cache directory required by the Spark dataset converter
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///tmp/tf_dataset_cache')
converter = make_spark_converter(sparkDf)

with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    for elem in dataset:
        # prints raw NPY bytes (b"\x93NUMPY...") instead of the decoded 5x5 array
        print(elem.test_arr.numpy())

I came up with this, but I'm not sure it's the intended approach:

with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    # parsed_dataset = dataset.map(_parse_function)
    for elem in dataset:
        bytes_ = elem.test_arr.numpy()[0]
        # NdarrayCodec.decode ignores the unischema_field argument, so passing None works
        print(NdarrayCodec().decode(None, bytes_))
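
For what it's worth, the payload seems to be plain NPY bytes (NdarrayCodec appears to encode with np.save, and the b"\x93NUMPY" prefix is the NPY magic string), so decoding with np.load directly gives the same result. A minimal sketch, assuming that is all the codec does:

import io

import numpy as np

with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    for elem in dataset:
        # each element is a batch of size 1, so take the first row's bytes
        bytes_ = elem.test_arr.numpy()[0]
        # deserialize the NPY payload without going through NdarrayCodec
        arr = np.load(io.BytesIO(bytes_))
        print(arr)

Another option might be to push the decoding into the converter itself via a TransformSpec, similar to how image-preprocessing examples decode raw bytes into fixed-shape arrays. An untested sketch, assuming transform_spec is forwarded to the underlying reader and that a (5, 5) shape in edit_fields is accepted:

import io

import numpy as np
from petastorm import TransformSpec

def decode_test_arr(pd_batch):
    # replace the NPY bytes with the decoded 5x5 int32 arrays
    pd_batch['test_arr'] = pd_batch['test_arr'].map(lambda raw: np.load(io.BytesIO(raw)))
    return pd_batch

transform = TransformSpec(
    decode_test_arr,
    edit_fields=[('test_arr', np.int32, (5, 5), False)])

with converter.make_tf_dataset(transform_spec=transform, batch_size=1, num_epochs=1) as dataset:
    for elem in dataset:
        print(elem.test_arr.numpy())  # decoded arrays rather than raw bytes

As far as I can tell, make_spark_converter just caches the Spark DataFrame as Parquet and never consults the Unischema, which would explain why the codec-encoded column comes back as raw bytes.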