uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can also be used from pure Python code.
Apache License 2.0

Incorrect order of row groups when reading #551

Closed · hig-dev closed this 4 years ago

hig-dev commented 4 years ago

My goal is to read the created dataset back in the order in which I generated the rows. However, if the row group size is set to a value lower than the total size of the dataset, the rows come back in the wrong order when reading, despite setting shuffle_row_groups=False.

Please look at this demonstration of the problem. I would expect the exception not to occur.

import pathlib
import numpy as np
from petastorm import make_reader
from petastorm.codecs import ScalarCodec

from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType

output_directory = pathlib.Path('./_generated_demo_data')
output_url = output_directory.resolve().as_uri()

session_builder = SparkSession \
    .builder \
    .appName('Demo')

spark = session_builder.getOrCreate()
sc = spark.sparkContext

schema = Unischema('DemoSchema', [
        UnischemaField('timestamp', np.uint64, (), ScalarCodec(LongType()), False),
    ])

# Generate a petastorm dataset with timestamps in increasing order
with materialize_dataset(spark, output_url, schema, row_group_size_mb=1):
    # Build 1,000,000 rows with increasing timestamps and convert them to Spark rows
    generator = enumerate(range(1000000))
    rows_dict_generator = map(lambda x: {'timestamp': x[0]}, generator)
    rows_spark_generator = map(lambda x: dict_to_spark_row(schema, x), rows_dict_generator)
    rows_rdd = sc.parallelize(rows_spark_generator)

    spark.createDataFrame(rows_rdd, schema.as_spark_schema()) \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(output_url)

# Read generated petastorm and check timestamps ordering
last_timestamp = -float("inf")
with make_reader(output_url,
                 schema_fields=['timestamp'],
                 shuffle_row_groups=False) as reader:
    for row in reader:
        # Timestamps must be non-decreasing if rows are read back in write order
        if row.timestamp < last_timestamp:
            raise Exception('Timestamps in petastorm are not in order!')

        last_timestamp = row.timestamp

selitvin commented 4 years ago

This is likely the result of a race between multiple reader threads. Try passing make_reader(..., workers_count=1) - that should make the reading order deterministic. Unfortunately, you are going to get a lower throughput rate.
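
For reference, a minimal sketch of that workaround, reusing output_url from the reproduction script above; the loop body is only a placeholder:

from petastorm import make_reader

# A single worker thread removes the race between readers, so rows come back
# in on-disk row-group order (at the cost of read throughput).
with make_reader(output_url,
                 schema_fields=['timestamp'],
                 shuffle_row_groups=False,
                 workers_count=1) as reader:
    for row in reader:
        print(row.timestamp)  # placeholder for real processing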

This problem could be properly mitigated by adding a reordering queue to the petastorm implementation, but we do not have one right now.
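
To illustrate the idea (this is not part of petastorm; the class below is a hypothetical, self-contained sketch), such a reordering queue could buffer results keyed by a sequence number and release them only once all earlier items have arrived:

import heapq

class ReorderingQueue:
    """Illustrative sketch only: buffers (sequence_number, item) pairs produced
    out of order by worker threads and yields them strictly in sequence order."""

    def __init__(self):
        self._heap = []      # min-heap of (sequence_number, item)
        self._next_seq = 0   # next sequence number allowed to be emitted

    def put(self, seq, item):
        heapq.heappush(self._heap, (seq, item))

    def pop_ready(self):
        # Yield every buffered item whose turn has come, in order.
        while self._heap and self._heap[0][0] == self._next_seq:
            _, item = heapq.heappop(self._heap)
            self._next_seq += 1
            yield item

# Usage sketch: workers finish row groups out of order,
# but the consumer still sees them in submission order.
q = ReorderingQueue()
for seq, item in [(2, 'c'), (0, 'a'), (1, 'b'), (3, 'd')]:
    q.put(seq, item)
    for ready in q.pop_ready():
        print(ready)  # prints a, b, c, d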

hig-dev commented 4 years ago

Thanks for the tip. The workaround of setting workers_count=1 in make_reader did work. I will let you decide whether you want to close this issue.