twosigma / flint

A Time Series Library for Apache Spark
Apache License 2.0

Fix parquet file loading #16

Open jm771 opened 7 years ago

jm771 commented 7 years ago

As of Spark 2.0, Spark no longer guarantees that parquet files are loaded in the order in which they were saved to disk. This is discussed in https://issues.apache.org/jira/browse/SPARK-20144, an issue that I believe was reported by @icexelloss. The problem is illustrated by the test it should "load from parquet" in TimeSeriesRDDSpec.scala, which fails without the rest of this pull request.

However, this issue can be resolved by sorting the partitions by the headers we've already loaded for them. The logic for this sorting is found in HeaderOrdering in RangeDependency.scala. I've fallen back to the old behaviour of sorting by partition index in the case where both partitions contain identical keys, because without this in place a lot of unit tests fail.
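A minimal sketch of the kind of ordering described above, not the actual HeaderOrdering from RangeDependency.scala. The `Header` case class, its fields, and the `HeaderOrderingSketch` object are hypothetical stand-ins for illustration; the real headers in Flint may carry different information.

```scala
// Hypothetical stand-in for a partition header: the partition's original
// index plus the first and last keys (e.g. timestamps) seen in it.
final case class Header(index: Int, firstKey: Long, lastKey: Long)

object HeaderOrderingSketch {
  // Compare by first key, then by last key. When two partitions cover
  // identical keys, fall back to the original partition index, which
  // mirrors the old behaviour described above.
  val byKeysThenIndex: Ordering[Header] =
    Ordering.by((h: Header) => (h.firstKey, h.lastKey, h.index))

  // Sort headers by their keys instead of trusting the load order.
  def sortHeaders(headers: Seq[Header]): Seq[Header] =
    headers.sorted(byKeysThenIndex)
}
```

With this sketch, headers that arrive out of key order are rearranged by key, and headers with the same key range keep their relative partition-index order.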

This logic is tested in RangeDependencySpec.scala.

The rest of the pull request just swaps sorting by partition index for sorting with this new ordering and, in Conversion.scala, prevents the PartitionsIterator from undoing our hard work sorting things. Given that we've now sorted our headers by their keys, the check in RangeDependency.scala that headers are sorted by keys is no longer necessary.

CLA has been emailed in.

icexelloss commented 7 years ago

@jm771 Thank you for the patch. This makes sense. Internally we have patched Spark to fix this issue, but it makes sense to fix it in Flint too.

This somewhat changes the meaning of "sorted" from "data is sorted" to "data inside each partition is sorted". I think this is fine, but we need to think more carefully about any possible undesirable behavior.

jm771 commented 7 years ago

I believe that we don't really change that understanding at any level other than the lowest. At the end of fromSortedRDD we wrap our newly normalised RDD in a fresh OrderedRDD, and this RDD has its partitions sorted so that data values increase with partition index. So for a "sorted RDD" that we are loading from, we no longer assume that the partitions are ordered, but an OrderedRDD will definitely still have sorted partitions.
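To make the distinction concrete, here is a rough sketch of the invariant in plain Scala collections, without Spark. `PartitionRange`, `reindex`, and `isOrdered` are hypothetical names for illustration and are not part of Flint's API. It assumes each partition is summarised by the first and last key it contains.

```scala
// Hypothetical summary of one partition: its index and its key range.
final case class PartitionRange(index: Int, firstKey: Long, lastKey: Long)

object OrderedPartitionsSketch {
  // Sort partitions by key range, then renumber them so that the new
  // index order matches the key order.
  def reindex(ranges: Seq[PartitionRange]): Seq[PartitionRange] =
    ranges
      .sortBy(r => (r.firstKey, r.lastKey, r.index))
      .zipWithIndex
      .map { case (r, i) => r.copy(index = i) }

  // True when keys never go backwards as the partition index increases.
  def isOrdered(ranges: Seq[PartitionRange]): Boolean = {
    val byIndex = ranges.sortBy(_.index)
    byIndex.zip(byIndex.drop(1)).forall { case (a, b) => a.lastKey <= b.firstKey }
  }
}
```

In this sketch the input plays the role of the "sorted RDD" being loaded, whose partitions may arrive in any order, while the output of `reindex` plays the role of the OrderedRDD, for which `isOrdered` holds as long as the key ranges do not overlap.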

The other thing worth mentioning is that I only fixed the path that TimeSeriesRDD.fromParquet takes. So if you ever wanted to write a method called, say, "fromNormalizedParquet", then the fromNormalizedSortedRDD method would also need tweaking.

dgrnbrg commented 4 years ago

Hey @icexelloss, if I wanted to revive and finish this PR, I see that you had a comment about which keys are sorted. Is that, as far as you know, the only block to this PR landing?