pishen / sbt-lighter

SBT plugin for Apache Spark on AWS EMR
Apache License 2.0

adding support for passing spark related argument with spark submit l… #23

Closed alvinhenrick closed 7 years ago

alvinhenrick commented 7 years ago

…ike executor-cores, executor-memory, etc. Check the README for syntax and usage (use --sparkArgs arg0 ... and --jobArgs arg0 ...).

alvinhenrick commented 7 years ago

Just applied scalafmt to all Scala files.

pishen commented 7 years ago

@alvinhenrick I think introducing --sparkArgs and --jobArgs into the command line arguments may cause some concerns, since it reserves two special keywords from the user. One can no longer use --jobArgs in their own application even though they might want to (actually they can, but only with a confusing --jobArgs --jobArgs <arg> syntax, and it is not type-checked). And someone who didn't read the README carefully might accidentally misuse --jobArgs in their own application and get an unexpected result.

What if we change the signature of

def submitJob(mainClass: String, args: Seq[String])

to

def submitJob(mainClass: String, args: Seq[String], sparkConfs: Map[String, String])

and forward the settings from sparkConfs to spark-submit in --conf <key>=<value> format? (AFAIK, most Spark configs, including executor cores and executor memory, can be set up in this format.)

The new code in submitJob() may look like this:

// Build the spark-submit command, forwarding each (key, value) in sparkConfs
// as "--conf key=value" before the application jar and its arguments.
val sparkSubmitArgs = Seq(
  "spark-submit",
  "--deploy-mode",
  "cluster",
  "--class",
  mainClass
) ++ sparkConfs.toSeq.flatMap {
  case (k, v) => Seq("--conf", k + "=" + v)
} ++ (s3Jar.toString +: args)

// Wrap the command in an EMR step that runs it via command-runner.jar
// (asJava comes from scala.collection.JavaConverters._).
val step = new StepConfig()
  .withActionOnFailure(ActionOnFailure.CONTINUE)
  .withName("Spark Step")
  .withHadoopJarStep(
    new HadoopJarStepConfig()
      .withJar("command-runner.jar")
      .withArgs(sparkSubmitArgs.asJava)
  )

Then, maybe we can introduce a SettingKey called sparkConfs (the name may be ambiguous since we already have sparkEmrConfigs, but I haven't come up with a good name yet)

val sparkConfs = settingKey[Map[String, String]]("Spark configurations forwarded to spark-submit as --conf options")

and pass it into submitJob() from sparkSubmitJob.
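
For example (just a sketch of the intended usage, since the key name is still tentative), a user's build.sbt could then contain:

sparkConfs := Map(
  "spark.executor.memory" -> "4g",
  "spark.executor.cores" -> "4",
  "spark.driver.memory" -> "2g"
)

and sparkSubmitJob would forward each entry to spark-submit as a --conf option.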

And if someone feels that this is still inconvenient, they can override the behavior of sparkSubmitJob to parse --sparkArgs and --jobArgs themselves in their build.sbt.

Can this fulfill your need?

alvinhenrick commented 7 years ago

@pishen I agree with you on type safety, and I thought about the exact same solution you mentioned, but the purpose was to handle the situation where we can create a cluster and submit steps with different Spark --conf values on the command line, without having to modify build.sbt and update sparkConfs every time. Let me know if you have a suggestion for handling that.

90% of the time we are tuning those parameters and poking around, so having to modify the sbt file will be painful for developers.

The Spark confs have to be supplied before the s3Jar and the job args, so to distinguish which arguments go where I came up with the sparkArgs/jobArgs logic, plus defaultArgs for backward compatibility.

We use sparkEmrConfigs for properties that apply to the whole cluster and don't change much.

I can make the change as you suggested, but let's think about it a bit more before we do: how can we accomplish this without requiring the user to override sparkSubmitJob in build.sbt to parse --sparkArgs and --jobArgs?

Warm Regards, Alvin.

pishen commented 7 years ago

@alvinhenrick I'm sorry, but given the concerns mentioned above, I'd rather not include the argument parser directly right now.

If what you want is to avoid modifying build.sbt each time, I think overriding sparkSubmitJob would be an acceptable workaround after the change to submitJob(), since sparkSubmitJob is just a thin wrapper around submitJob() and overriding it is a one-time effort per project. For example, if I want to pass the Spark confs like this:

> sparkSubmitJob --conf spark.executor.memory=2g --conf spark.driver.memory=1g arg0 arg1 arg2

I can override sparkSubmitJob as:

import sbt.complete.DefaultParsers._
import sbtemrspark.EmrSparkPlugin._
sparkSubmitJob := {
  Def.inputTaskDyn {
    implicit val log = streams.value.log
    implicit val emr = sparkEmrClientBuilder.value.build()
    val args = spaceDelimited("<arg>").parsed
    val mainClassValue = (mainClass in Compile).value.get

    // Collect the leading "--conf key=value" pairs into a Map of Spark confs.
    val sparkConfs = args.grouped(2)
      .takeWhile(_.head == "--conf")
      .map(_.last.split("=", 2))
      .map(a => a.head -> a.last)
      .toMap
    // Everything after the leading "--conf" pairs is passed through as job arguments.
    val jobArgs = args.grouped(2)
      .dropWhile(_.head == "--conf")
      .flatten.toList

    submitJob(mainClassValue, jobArgs, sparkConfs)
  }.evaluated
}

in my build.sbt.

Well, I'm not sure if this is a good solution and the design may still change in the future. Just writing down some ideas for now.

alvinhenrick commented 7 years ago

@pishen Sure, no problem.

I will extend the plugin and override the method for now.

By taking the --conf approach we are also reserving one special keyword from the user, so you are right that we may have to think about a better solution for this in the plugin, because everyone using it will want to pass Spark configuration parameters.

Let me know your thoughts before I close the pull request.

Thanks! Warm Regards, Alvin.

pishen commented 7 years ago

@alvinhenrick If you want to override sparkSubmitJob using the approach I showed, you still need to add the sparkConfs parameter to submitJob(). See if you would like to help with this.

BTW, maybe you already know this, but if you'd like to pass the Spark configurations from the command line, you can also parse them and set them on the SparkConf object in your application's code:

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) = {
  // Apply the leading "--conf key=value" pairs to the SparkConf.
  val sparkConf = new SparkConf().setAll(
    args.grouped(2).toSeq
      .takeWhile(_.head == "--conf")
      .map(_.last.split("=", 2))
      .map(arr => arr.head -> arr.last)
  )
  val sc = new SparkContext(sparkConf)
}

alvinhenrick commented 7 years ago

@pishen Correct!! You are right, that is exactly what I am doing at the moment: setting the SparkConf in code and using submitWithMain, because we have multiple main classes in the project. I was trying to make this plugin work for data scientists who are coding in Scala, and asking them to remember and refer to those configs... omg. I am still thinking about how to solve it :).

I like the first approach via the settingKey, and if needed we can override it. I wanted to be able to override it with something like a spark.conf property-file format.

val sparkConfs = settingKey[Map[String, String]]("Spark configurations forwarded to spark-submit as --conf options")
I can create a couple of property files for them based on the cluster we choose (m4.xlarge vs m4.2xlarge, m4.4xlarge, etc., and different numbers of nodes) and inject the Spark conf into the plugin, so they can try different combinations to make it work.
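
Something along these lines in build.sbt might work (just a sketch; the file name, path, and the sparkConfs key are assumptions):

import scala.collection.JavaConverters._

// Load a per-cluster properties file (e.g. Spark confs tuned for m4.2xlarge)
// into the Map consumed by the tentative sparkConfs setting.
def loadSparkConfs(path: String): Map[String, String] = {
  val props = new java.util.Properties()
  val in = new java.io.FileInputStream(path)
  try props.load(in) finally in.close()
  props.asScala.toMap
}

sparkConfs := loadSparkConfs("conf/m4.2xlarge.properties")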

Thanks! Warm Regards, Alvin.

alvinhenrick commented 7 years ago

@pishen I have implemented the first approach and, using sbt configurations, created a couple of profiles to set sparkSubmitConfs, and it works awesome.
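
For reference, a minimal sketch of one way to switch between such profiles from the sbt command line (this is an assumption, not necessarily the sbt-configuration setup used in the PR; the profile names, instance types, and the -Dspark.profile property are made up):

// Pick a Spark conf profile at sbt startup, e.g. sbt -Dspark.profile=m4.4xlarge
// (assumes a sparkSubmitConfs SettingKey[Map[String, String]] as mentioned above).
val sparkProfiles: Map[String, Map[String, String]] = Map(
  "m4.xlarge"  -> Map("spark.executor.memory" -> "2g", "spark.executor.cores" -> "2"),
  "m4.4xlarge" -> Map("spark.executor.memory" -> "8g", "spark.executor.cores" -> "8")
)

sparkSubmitConfs := sparkProfiles(sys.props.getOrElse("spark.profile", "m4.xlarge"))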

Can you please have a look at it and merge?

Thanks for your suggestions!

alvinhenrick commented 7 years ago

Completed the change as requested. What are you using to format the code? Is it scalafmt?

pishen commented 7 years ago

@alvinhenrick Could you check it again using git diff between the two branches, or see the diff here? I still see some unnecessary changes. For example, EmrConfig.scala and S3Url.scala should not appear in the above link, since they are not related to this PR.

Yes, I use scalafmt now, but I'm still trying it out. You don't have to follow the scalafmt style; just try to keep the contribution diff clean. I think git diff may help with that.