uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

Petastorm "float division by zero" when applying filter predicate on a dataset which is partitioned on more than one column #487

Open jamesprinc3 opened 4 years ago

jamesprinc3 commented 4 years ago

Hello,

I spotted an error when running some code; I've managed to reproduce it by modifying one of the petastorm tests:

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm import make_reader
from petastorm.codecs import ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.predicates import in_lambda
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

import numpy as np
import pytest

def test_predicate_on_partitioned_dataset(tmpdir):
    """
    Generates a partitioned dataset and ensures that readers evaluate the type of the partition
    column according to the type given in the Unischema.
    """
    TestSchema = Unischema('TestSchema', [
        UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
        UnischemaField('id2', np.int32, (), ScalarCodec(IntegerType()), False),
        UnischemaField('test_field', np.int32, (), ScalarCodec(IntegerType()), False),
    ])

    def test_row_generator(x):
        """Returns a single entry in the generated dataset."""
        return {'id': x,
                'id2': x+1,
                'test_field': x*x}

    rowgroup_size_mb = 256
    dataset_url = "file://{0}/partitioned_test_dataset".format(tmpdir)

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    rows_count = 10
    with materialize_dataset(spark, dataset_url, TestSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(test_row_generator)\
            .map(lambda x: dict_to_spark_row(TestSchema, x))

        spark.createDataFrame(rows_rdd, TestSchema.as_spark_schema()) \
            .write \
            .partitionBy('id', 'id2') \
            .parquet(dataset_url)

    with make_reader(dataset_url, predicate=in_lambda(['id'], lambda x: x == 3)) as reader:
        assert next(reader).id == 3
    with make_reader(dataset_url, predicate=in_lambda(['id'], lambda x: x == '3')) as reader:
        with pytest.raises(StopIteration):
            # Predicate should have selected none, so a StopIteration should be raised.
            next(reader)

    print("all okay")

import tempfile
tmpfile = tempfile.TemporaryDirectory()
tmpdir = tmpfile.name
print(tmpdir)

test_predicate_on_partitioned_dataset(tmpdir)

tmpfile.cleanup()

The error (note that the line numbers are slightly different because I've added some print statements whilst debugging):

  File "/home/jamespr/horovod-env/venv/lib64/python3.6/site-packages/petastorm/py_dict_reader_worker.py", line 221, in _load_rows_with_predicate
    shuffle_row_drop_partition)
  File "/home/jamespr/horovod-env/venv/lib64/python3.6/site-packages/petastorm/py_dict_reader_worker.py", line 283, in _read_with_shuffle_row_drop
    partition_indexes = np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))
ZeroDivisionError: float division by zero

I've logged the values of num_partitions and num_rows, and the latter is the suspect: it comes through as zero, which triggers the division-by-zero error.
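
To see the failure mode in isolation: a row group where the predicate matched zero rows makes the denominator collapse to zero. This is just an illustrative sketch of the arithmetic from the traceback, not petastorm code:

import numpy as np

# num_rows == 0 models a row group where the predicate matched nothing;
# num_partitions is arbitrary here.
num_rows, num_partitions = 0, 4

try:
    partition_indexes = np.floor(
        np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))
except ZeroDivisionError as e:
    print(e)  # "float division by zero", matching the traceback above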

I've had a look through the code in py_dict_reader_worker.py, but I'm not particularly familiar with a lot of the petastorm APIs. I'm hoping someone has seen something similar before, which would make it easier to get a fix out.

Versions:

pyarrow==0.15.1
petastorm==0.8.2

jamesprinc3 commented 4 years ago

After more digging I think I understand what's going on a little better. The partition filter is resolved higher up the call stack (i.e. not on the worker), and at the moment only one level of partitioning is supported:

https://github.com/uber/petastorm/blob/a7d30cce377192ce2c748dda3a00e8e01565df92/petastorm/reader.py#L535

I think I've fallen into an interesting edge case: I'm partitioning by two columns but only filtering on one of them.
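
Assuming the check on that line is a strict equality between the predicate's fields and the dataset's partition names (my reading of it), the mismatch is easy to demonstrate with plain sets; the names mirror the reproduction above:

predicate_fields = {'id'}        # fields the in_lambda predicate reads
partition_names = {'id', 'id2'}  # columns passed to partitionBy

print(predicate_fields == partition_names)         # False: equality rejects a partial match
print(predicate_fields.issubset(partition_names))  # True: a subset test would accept it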

jamesprinc3 commented 4 years ago

It seems like a trivial fix to change line 535 mentioned in the comment above to read:

if set(predicate_fields).issubset(dataset.partitions.partition_names):

It works for my example, but this feels far too easy.
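
As extra safety, the worker could also guard against empty row groups so that any case missed upstream degrades gracefully. A hypothetical sketch around the line from the traceback (not an actual patch; names taken from the error above):

# Hypothetical guard inside _read_with_shuffle_row_drop: if filtering left
# the row group empty, there is nothing to shuffle or drop.
if num_rows == 0:
    return []

partition_indexes = np.floor(
    np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))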

jamesprinc3 commented 4 years ago

I've opened this PR as a starting point: https://github.com/uber/petastorm/pull/488

I haven't been able to get the relevant tests to run locally yet, so hopefully Travis will give me some feedback in the meantime.