uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

issue with Unischema to spark schema conversion #451

Open · msaisumanth opened this issue 4 years ago

msaisumanth commented 4 years ago

I'm trying to create a parquet dataset using petastorm (following the instructions here).

Here's my Unischema definition:

schema = Unischema('x', [
    UnischemaField('intent', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('text_char', np.int8, (265, 1), NdarrayCodec()),
    UnischemaField('text_word', np.int8, (67, 1), NdarrayCodec()),
])

Basically I have an integer type column and 2 arrays (of integers). Here's the same schema converted to spark schema using schema.as_spark_schema():

StructType(List(
StructField(intent,IntegerType,false),
StructField(text_char,BinaryType,false),
StructField(text_word,BinaryType,false)))

This leads to a failure in creating the dataset:

Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/session.py", line 748, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/session.py", line 416, in _createFromLocal
    struct = self._inferSchemaFromList(data, names=schema)
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/session.py", line 348, in _inferSchemaFromList
    schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/session.py", line 348, in <genexpr>
    schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/types.py", line 1064, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/types.py", line 1064, in <listcomp>
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/types.py", line 1038, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <class 'numpy.int8'>

The two array definitions are being interpreted as BinaryTypes. Am I doing something wrong?

selitvin commented 4 years ago

It seems that you are trying to store np.int8 arrays directly into a spark DataFrame. The row needs to be 'encoded' using the dict_to_spark_row function, as shown here: https://github.com/uber/petastorm/blob/7f073b9c8b15bf0aae226f5606a2f4d7cc413bfc/examples/hello_world/petastorm_dataset/generate_petastorm_dataset.py#L56

This function will convert the np.int8 arrays into the binary type expected. Hope this helps.
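
For reference, the overall pattern from that example looks roughly like this (a minimal sketch assuming a schema similar to yours; the field names, shapes, row count, and output path are illustrative, not the exact linked code):

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

MySchema = Unischema('MySchema', [
    UnischemaField('intent', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('text_char', np.int8, (265, 1), NdarrayCodec(), False),
])


def row_generator(i):
    # Each row is a plain dict of values keyed by Unischema field name.
    return {'intent': i, 'text_char': np.zeros((265, 1), dtype=np.int8)}


spark = SparkSession.builder.master('local[2]').getOrCreate()
sc = spark.sparkContext
output_url = 'file:///tmp/petastorm_example'  # illustrative path

with materialize_dataset(spark, output_url, MySchema, 256):
    rows_rdd = (sc.parallelize(range(10))
                .map(row_generator)
                # dict_to_spark_row runs each field through its codec
                # (e.g. ndarray -> serialized bytes), so the resulting rows
                # match the BinaryType columns from as_spark_schema().
                .map(lambda x: dict_to_spark_row(MySchema, x)))
    spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
        .write.mode('overwrite').parquet(output_url)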

msaisumanth commented 4 years ago

@selitvin I fixed that.

rdd.first()
Row(intent=2, text_char=array([14,  3,  9,  2, 14,  3,  9,  2, 14,  3,  9,  2, 14,  3,  9,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0], dtype=int8), text_word=array([7, 7, 7, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0], dtype=int16))

The trouble is with the schema. When I convert the unischema to a spark schema (using as_spark_schema()), the array type is converted to BinaryType. So when I try to save the parquet file, I get the following error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/types.py", line 1363, in verify_struct
    verifier(obj[f])
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/types.py", line 1383, in verify_default
    verify_acceptable_types(obj)
  File "/Users/smiryala/ludwig/env/lib/python3.7/site-packages/pyspark/sql/types.py", line 1278, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field text_char: BinaryType can not accept object array([14,  3,  9,  2, 14,  3,  9,  2, 14,  3,  9,  2, 14,  3,  9,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0], dtype=int8) in type <class 'numpy.ndarray'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
    ... 10 more

selitvin commented 4 years ago

I'd be happy to see a functional code snippet. Did you end up calling dict_to_spark_row (this is the by-design way of doing what you were trying to do)?

msaisumanth commented 4 years ago

The data is in this format:

{
'a': [...],
'b': [...],
'c': [...]
}

The row generator returns a dictionary like {'a': '..', 'b': '..', 'c': '..'}.

from pyspark.sql import SparkSession

from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row


def save_parquet(data_fp, data, num_rows):

    def row_generator(i):
        out = {}
        for key in data:
            out[key] = data[key][i]
        return out

    spark = SparkSession.builder.config(
        'spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext
    schema = Unischema(
        'ParquetSchema', [UnischemaField(key, data[key].dtype, data[key].shape, ) for key in data]
    )
    output_url = 'file://{}'.format(data_fp)

    with materialize_dataset(spark, output_url, schema):
        rdd = sc.parallelize(range(num_rows)).map(row_generator).map(
            lambda x: dict_to_spark_row(schema, x)
        )
        spark.createDataFrame(rdd, schema.as_spark_schema()).coalesce(10).write.mode('overwrite').parquet(output_url)

You can see the data in the rdd as shown in the previous comment.

msaisumanth commented 4 years ago

Actually, I used the following schema:

schema = Unischema('x', [
    UnischemaField('intent', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('text_char', np.int8, (265, 1), NdarrayCodec()),
    UnischemaField('text_word', np.int8, (67, 1), NdarrayCodec()),
])
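
A quick way to check whether the codecs in this schema are actually applied is to encode a single row by hand (a hypothetical sanity check, not code from the original report; the sample values just follow the shapes above):

import numpy as np
from petastorm.unischema import dict_to_spark_row

# Hypothetical check: run one row through the schema above and inspect the
# resulting field types. With NdarrayCodec applied, the arrays should come out
# as serialized bytes-like objects, which is what BinaryType columns accept.
sample = {'intent': 2,
          'text_char': np.zeros((265, 1), dtype=np.int8),
          'text_word': np.zeros((67, 1), dtype=np.int8)}
encoded = dict_to_spark_row(schema, sample)
print([type(v) for v in encoded])  # expect bytes-like entries, not numpy.ndarray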

selitvin commented 4 years ago

I can try to reproduce the issue and help you figure it out. It would be great if you could paste a fully runnable code sample here that I could use to reproduce it. The Unischema('x', ... in the second code snippet does not match the {'a': '..', 'b': '..', 'c': '..'} input data layout from the first one, so I am a little confused about what exactly you are running.

selitvin commented 4 years ago

@msaisumanth, is there anything left to investigate in this issue, or can we close it?