uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can also be used from pure Python code.
Apache License 2.0

Sort During URL Normalization #621

Closed voganrc closed 2 years ago

voganrc commented 3 years ago

I have the following script that converts a Spark DataFrame into a TensorFlow Dataset:

from tempfile import TemporaryDirectory

from petastorm.spark import make_spark_converter, SparkDatasetConverter
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .master("local[2]")
        .config(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, f"file://{TemporaryDirectory().name}")
        .getOrCreate()
)

df = spark.createDataFrame(
    data=[[i] for i in range(10)],
    schema=['index']
)

spark_converter = make_spark_converter(df)
for url in spark_converter.file_urls:
    print(url)

with spark_converter.make_tf_dataset(
    batch_size=2,
    shuffle_row_groups=False,
    workers_count=1,
    num_epochs=1,
) as tf_dataset:
    for batch in tf_dataset:
        print(batch.index)

Sometimes it outputs:

tf.Tensor([0 1], shape=(2,), dtype=int64)
tf.Tensor([2 3], shape=(2,), dtype=int64)
tf.Tensor([4 5], shape=(2,), dtype=int64)
tf.Tensor([6 7], shape=(2,), dtype=int64)
tf.Tensor([8 9], shape=(2,), dtype=int64)

But other times it outputs:

tf.Tensor([5 6], shape=(2,), dtype=int64)
tf.Tensor([7 8], shape=(2,), dtype=int64)
tf.Tensor([9 0], shape=(2,), dtype=int64)
tf.Tensor([1 2], shape=(2,), dtype=int64)
tf.Tensor([3 4], shape=(2,), dtype=int64)

I've found that the first happens when spark_converter.file_urls is:

[
    file://.../part-00000-7c8be26f-e43d-4432-8596-da41e883de93-c000.parquet,
    file://.../part-00001-7c8be26f-e43d-4432-8596-da41e883de93-c000.parquet
]

And the second happens when spark_converter.file_urls is:

[
    file://.../part-00001-1eb56383-3219-4545-8f85-e395ad1a5468-c000.parquet,
    file://.../part-00000-1eb56383-3219-4545-8f85-e395ad1a5468-c000.parquet
]

Sorting the file urls fixes this though, and results in a deterministic read order.
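The fix described above can be sketched as follows. This is a minimal illustration, not the actual petastorm patch; `normalize_file_urls` is a hypothetical helper name, and the paths are made-up examples:

```python
# Hypothetical sketch of the fix: sort the dataset's file URLs during
# normalization so that part files are always visited in lexicographic
# (part-number) order, regardless of the order the filesystem listed them in.
def normalize_file_urls(file_urls):
    """Return the URLs in a deterministic (lexicographically sorted) order."""
    return sorted(file_urls)

# Example: a filesystem listing that happened to return part-00001 first.
urls = [
    "file:///tmp/part-00001-c000.parquet",
    "file:///tmp/part-00000-c000.parquet",
]
print(normalize_file_urls(urls))
```

Because the part-number prefix (`part-00000`, `part-00001`, ...) comes before the random UUID in the filename, a plain lexicographic sort recovers the original row-group order.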

CLAassistant commented 3 years ago

CLA assistant check
All committers have signed the CLA.

codecov[bot] commented 3 years ago

Codecov Report

Merging #621 (4fb6b1a) into master (f8c427c) will not change coverage. The diff coverage is 100.00%.

Impacted file tree graph

@@           Coverage Diff           @@
##           master     #621   +/-   ##
=======================================
  Coverage   85.31%   85.31%           
=======================================
  Files          85       85           
  Lines        4929     4929           
  Branches      783      783           
=======================================
  Hits         4205     4205           
  Misses        584      584           
  Partials      140      140           
Impacted Files Coverage Δ
petastorm/reader.py 89.32% <100.00%> (ø)


selitvin commented 3 years ago

I am not sure if this PR fixes all sources of row-order non-determinism. Once workers_count > 1, there would be a race between workers, resulting in a non-deterministic row order. It should be possible to add a reordering buffer to make sure the order of rows is deterministic (I think PyTorch does something similar with its DataLoader), but that would be a somewhat larger effort. I am ok with landing this PR to add some more stability to the order.
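The reordering buffer mentioned above can be sketched as follows. This is an illustrative sketch, not petastorm or PyTorch code: it assumes workers tag each result with a sequential index (0, 1, 2, ... with no gaps), and buffers results that arrive out of order until their turn comes:

```python
import heapq


def reorder(results):
    """Yield items from (index, item) pairs in index order.

    `results` is an iterable of (index, item) pairs produced by workers in
    arbitrary completion order. Out-of-order results are held in a min-heap
    keyed by index until all earlier indices have been emitted.
    """
    heap = []
    next_idx = 0
    for idx, item in results:
        heapq.heappush(heap, (idx, item))
        # Drain every result that is now ready to be emitted in order.
        while heap and heap[0][0] == next_idx:
            _, out = heapq.heappop(heap)
            yield out
            next_idx += 1


# Workers happened to finish tasks in the order 2, 0, 3, 1 ...
completed = [(2, "c"), (0, "a"), (3, "d"), (1, "b")]
# ... but the consumer still sees them in task order.
print(list(reorder(completed)))
```

The trade-off is extra memory: the buffer must hold up to one pending result per in-flight worker, which is roughly the price PyTorch's DataLoader pays for ordered output from parallel workers.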

selitvin commented 2 years ago

Hi. Do you plan to keep working on this PR, or should we close it?

voganrc commented 2 years ago

Ok, I'll close it