twosigma / flint

A Time Series Library for Apache Spark
Apache License 2.0

fix fromNormalizedSortedRDD range split array indexes #23

Closed alazareva closed 6 years ago

alazareva commented 6 years ago

This fixes a bug in Conversion.fromNormalizedSortedRDD. Currently, rangeSplits are generated from a map (partitionId -> (parentPartitionId, K)). The iteration order of the keys in the map is not guaranteed, so the resulting array of RangeSplits is not necessarily sorted by the index of its OrderedRDDPartitions. As a result, using the partitionId to index into the rangeSplits array can return an arbitrary range split, which causes errors during range validation.

I modified the existing test so that it reproduces the bug when the fix is not present.

Here the solution is to sort the range split array by the id of the partition. This is a minor fix so I didn't send in a contributor agreement thingy.
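
For readers following along, here is a minimal sketch of the ordering problem and the sort-by-id fix described above. It is not Flint's code: `SplitSketch` and `RangeSplitOrdering` are hypothetical stand-ins, and the key type is assumed to be `Long` purely for illustration.

```scala
// Hypothetical stand-in for a range split; NOT Flint's actual RangeSplit type.
final case class SplitSketch(partitionId: Int, parentPartitionId: Int, firstKey: Long)

object RangeSplitOrdering {
  // Builds the array in whatever order the map happens to iterate.
  // For a general Map this order is unspecified, so splits(i).partitionId
  // is not guaranteed to equal i.
  def unsorted(m: Map[Int, (Int, Long)]): Array[SplitSketch] =
    m.map { case (id, (parent, key)) => SplitSketch(id, parent, key) }.toArray

  // Sorting by partition id restores the invariant splits(i).partitionId == i,
  // assuming the ids are exactly 0 until m.size.
  def sortedById(m: Map[Int, (Int, Long)]): Array[SplitSketch] =
    unsorted(m).sortBy(_.partitionId)
}
```

With the sorted variant, looking up a split by `partitionId` as an array index returns the split that actually belongs to that partition, regardless of how the underlying map iterates.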

icexelloss commented 6 years ago

@alazareva Thanks for the patch!

Can you please fix this by changing https://github.com/alazareva/flint/blob/28c669181e73c263ec18b1b229cfbb30bed87d12/src/main/scala/com/twosigma/flint/rdd/Conversion.scala#L137

To:

val indexMapping = SortedMap[Int, (Int, K)]() ++ partitionToFirstKey.toSeq.sortBy(_._1).zipWithIndex.map(_.swap)

?

I prefer to fix it this way.
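
A rough, self-contained sketch of what the suggested expression does, under the assumption that `partitionToFirstKey` is a map from parent partition id to that partition's first key (its real type in Conversion.scala is not shown in this thread). The wrapper `IndexMappingSketch` is hypothetical.

```scala
import scala.collection.immutable.SortedMap

object IndexMappingSketch {
  // Assumption: partitionToFirstKey maps parentPartitionId -> first key.
  // Sorting by parent id and zipping with an index assigns new, contiguous
  // partition ids 0, 1, 2, ... in parent-id order; SortedMap then keeps
  // iteration ordered by that new id.
  def indexMapping[K](partitionToFirstKey: Map[Int, K]): SortedMap[Int, (Int, K)] =
    SortedMap[Int, (Int, K)]() ++
      partitionToFirstKey.toSeq.sortBy(_._1).zipWithIndex.map(_.swap)
}
```

For example, `IndexMappingSketch.indexMapping(Map(7 -> 700L, 2 -> 200L, 5 -> 500L))` yields the entries `0 -> (2, 200)`, `1 -> (5, 500)`, `2 -> (7, 700)` in that iteration order, so anything derived from the map's values comes out already sorted by partition id.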

alazareva commented 6 years ago

@icexelloss All set.

icexelloss commented 6 years ago

LGTM, thank you for the patch!