
The Renaissance Benchmark Suite
https://renaissance.dev
GNU General Public License v3.0

Spark worker scaling parameters #260

Open ceresek opened 3 years ago

ceresek commented 3 years ago

Currently, the performance of the Spark benchmarks does not change with the configured number of executors, except for ALS, which partitions the input data based on the configuration. The following note from the Spark documentation may be relevant:

Spark properties mainly can be divided into two kinds: one is related to deploy, like “spark.driver.memory”, “spark.executor.instances”, this kind of properties may not be affected when setting programmatically through SparkConf in runtime, or the behavior is depending on which cluster manager and deploy mode you choose, so it would be suggested to set through configuration file or spark-submit command line options; another is mainly related to Spark runtime control, like “spark.task.maxFailures”, this kind of properties can be set in either way.

This is quite vague, but may explain why our code (https://github.com/renaissance-benchmarks/renaissance/blob/510b3b9e8f01dc397f35b64a7b5f8a943b75d012/benchmarks/apache-spark/src/main/scala/org/renaissance/apache/spark/SparkUtil.scala#L66) does not behave as expected.
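To make the distinction concrete, here is a minimal sketch (not our actual SparkUtil code; the application name and values are made up) of setting a deploy-time property next to a runtime property programmatically:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch only, not the actual SparkUtil code. Deploy-related properties such
// as spark.executor.instances may be ignored when set programmatically
// (and mean little with a local master), whereas runtime properties such as
// spark.task.maxFailures take effect either way.
val conf = new SparkConf()
  .setAppName("renaissance-spark-sketch")
  .setMaster("local[4]")
  .set("spark.executor.instances", "4") // deploy-time property, likely a no-op here
  .set("spark.task.maxFailures", "2")   // runtime property, honored regardless

val spark = SparkSession.builder().config(conf).getOrCreate()
```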

ceresek commented 3 years ago

[plot: workers-page-rank]

ceresek commented 3 years ago

... seeing how our Spark instance is hardcoded to run locally (https://github.com/renaissance-benchmarks/renaissance/blob/510b3b9e8f01dc397f35b64a7b5f8a943b75d012/benchmarks/apache-spark/src/main/scala/org/renaissance/apache/spark/SparkUtil.scala#L61), I'm not sure what we're trying to do with the worker instance count?

lbulej commented 3 years ago

I was under the impression that the number of executors influenced the default number of partitions created when building RDDs directly from files on disk. That was also one of the reasons I avoided creating RDDs by just preparing data collections in memory and calling parallelize() on them, which would avoid I/O but uses a different class underneath. Maybe that is something we should actually do (or maybe repartition the existing RDDs explicitly).
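For illustration, a small sketch of the three options discussed here, reading from disk, calling parallelize(), and repartitioning explicitly (the file name and partition counts are made up):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: three ways the partition count can be influenced.
val spark = SparkSession.builder().appName("partition-sketch").master("local[4]").getOrCreate()
val sc = spark.sparkContext

// Reading from disk: the second argument is only a *minimum* partition hint.
val fromFile = sc.textFile("input.txt", minPartitions = 8)

// In-memory collection via parallelize(): numSlices sets the partition count
// directly, but the RDD is backed by a different class (ParallelCollectionRDD).
val fromMemory = sc.parallelize(1 to 1000000, numSlices = 8)

// Explicit repartitioning of an existing RDD.
val repartitioned = fromFile.repartition(8)

println(s"${fromFile.getNumPartitions} / ${fromMemory.getNumPartitions} / ${repartitioned.getNumPartitions}")
```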

lbulej commented 3 years ago

> ... seeing how our Spark instance is hardcoded to run locally (https://github.com/renaissance-benchmarks/renaissance/blob/510b3b9e8f01dc397f35b64a7b5f8a943b75d012/benchmarks/apache-spark/src/main/scala/org/renaissance/apache/spark/SparkUtil.scala#L61), I'm not sure what we're trying to do with the worker instance count?

This was in the original code from the very beginning. I probably did a cursory check of the documentation at some point to see what that means, but at this point I only recall that the whole idea was to control the number of executors (thread pools) and the number of threads per executor. If this does not work as expected, then we should revisit this completely.

ceresek commented 3 years ago

I think that in local mode there is always only one (in-process) Executor (with one thread pool), created here https://github.com/apache/spark/blob/7ce7aa47585f579a84dc6dd8f116a48174cba988/core/src/main/scala/org/apache/spark/SparkContext.scala#L2873 and here https://github.com/apache/spark/blob/7ce7aa47585f579a84dc6dd8f116a48174cba988/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala#L62. We can only tweak the number of threads.

I'm not sure whether other functions (such as partitioning) react to the spark.executor.instances setting even in local mode; my guess is that this is unlikely (there is also the option to allocate executors dynamically, so a static value could well be wrong).

If we aim for single-JVM execution, then I think we can drop the executor count, as well as a few other config bits, and just set the master to local[N], where N is the number of cores to use.
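A minimal sketch of that simplification, assuming the harness passes the desired thread count (the helper name createLocalSpark is hypothetical, not our actual SparkUtil API):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: in local mode there is a single in-process executor, so the
// only knob that matters is the thread count N in local[N].
def createLocalSpark(threadCount: Int): SparkSession =
  SparkSession.builder()
    .appName("renaissance-spark")
    .master(s"local[$threadCount]")
    .getOrCreate()

// e.g. use all cores of the machine:
val spark = createLocalSpark(Runtime.getRuntime.availableProcessors())
```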

lbulej commented 3 years ago

The code controlling the number of executor instances appears to originate from #145 and #147, and at that time it seemed to work for @farquet.

Looking at the allowed master URLs, the recommended setting for a local master appears to be local[*], i.e., as many worker threads as there are cores on the machine. However, that is exactly what led to the problem @farquet observed in #145 (on a big machine) in the first place.

I wonder if we should set the master to local[*] and see whether we can limit the level of parallelism by explicitly setting the number of partitions. It's probably not much different from using local[N] with a benchmark-specific N; I was just thinking that if partitioning can control the parallelism, I would rather not push it into the Spark configuration and instead make it part of the "job description".
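A rough sketch of the "partitioning as part of the job description" idea, assuming each benchmark passes its desired parallelism when loading input (the loadInput helper is hypothetical):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch only: keep the master at local[*] and bound the effective parallelism
// through the way the input is partitioned, not through the Spark configuration.
def loadInput(sc: SparkContext, path: String, desiredPartitions: Int): RDD[String] =
  sc.textFile(path, minPartitions = desiredPartitions)
```

This would keep the parallelism decision next to the data-loading code of each benchmark rather than in the shared Spark configuration.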

lbulej commented 3 years ago

#274 removes the configuration of executor instances (along with the benchmark parameter) as well as explicit input data partitioning (for now).

I was wondering whether it would make sense to have a parameter, e.g., input_partitions_scaling, acting as a multiplier of spark_thread_count to control the number of input data partitions. Reasonable values of the parameter would probably be limited to 0 (producing 1 partition), 0.5, 1, 1.5 (maybe), and 2.
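A sketch of how such a (hypothetical) multiplier could map to a partition count:

```scala
// Sketch of the proposed (hypothetical) parameter: the number of input
// partitions is derived from spark_thread_count scaled by the multiplier,
// with a floor of one partition (so a multiplier of 0 yields 1 partition).
def inputPartitions(sparkThreadCount: Int, partitionsScaling: Double): Int =
  math.max(1, math.round(partitionsScaling * sparkThreadCount).toInt)

// For spark_thread_count = 8:
//   inputPartitions(8, 0.0) == 1
//   inputPartitions(8, 0.5) == 4
//   inputPartitions(8, 1.0) == 8
//   inputPartitions(8, 1.5) == 12
//   inputPartitions(8, 2.0) == 16
```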

lbulej commented 3 years ago

I have updated the PR and the measurement bundle (plugins work now). For testing, I added the als-ml benchmark, which uses ml.ALS instead of mllib.ALS. Both do a conversion to RDD, but the mllib version seems more efficient.
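For reference, a hedged sketch of the two ALS APIs being compared (column names and hyperparameters are illustrative, not the benchmark's actual settings):

```scala
import org.apache.spark.ml.recommendation.{ALS => MlALS, ALSModel}
import org.apache.spark.mllib.recommendation.{ALS => MllibALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// RDD-based API (mllib): trains directly on an RDD of Rating objects.
def trainMllib(ratings: RDD[Rating]): MatrixFactorizationModel =
  MllibALS.train(ratings, 10 /* rank */, 10 /* iterations */, 0.01 /* lambda */)

// DataFrame-based API (ml): an Estimator configured with column names;
// it converts the DataFrame to RDDs internally.
def trainMl(ratings: DataFrame): ALSModel =
  new MlALS()
    .setRank(10)
    .setMaxIter(10)
    .setRegParam(0.01)
    .setUserCol("user")
    .setItemCol("item")
    .setRatingCol("rating")
    .fit(ratings)
```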

ceresek commented 3 years ago

Adding a whole bunch of plots showing how performance depends on the number of threads for the individual benchmarks, collected on a dual-socket Xeon Gold 6230 machine (2 packages, 20 cores per package, 2 threads per core).


[plots: threads-als, threads-als-ml, threads-chi-square, threads-dec-tree, threads-gauss-mix, threads-log-regression, threads-movie-lens, threads-naive-bayes, threads-page-rank]

ceresek commented 3 years ago

Assuming other machines behave similarly, I think we should cap the number of threads used as follows (with a warning if more cores are available):

That is, until we tackle the scaling issue more systematically.
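A minimal sketch of applying such a cap with a warning, assuming the per-benchmark limit is known (the helper and the warning text are placeholders):

```scala
// Sketch only: cap the Spark thread count at a per-benchmark limit and warn
// when the machine offers more hardware threads than we actually use.
def cappedThreadCount(benchmarkCap: Int): Int = {
  val available = Runtime.getRuntime.availableProcessors()
  if (available > benchmarkCap) {
    println(s"WARNING: $available hardware threads available, using only $benchmarkCap")
  }
  math.min(benchmarkCap, available)
}
```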