lifeomic / sparkflow

Easy to use library to bring Tensorflow on Apache Spark
MIT License

Spark settings #27

Open GeoffDuniam opened 5 years ago

GeoffDuniam commented 5 years ago

We want to use SparkFlow for source detection on radio astronomy images, but even the MNIST example jobs with larger iteration counts are failing. Given the 60,000 images in the MNIST dataset, if we split that data into train and test sets (60%/40%) we want to train on ~36,021 images; a mini-batch size of 200 over 5 epochs means we need ~900 iterations. The job starts and the executors begin processing until all but one executor fails with the error message below.
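To spell out the iteration arithmetic (nothing SparkFlow-specific, just the numbers above):

    train_size = 36021          # ~60% of the 60,000 MNIST images
    batch_size = 200
    epochs = 5

    iters_per_epoch = train_size // batch_size   # 180
    total_iters = iters_per_epoch * epochs       # 900
    print(iters_per_epoch, total_iters)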

19/05/06 09:26:13 INFO python.PythonRunner: Times: total = 1796882, boot = -1772, init = 1794, finish = 1796860
19/05/06 09:26:13 INFO executor.Executor: Finished task 13.0 in stage 4.0 (TID 45). 2409 bytes result sent to driver
19/05/06 09:36:16 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/05/06 09:36:16 INFO storage.DiskBlockManager: Shutdown hook called
19/05/06 09:36:16 INFO util.ShutdownHookManager: Shutdown hook called

Spark session configuration for the job

spark2-submit \
  --master yarn \
  --deploy-mode client \
  --executor-memory 6g \
  --executor-cores 1 \
  --driver-memory 12g \
  --driver-cores 5 \
  --conf spark.yarn.appMasterEnv.RGZ_ENV=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.executorIdleTimeout=600 \
  --conf spark.executor.memoryOverhead=4096 \
  --conf spark.driver.memoryOverhead=4096 \
  --conf spark.default.parallelism=100 \
  --conf spark.pyspark.python=/home/hduser/.virtualenvs/ClaRAN/bin/python \
  --conf spark.pyspark.driver.python=/home/hduser/.virtualenvs/ClaRAN/bin/python \
  RunSparkflowDemo.py >spark_submitPara10execs1Cpu.log 2>&1 &

The MNIST data has been loaded into a partitioned Parquet table on the underlying HDFS data store. In this example, the data frame has 14 partitions.

SparkAsyncDL configuration - the documentation on the partitions parameter is a little sparse; should we assume that for best performance we match the number of partitions in the data frame?

spark_model = SparkAsyncDL(
    inputCol='features',
    tensorflowGraph=mg,
    tfInput='x:0',
    tfLabel='y:0',
    tfOutput='out:0',
    tfLearningRate=.001,
    partitions=14,
    miniBatchSize=200,
    miniStochasticIters=1,
    shufflePerIter=False,
    iters=900,
    predictionCol='predicted',
    labelCol='labels',
    verbose=1
)

Cluster is running Spark 2.4.1 and the Python version for this job is 2.7.

Long term, we want to be able to use TensorFlow for source finding and object classification on very large astronomical datasets (certainly in the tens or hundreds of terabytes range, and it's entirely possible that larger data products will approach petabyte size) on appropriately sized clusters.

Current cluster size is

Assuming our understanding of the required Spark parameters (particularly the requirement for 1 CPU per executor) and SparkAsyncDL parameters (particularly the relationship between epochs, mini-batches and iterations) is correct, we're not sure why the jobs keep failing. Obviously we are missing something, and if we can't get the MNIST example running properly it would be counterproductive to try fitting an existing TensorFlow application. Any assistance you could give us would be greatly appreciated.

Regards

Geoff

dmmiller612 commented 5 years ago

I'll take a look, but we haven't tested on Spark 2.4 yet. I don't know what specifically wouldn't work, but I have personally had some issues with other Spark jobs I have managed with the bump to 2.4 (stalling jobs with some YARN issues). I am actually in the process in moving support to the new barrier executor mode, which will help even more with 2.4 support. It will just take some time, since we also want backwards compatibility.

GeoffDuniam commented 5 years ago

Thanks for the quick reply - in the meantime, is it worthwhile trying Spark 1.6?

dmmiller612 commented 5 years ago

I think Spark 2.0-3 should be fine (which is what we use, depending on the job, on AWS). 2.4 is really nice, so hopefully I can get the transition over soon.

GeoffDuniam commented 5 years ago

Brilliant, thanks - I'll keep an eye out.

dmmiller612 commented 5 years ago

No problem at all. Thanks for filing the issue. This feedback is great, since we mostly work on AWS EMR, so it is good to see how it is working on other setups.

GeoffDuniam commented 5 years ago

Hi Derek,

Ran the tests on Spark 2.3.0 and we're still getting the same issues, even with small numbers of iterations. FYI, the infrastructure we're running is Cloudera CDH 5.16.1 on 64-bit Ubuntu 16.04, and the JVM version is 1.7. The same behaviour is being exhibited: all executors bar one complete with this error

Partition Id: f3088d84b545445b8e70e51a874d85bc, Iteration: 19, Loss: 0.204772
19/05/08 04:51:03 INFO python.PythonRunner: Times: total = 2572365, boot = 851, init = 2847, finish = 2568667
19/05/08 04:51:03 INFO executor.Executor: Finished task 4.0 in stage 4.0 (TID 63). 2347 bytes result sent to driver
19/05/08 04:56:05 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/05/08 04:56:05 INFO storage.DiskBlockManager: Shutdown hook called
19/05/08 04:56:05 INFO util.ShutdownHookManager: Shutdown hook called

and the remaining executor hangs. Not sure if it's a YARN issue - the logs are surprisingly sparse on detail - but the CoarseGrainedExecutorBackend error seems to be memory related?

We've tried a number of combinations of partitioned dataframe data, mini-batch sizes and partitions, but this issue keeps cropping up. Any suggestions?

Thanks again for your help.

Cheers

Geoff

dmmiller612 commented 5 years ago

Ah. that's a bummer. I have a few suggestions that might help.

  1. TensorFlow 1.8 and greater had issues in our ecosystem, due to a change on their end where graphs are handled with the C API. Even though we made a change to help solve this, internally we still use TensorFlow 1.7 to avoid any weirdness.

  2. Try adding the lock option for SparkAsyncDL. By default it runs Hogwild, and there could be a deadlock somewhere that we missed. The lock option is still asynchronous but ensures weights won't overwrite each other (see the sketch after this list).

  3. Check to make sure the partitions are even. It may be trying to write images to disk and load them back in mapPartitions. Preferably, all data would fit into memory.

  4. You could try increasing the executor memory, which should hopefully avoid writing to disk.
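A sketch combining suggestions 2 and 3, using the dataframe `df` and serialized graph `mg` from the original post. Note that `acquireLock` is an assumption about the SparkAsyncDL signature - please check the parameter name in the installed sparkflow version:

    # Suggestion 3: repartition so partitions are even and match partitions=14 below.
    # Suggestion 2: request locked (non-Hogwild) weight updates, if the flag is available.
    from sparkflow.tensorflow_async import SparkAsyncDL

    df = df.repartition(14)

    spark_model = SparkAsyncDL(
        inputCol='features',
        tensorflowGraph=mg,
        tfInput='x:0',
        tfLabel='y:0',
        tfOutput='out:0',
        tfLearningRate=.001,
        partitions=14,
        miniBatchSize=200,
        iters=900,
        acquireLock=True,            # assumed parameter name for the lock option
        predictionCol='predicted',
        labelCol='labels',
        verbose=1
    )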

GeoffDuniam commented 5 years ago

Hi Derek,

Thanks for the quick response - we were running TensorFlow 1.13, so we've downgraded to TensorFlow 1.7 and it looks like that may have sorted the issue. I'll be testing all morning, but at the moment it's looking promising.

A quick question on partitions, if I may - what's the relationship between the dataframe partitions and the partitions parameter we pass into SparkAsyncDL? As an example, if we load the CSV file from HDFS the dataframe has 27 partitions (and we can repartition the dataframe to any number of partitions using either coalesce or repartition). So, should we be passing the same number of partitions to SparkAsyncDL? And should the partitions have record counts that are multiples of the mini-batch size?
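For reference, this is roughly what we're doing now (`df` is the features dataframe read from the table; 14 is just the value we currently pass as partitions):

    num_partitions = 14

    print(df.rdd.getNumPartitions())       # 27 when read straight from the CSV
    df = df.repartition(num_partitions)    # full shuffle; coalesce() can only reduce the count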

Again, thanks very much for the help, really appreciate it.

Cheers

Geoff

howanu commented 5 years ago

Hi Geoff,

Did you eventually succeed in running a large job after the change to TensorFlow 1.7? I was also having trouble running the examples, and just saw this post. I'm also on Spark 2.4.1 and TensorFlow 1.13.1. I'm hoping to use sparkflow for a GAN on about 100M rows of transactional data, but downgrading to TensorFlow 1.7 may be difficult and I'd like to have some confidence that it will pan out before starting down that road.

I wish I could help with your partition questions, but I have the same questions myself! I'll post back when I figure it out.

Much appreciated, Howanu

dmmiller612 commented 5 years ago

I was able to reproduce the issue in 1.13. I am trying to work through the recent changes in TensorFlow, other than the C compiled graph issue, to see if anything pops out. I think right now that 1.7 is the safest bet. I also found an issue with Spark 2.4.3 (not sure if it affects 2.4.1) where a job stalled. That one I am a bit more perplexed by, but I am tracking down recent changes in Spark to see if I can spot the issue.

The partitions parameter is there to match the executors of the currently running cluster, which can be important for GPU training. We basically coalesce the partitions to that number, if and only if the specified number of partitions is more than the current number of partitions.

dmmiller612 commented 5 years ago

Specifically, this line: https://github.com/lifeomic/sparkflow/blob/master/sparkflow/tensorflow_async.py#L283

howanu commented 5 years ago

Thank you, Derek, the code clears that up! I'm investigating how hard the Tensorflow downgrade would be given my shared environment. Hopefully I'll be able to downgrade to 1.7.

GeoffDuniam commented 5 years ago

Hi Derek and Howanu,

Derek, I'll echo what Howanu said - that code clears it up nicely. I've gone through the YARN and Spark settings, and it turns out that the yarn.nodemanager.resource settings for memory and CPU allocation hadn't been optimised, nor had the node manager's Java heap size - my bad, please accept my apologies for that. We're still fine-tuning, but it seems to be running well at the moment.

Howanu, we'll certainly be looking at using SparkFlow for source detection in radio images on larger and larger datasets; we currently have an RNN that we're running against 80,000 images in the training set that we're going to convert to Spark and SparkFlow. At the moment, downgrading to TensorFlow 1.7 isn't an issue for us, but we'll certainly be interested in upgrading as soon as we can. Long term, some of the data products we'll need to analyse will run to hundreds of TB (theoretically, some may well go to PB scale), hence our interest in Spark and distributed TensorFlow.

We'll also be running tests against raw CSV data versus storing the images as a Parquet table on HDFS with Snappy compression. I have the MNIST dataset loaded as a partitioned Parquet table and the compression looks excellent - the ~107 MB CSV file compresses down to ~23.4 MB (which equates to 70.3 MB total storage with block replication). Whether or not that translates to improved read performance remains to be seen, but I'll certainly let you know.

Derek, if you need us to help out with any of the testing, please do let us know. We'll be looking at upgrading our cluster to GPU-enabled worker nodes in the next couple of months, and there are a number of classification and source-finding use cases we will be implementing with TensorFlow, so SparkFlow is of considerable interest to us.

Thanks again

Geoff

GeoffDuniam commented 5 years ago

Howanu, just a thought - we're running multiple virtual environments against Spark (Python 2.7 and 3.6 against Spark 1.6 and 2.3) and that seems to work quite well. We're running a Cloudera cluster, going in through JupyterHub; would a virtualenv be an option for you?

dmmiller612 commented 5 years ago

Thanks Geoff, I will definitely take you up on that. I am planning some sweeping changes to fix the C TensorFlow graph issue once and for all. It may take me some time, though, with testing and everything.

dmmiller612 commented 5 years ago

So, good news/bad news. I think I found the underlying cause of some of the stalling issues. The good news is that there is a fix for Python 3 using a spawn process. The bad news is that it won't work for Python 2. In newer versions of SparkFlow, we may drop support for Python 2.7, especially considering that TensorFlow 2.0 will definitely need that spawn process.
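For anyone curious, the "spawn" start method referred to is the standard-library mechanism below - this is only an illustration, not SparkFlow's actual internals. A spawned worker starts a fresh interpreter and re-imports the module, which sidesteps TensorFlow's fork-unsafe C graph, and `get_context('spawn')` is Python 3 only, which is why the fix can't cover 2.7:

    import multiprocessing as mp

    def work(n):
        # stand-in for a training step
        return n * n

    if __name__ == '__main__':
        ctx = mp.get_context('spawn')        # fresh interpreter per worker, no fork
        with ctx.Pool(2) as pool:
            print(pool.map(work, [1, 2, 3]))  # [1, 4, 9]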

GeoffDuniam commented 5 years ago

Hi Derek,

Great news - any idea when the fix will be available? We are currently using Python 2.7 for some of our analysis, but frankly I don't see that being an issue as we're migrating to Python 3 anyway.

dmmiller612 commented 5 years ago

We released a new version over the weekend (specifically 0.7.0). Let me know if some of the same issues persist.

GeoffDuniam commented 5 years ago

Will do - just finishing up some testing and we'll be installing the new version later today. I'll keep you posted.

Thanks again :)

Geoff

GeoffDuniam commented 5 years ago

Hi Derek,

Upgraded to 0.7.0, but the logs are showing this error -

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)

Not sure if you've seen this - I reverted back to 0.6.0 and the job ran fine. Tried setting the allowMultipleContexts parameter to true, but no joy. Any ideas?

Cheers

Geoff.

dmmiller612 commented 5 years ago

That error typically happens when the Spark context or session is created multiple times in the same job. Are you using a singleton for the session? Or, probably a better question, do you see the error in the dnn example?

GeoffDuniam commented 5 years ago

Hi Derek,

The DNN process seems to be throwing the error - monitoring the job on YARN, it runs fine up until the process hits the fit statement, and then it throws the error. I've attached the program script and the logs below. Not sure what you mean by singleton, but I'm assuming that's the container/executor setup with 1 core per executor?

Here's the Spark session set-up we've been using. FYI, the conf statements for spark.yarn.appMasterEnv, spark.pyspark.python and spark.pyspark.driver.python are there to make sure the Spark jobs pick up the libraries from the virtual environments we're using - we have identical environments across the worker and gateway nodes. In this case, RGZ3_ENV is Python 3.6, Spark 2.3, with the associated libraries for sparkflow etc. installed.

spark2-submit \
  --master yarn \
  --deploy-mode client \   # Same result if we submit cluster mode
  --num-executors [varies] \
  --executor-cores 1 \
  --executor-memory 2642m \
  --conf spark.yarn.appMasterEnv.RGZ3_ENV='True' \
  --conf spark.default.parallelism=80 \
  --conf spark.kryoserializer.buffer.max=512 \
  --conf spark.app.name='Sparkflow testing' \
  --conf spark.pyspark.python=/home/hduser/.virtualenvs/ClaRAN3/bin/python \
  --conf spark.pyspark.driver.python=/home/hduser/.virtualenvs/ClaRAN3/bin/python \

Source code for the test script RunSparkflowDemo.txt

Log file from last run - Sparkflow0.7.0-Test.log_01_58_22_05_2019.log

Run log (error message at line 502): subTest.log

HTH

Cheers Geoff

dmmiller612 commented 5 years ago

Ah, I see. This is helpful, thanks. One thing to try, if this is a blocker, is to turn on fair scheduling with this setting: spark.scheduler.mode FAIR. I'll see if I can reproduce it on AWS today.

dmmiller612 commented 5 years ago

I tried a few different tests with Spark 2.4.0 and TensorFlow 1.12.0 on AWS EMR, and I have been unable to reproduce the issue. GPU seems to work as well. I'll keep trying to dig in. One thing that may matter is the SparkContext versus the SparkSession: we utilize the session over the context, and maybe another context is being created during the spawn when we ask for the session?
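To be concrete about "using the session over the context", the driver script can look something like this - a sketch only, with a hypothetical Parquet path:

    from pyspark.sql import SparkSession

    # Build (or reuse) a single SparkSession and derive everything from it,
    # including the SparkContext, rather than constructing a second context.
    spark = SparkSession.builder \
        .appName('Sparkflow testing') \
        .getOrCreate()

    sc = spark.sparkContext                         # derived, never built separately
    df = spark.read.parquet('/data/mnist_parquet')  # hypothetical path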

GeoffDuniam commented 5 years ago

Hi Derek,

Changed SparkContext to SparkSession, see

RunSparkflowDemo.txt

But I'm still getting the same error. However, running the same program in a Jupyter notebook works fine.

Here's the spark-submit command

spark2-submit --master yarn --deploy-mode client --conf spark.pyspark.python=/home/hduser/.virtualenvs/ClaRAN3/bin/python --conf spark.pyspark.driver.python=/home/hduser/.virtualenvs/ClaRAN3/bin/python RunSparkflowDemo.py true true 1 64 80 0.4 sf-Lck-E1-B64-P80-Mclient > submit.log 2>&1 &

Run log

submit.log

Curious that this issue didn't rear its ugly head with Sparkflow 0.6.0?

dmmiller612 commented 5 years ago

There are a couple of things that surprise me. It clearly shows that the spawn is being called and is part of the exception. Specifically:

  1. I think calling appId = sc._jsc.sc().applicationId() would raise the same error, since I think that creates another Spark context alongside the session.
  2. The source of the error is pointing to the appName: File "/home/hduser/pySparkNotebooks/SparkFlow/RunSparkflowDemo.py", line 29, in <module> .appName(sys.argv[7])\. I am not really sure what to make of that, as I am not sure how that would throw an error.

I wonder if removing all JavaSparkContext calls will fix the issue?
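As an aside, if the application ID is only wanted for logging, PySpark exposes it without going through the Java context (a small sketch, assuming `spark` is the active session), so the _jsc call from point 1 can be dropped:

    app_id = spark.sparkContext.applicationId
    print(app_id)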

dmmiller612 commented 5 years ago

Regardless, I am going to be doing a bit of cleaning up, just not sure it will fix this issue.

GeoffDuniam commented 5 years ago

Hi Derek,

OK, removed all the extraneous code, see

RunSparkflowSimple.txt

Still throwing the error

submit.log

Quick Q - how are you submitting your jobs, are you using spark-submit?

dmmiller612 commented 5 years ago

Yeah, this is using spark-submit. I am going to have another release soon that might address it. I tried running it on Google Cloud finally, and I still wasn't able to reproduce. Here is the simple config that I used for submission on AWS:

[
  {
    "Name": "ML Local",
    "Args": [
      "spark-submit",
      "--master", "yarn",
      "--deploy-mode", "cluster",
      "s3://s3-bucket-here/dnn_yarn.py"
    ],
    "Jar": "command-runner.jar",
    "Type": "CUSTOM_JAR"
  }
]

With this configuration:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.authenticate": "true",
      "spark.authenticate.enableSaslEncryption": "true",
      "spark.network.sasl.serverAlwaysEncrypt": "true",
      "spark.sql.pivotMaxValues": "100000"
    }
  },
  {
    "Classification": "spark-env",
    "Properties": {
      "SPAKR_JAVA_OPTS": "-XX:+UseCompressedOops"
    },
    "Configurations":[
      {
        "Classification":"export",
        "Properties":{
          "PYSPARK_PYTHON":"/usr/bin/python3"
        }
      }
    ]
  },
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.connection.maximum": "2000",
      "fs.s3.consistent": "false",
      "fs.s3.consistent.retryCount": "10",
      "fs.s3.consistent.retryPeriodSeconds": "10",
      "fs.s3.enableServerSideEncryption":"true",
      "fs.s3.serverSideEncryptionAlgorithm": "AES256"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  },
  {
    "Classification": "core-site",
    "Properties": {
      "spark.authenticate": "true"
    }
  }

]

I ran the simple_dnn.py script with a simple modification, removing the master and memory constraints. The really weird thing to me is that it points to the appName. I may open an issue with Spark because that part doesn't make any sense.

dmmiller612 commented 5 years ago

Have you tried a 2.4.0 cluster yet? Not sure that it would make a difference, but I was just curious.

dmmiller612 commented 5 years ago

I finally was able to reproduce it. It has to do with where the TensorFlow function is put. I got around this by first adding the if __name__ == '__main__': condition, and then putting the TensorFlow function outside of that if condition. So, in short:

import tensorflow as tf

def small_model():
    # TensorFlow 1.x graph-mode placeholders and layers; names match the
    # tfInput/tfLabel/tfOutput arguments passed to SparkAsyncDL
    x = tf.placeholder(tf.float32, shape=[None, 784], name='x')
    y = tf.placeholder(tf.float32, shape=[None, 10], name='y')
    layer1 = tf.layers.dense(x, 256, activation=tf.nn.relu, kernel_initializer=tf.glorot_uniform_initializer())
    layer2 = tf.layers.dense(layer1, 256, activation=tf.nn.relu, kernel_initializer=tf.glorot_uniform_initializer())
    out = tf.layers.dense(layer2, 10, kernel_initializer=tf.glorot_uniform_initializer())
    z = tf.argmax(out, 1, name='out')
    loss = tf.losses.softmax_cross_entropy(y, out)
    return loss

if __name__ == '__main__':
    print("Do things here")

I am working on a way to get around this, but that is the current workaround that I have found to work.

Edit: I was able to dig in and find out why this is the case. Originally, we were seeing training jobs freeze with newer versions of TensorFlow. This was because their graph change to C was not fork-safe. It seems they have stated that this is by design, so that pretty much ends the conversation there. With that in mind, we needed to use the "spawn" multiprocessing context. What I didn't realize is that, even if code isn't referenced, it will pickle all files that associate with the HogWild class. Even though that isn't amazing, I think we are going to have to stay with "spawn" multiprocessing. I will add a note to the readme to avoid global Spark sessions outside of the name == 'main' section. I think Spark also notes that as best practice.

I am still playing with different ideas, such as ditching Flask entirely, but the fork restrictions will always be the biggest challenge.
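A sketch of the full layout described above: the graph function stays at module level (safe to pickle and re-import under "spawn"), while the SparkSession and the fit() call stay under the __main__ guard. Import paths follow the project layout, the Parquet path is hypothetical, and the column names just mirror earlier examples in this thread (the table is assumed to already hold assembled 'features' vectors and 'labels'):

    import tensorflow as tf
    from pyspark.sql import SparkSession
    from sparkflow.graph_utils import build_graph
    from sparkflow.tensorflow_async import SparkAsyncDL

    def small_model():
        # module-level graph function, as in the snippet above (trimmed to one hidden layer)
        x = tf.placeholder(tf.float32, shape=[None, 784], name='x')
        y = tf.placeholder(tf.float32, shape=[None, 10], name='y')
        layer1 = tf.layers.dense(x, 256, activation=tf.nn.relu)
        out = tf.layers.dense(layer1, 10)
        tf.argmax(out, 1, name='out')
        return tf.losses.softmax_cross_entropy(y, out)

    if __name__ == '__main__':
        # session created only inside the __main__ guard, never as a module global
        spark = SparkSession.builder.appName('sparkflow-mnist').getOrCreate()
        df = spark.read.parquet('/data/mnist_parquet')   # hypothetical path
        mg = build_graph(small_model)
        spark_model = SparkAsyncDL(
            inputCol='features', tensorflowGraph=mg,
            tfInput='x:0', tfLabel='y:0', tfOutput='out:0',
            tfLearningRate=.001, iters=20,
            predictionCol='predicted', labelCol='labels')
        fitted = spark_model.fit(df)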

GeoffDuniam commented 5 years ago

Hi Derek,

That works - it looks like the global Spark session was causing the problem; creating the Spark session within main and defining the TensorFlow graph as a function is working. Thanks very much for the assist.

FYI, I've attached the simplified code that we got running - it may help someone else getting started.

RunSparkflowSimpleCon.txt

GeoffDuniam commented 5 years ago

Hi Derek,

Just need a quick clarification on some of the documentation - the iters parameter in the SparkAsyncDL class - does that denote a full pass through the training data (i.e. an epoch)? The reason we ask is that if we use the calculation iterations = trainingSetSize / batchSize * epochs, it takes ~57 hours for 10 epochs with a batch size of 32 (36000 / 32 * 10 = 11250 iterations), which sounds excessive. As a comparison, training the same dataset with a similar graph in standard TensorFlow for 40 epochs with a batch size of 50 takes no more than a couple of minutes, so obviously we're doing something wrong?

Other than that, it's all running fine.

Thanks again,

Geoff.

dmmiller612 commented 5 years ago

Sorry, I was out of the country. Is this for the MNIST example? I am cleaning up some code, and after that I will add some larger examples. While epochs represent the number of iterations, there can be several sub-batch iterations through the miniStochasticIters parameter, which defaults to -1.

This is what the parameter does - miniStochasticIters: if using a mini-batch, you can choose the number of iterations you would like to do with the batch size above per epoch. A value of -1 means that you would like to run mini-batches on all data in the partition.
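Putting that together with the parameter names already used earlier in this thread (a sketch with illustrative values only; `mg` is the serialized graph):

    from sparkflow.tensorflow_async import SparkAsyncDL

    spark_model = SparkAsyncDL(
        inputCol='features', tensorflowGraph=mg,
        tfInput='x:0', tfLabel='y:0', tfOutput='out:0',
        tfLearningRate=.001,
        iters=10,                  # behaves like epochs, per the explanation above
        miniBatchSize=32,
        miniStochasticIters=-1,    # -1: sweep mini-batches over all data in the partition each epoch
        shufflePerIter=True,
        predictionCol='predicted', labelCol='labels')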

GeoffDuniam commented 5 years ago

Hi Derek,

Thanks for that, and no problem - we suspected that might be the case; I did have a look through the source code (probably should have done that first!) and worked it out from there.

We came across an interesting phenomenon - we seem to be getting different results depending on whether we run the program in client or cluster mode in Spark. All other parameters being equal, the accuracy results vary quite significantly: running in client mode we get an accuracy figure of 0.8915 (20 iters, batch size of 128, training against 35,769 images), but when we run exactly the same set of parameters in the same pyspark program in cluster mode, the result is 0.5695. Not sure why we would be seeing that discrepancy, any ideas?

dmmiller612 commented 5 years ago

We predominantly use client mode for now, because that puts the main network on the driver node and is easier to manage. It kind of makes sense that cluster mode might perform worse than client mode, but probably not that much worse. I have never used that mode for training at work, though.

The move to barrier executors should solve all of this, though, as the tasks will be running at the same time. I have a local branch started on this, but I have had split priorities. I hope to have something up soon.

tengo-group commented 2 years ago

Hi Derek, I am new to Spark and to TensorFlow. I am trying to run an LSTM model on Spark with the help of your library. I wonder if you could elaborate a bit on the options you have for SparkAsyncDL. Most of them seem straightforward, but what about these ones:

tfInput='x:0', tfLabel='y:0', tfOutput='out:0', tfOptimizer='adam'

Also, in my LSTM model I will optimize for mean squared error - would that affect the OptimizerOptions parameter? What is the right set of parameters for an LSTM? On the other hand, I want to split my data into train and test sets. How should I go about that with SparkFlow? Does SparkAsyncDL do it for me? I apologize in advance if some of the questions seem dumb, but as I said, this is all a new world for me.