uber / RemoteShuffleService

Remote shuffle service for Apache Spark to store shuffle data on remote servers.

[WIP] Unsafe shuffle writer support in RSS #53

Open mayurdb opened 3 years ago

mayurdb commented 3 years ago

Key traits

Details

The implementation uses Java's Unsafe APIs to acquire large chunks of memory. For each record, a tuple of the partition id and the memory location where the record is stored is kept in a metadata array. The advantage of this layout is that the data can be ordered by sorting just the metadata array on partition id. Before spilling, the data is read back in chunks of configurable size and sent over the network. Like the two approaches above, this approach also interfaces with the TaskMemoryManager (TMM) to acquire more memory for storing records or for expanding the metadata array.
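
To make the metadata-array sort concrete, here is a minimal, self-contained sketch (hypothetical names, not the actual RSS or Spark code; Spark's real PackedRecordPointer uses the same idea, packing a 24-bit partition id and a 40-bit encoded address into one long):

object PackedPointerSketch {
  // Pack the partition id into the upper 24 bits and the record's
  // in-memory address into the lower 40 bits of a single long.
  def pack(partitionId: Int, recordAddress: Long): Long =
    (partitionId.toLong << 40) | (recordAddress & 0xFFFFFFFFFFL)

  def partitionId(packed: Long): Int = (packed >>> 40).toInt
  def recordAddress(packed: Long): Long = packed & 0xFFFFFFFFFFL

  def main(args: Array[String]): Unit = {
    val metadata = Array(pack(2, 0x10L), pack(0, 0x20L), pack(1, 0x30L))
    // Sorting the packed longs orders records by partition id
    // without moving the records themselves.
    java.util.Arrays.sort(metadata)
    metadata.foreach { p =>
      println(s"partition=${partitionId(p)} addr=0x${recordAddress(p).toHexString}")
    }
  }
}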

Open source Spark already implements the unsafe shuffle writer, and it currently executes most shuffle writes. Components of Spark's implementation covering memory allocation, in-memory record storage, and the metadata-based sort were reused here. The logic around spill triggers and around reading data back from memory had to be changed to be compatible with RSS and, more importantly, performant.
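
As an illustration of the chunked read-and-send path mentioned above, here is a simplified sketch under assumed names (not the RSS implementation):

object ChunkedDrainSketch {
  // Hypothetical record: partition id plus serialized payload.
  final case class Record(partitionId: Int, payload: Array[Byte])

  // Drain records (already sorted by partition id) in chunks of at most
  // chunkSizeBytes, invoking send once per chunk; in RSS the send step
  // would ship the chunk to the remote shuffle servers.
  def drain(sorted: Seq[Record], chunkSizeBytes: Int)(send: Seq[Record] => Unit): Unit = {
    val chunk = scala.collection.mutable.ArrayBuffer.empty[Record]
    var bytes = 0
    for (r <- sorted) {
      if (bytes + r.payload.length > chunkSizeBytes && chunk.nonEmpty) {
        send(chunk.toSeq); chunk.clear(); bytes = 0
      }
      chunk += r
      bytes += r.payload.length
    }
    if (chunk.nonEmpty) send(chunk.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val records = (1 to 5).map(i => Record(i % 2, Array.fill(4)(i.toByte))).sortBy(_.partitionId)
    drain(records, chunkSizeBytes = 8)(c => println(s"sending ${c.size} records"))
  }
}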


Performance numbers: [image]

TODOs:

  1. This is still a WIP and needs refactoring to be production ready
  2. ShuffleWithAggregationTest needs to be fixed, as some UTs are failing on the records read/written checks
  3. The existing Scala UTs are passing. New UTs need to be added
  4. RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, and ShuffleSortDataFormat are copied verbatim from Spark. They are package-private in Spark, but even after recreating the same package structure in RSS, they throw IllegalAccessError at runtime. This needs to be fixed so that we don't need to duplicate those classes
  5. General code cleanup
vladhlinsky commented 3 years ago

RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, and ShuffleSortDataFormat are copied verbatim from Spark. They are package-private in Spark, but even after recreating the same package structure in RSS, they throw IllegalAccessError at runtime. This needs to be fixed so that we don't need to duplicate those classes

@mayurdb, I've deleted RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, and ShuffleSortDataFormat, and modified RssShuffleExternalSorter to use Spark's classes. This worked fine in my environment with Spark versions 2.4.3 and 2.4.4.

Tested locally in the spark-shell as follows:

...
scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).groupByKey(3).collect()
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(6, 9, 3)), (1,CompactBuffer(7, 10, 1, 4)), (2,CompactBuffer(8, 2, 5)))

scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).reduceByKey((val1, _) => val1).collect()
res1: Array[(Int, Int)] = Array((0,6), (2,8), (1,7))

The IllegalAccessError could be thrown if the definition of a class has changed incompatibly. I think this may indicate that the Spark version used to compile the RSS jars differs from the version available on the cluster nodes. Please correct me if I'm missing something.
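
(For reference, the runtime Spark version can be checked directly in the shell and compared against the version the RSS jars were built with; the transcript below is illustrative, assuming a 2.4.3 runtime.)

scala> sc.version
res2: String = 2.4.3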

mayurdb commented 3 years ago

Oh, that's great. These classes are just package-private, so ideally they should work in all cases where the package structure is replicated. Also, the compile-time and runtime Spark jars will differ in most deployments that use RSS.

I actually haven't looked into the issue yet, as I just wanted to try this out and get the performance numbers first. I will check the details of the IllegalAccessError and get back.

mayurdb commented 3 years ago

@vladhlinsky did you run these commands from within IntelliJ (or another IDE), or did you run spark-submit externally and pass in the RSS jars? Also, if the command was run externally, can you please confirm that RSS was used?

These classes are package-private in Spark. To access them, we need the same package structure, and both the interface and the implementation must also be loaded by the same class loader: the JVM treats two classes as members of the same runtime package only when both the package name and the defining class loader match. It looks like I'm hitting the second issue.

mayurdb commented 3 years ago

Checked the class loaders in my setup, and the Spark class and the RSS class are indeed loaded by different class loaders:

scala> Class.forName("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter").getClassLoader()
res19: ClassLoader = sun.misc.Launcher$AppClassLoader@36aa7bc2

// Class for RSS code base
scala>  Class.forName("org.apache.spark.shuffle.sort.RssShuffleExternalSorter").getClassLoader()
res20: ClassLoader = scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@2634d000```
vladhlinsky commented 3 years ago

@mayurdb, I ran a spark-shell locally and verified that RSS was used, but in my case the classes were loaded by the same class loader, so it looks like that really is the cause of the issue:

scala> Class.forName("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter").getClassLoader()
res2: ClassLoader = sun.misc.Launcher$AppClassLoader@2f0e140b

scala> Class.forName("org.apache.spark.shuffle.sort.RssShuffleExternalSorter").getClassLoader()
res3: ClassLoader = sun.misc.Launcher$AppClassLoader@2f0e140b
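
(One possible explanation for the differing class loaders, offered as an assumption rather than a verified diagnosis: in spark-shell, jars supplied via --jars are loaded by a child class loader of the REPL rather than by the JVM application class loader, which would match the ScalaClassLoader$URLClassLoader in @mayurdb's output above, whereas jars placed on the extraClassPath settings go on the JVM application classpath and so share the AppClassLoader with Spark's own classes. A hypothetical launch along those lines, with a placeholder jar path:

spark-shell \
  --conf spark.driver.extraClassPath=/path/to/rss-client.jar \
  --conf spark.executor.extraClassPath=/path/to/rss-client.jar)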
YutingWang98 commented 1 year ago

Hi @mayurdb, we have also been experiencing memory and map-stage latency issues when using RSS. We plan to test and work on this implementation as well. Wondering if you have any updates on this PR that you can share with us. Many thanks :)