ywilkof / spark-jobs-rest-client

Fluent client for interacting with Spark Standalone Mode's Rest API for submitting, killing and monitoring the state of jobs.
Apache License 2.0

Environment settings not in effect #2

Closed florzanetta closed 7 years ago

florzanetta commented 8 years ago

Hi,

I'm using the following code to submit a job to a spark cluster but the environment settings are not being applied.

HashMap<String, String> env = new HashMap<>();
env.put("spark.ui.showConsoleProgress", "true");
env.put("spark.executor.memory", "2g");
env.put("spark.cassandra.connection.host", cassandra_host);
env.put("spark.cassandra.connection.timeout_ms", "1000");
env.put("spark.cassandra.read.timeout_ms", "1000");

cli = SparkRestClient.builder()
    .masterHost(master)
    .environmentVariables(env)
    .sparkVersion("1.6.1")
    .build();

Also, I'm getting the following messages in the stdout of the driver:

Updated spark.cassandra.connection.host to <ip-of-other-host> // not the one I set in the config
Updated spark.hadoop.cassandra.host to <ip-of-other-host>
Updated spark.master to spark://<ip-of-master>:7077

Are you aware of any issues like this or why this could be happening?

ywilkof commented 8 years ago

Hi! I believe the parameters you are sending are not environment variables but rather Spark application properties. As such, they should be passed per job submission:

cli.prepareJobSubmit()
    .mainClass(mainClass())
    .appName(appName())
    .appArgs(args)
    .appResource(props.getAppResource())
    .withProperties()
    .put("spark.executor.memory", "2g")
    .put("spark.ui.showConsoleProgress", "true")
    // ...
    .submit();

From my own usage, I can say that spark.executor.memory does take effect when sent in this manner. Unfortunately, I don't know whether the Cassandra integration parameters will take effect as well; I'd be interested to hear once you've tried it.

florzanetta commented 8 years ago

No, the Cassandra setting did not have an effect. Can you think of any workarounds? Should I switch to spark-jobserver, or settle for a plain command-line submit? Thanks!

ywilkof commented 8 years ago

Hi @florzanetta, it seems that any properties other than the basic Spark properties are not inserted into the SparkConf, as happens when using spark-submit or setting them in the spark-defaults.conf file. Therefore, the Cassandra properties are not automatically inserted into the Spark context. This is a limitation in the way the Spark developers build the submit request in this REST API; I don't know why they chose to do it this way.

A workaround could be to send these params in appArgs and then extract them from there. For example:

def getParams(args: Array[String]): Map[String, String] =
  args.map(arg => arg.split("="))                 // split each "key=value" argument
    .filter(arr => arr.length == 2)               // keep only well-formed pairs
    .map(arr => (arr(0), arr(1)))
    .filter { case (_, value) => value.nonEmpty } // drop empty values
    .toMap

and setting the extracted key-value params manually on the SparkConf, as explained in the Cassandra Connector documentation:

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "192.168.123.10")
  .set("spark.cassandra.auth.username", "cassandra")
  .set("spark.cassandra.auth.password", "cassandra")