bio-guoda / guoda-services

Services provided by GUODA, currently a container for tickets and wikis.
MIT License

HDFS inaccessible #77

Closed jhpoelen closed 4 months ago

jhpoelen commented 5 years ago

While running:

val corpus = spark.read.parquet("/guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet")                
corpus.coalesce(200).write.parquet("/guoda/data/source=preston.acis.ufl.edu/data/core2.parquet")

via spark-shell-cluster.sh in https://github.com/bio-guoda/preston-scripts

The tasks crashed, and on attempting to restart the job, it appeared that HDFS was inaccessible due to excessive failovers:

19/09/13 18:35:20 WARN RetryInvocationHandler: Exception while invoking class org.apache.hadoop.hdfs.protoco
lPB.ClientNamenodeProtocolTranslatorPB.getFileInfo over mesos02.acis.ufl.edu/10.13.44.18:8020. Not retrying 
because failovers (15) exceeded maximum allowed (15)                                                        
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is no
t supported in state standby                                                                                
        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)      
        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:17
74)                                                                                                         
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)       
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)          
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(Client
NamenodeProtocolServerSideTranslatorPB.java:843)                                                            
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callB
lockingMethod(ClientNamenodeProtocolProtos.java)                                                            
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616
)                                                                                                           
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)                                              
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)                                     
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)                                     
        at java.security.AccessController.doPrivileged(Native Method)                                       
        at javax.security.auth.Subject.doAs(Subject.java:422)                                               
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)             

This was corroborated by the error messages seen when attempting an hdfs dfs -ls via a jupyter.idigbio.org terminal:

2019-09-13 19:06:27,408 INFO  [main] retry.RetryInvocationHandler (RetryInvocationHandler.java:invoke(140)) 
- Exception while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over mesos02.acis.ufl.edu
/10.13.44.18:8020 after 1 fail over attempts. Trying to fail over after sleeping for 1230ms.                
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is no
t supported in state standby                                                                                
        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)      
        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:17
74)                                                                                                         
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)       
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)          
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(Client
NamenodeProtocolServerSideTranslatorPB.java:843)                                                            
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callB
lockingMethod(ClientNamenodeProtocolProtos.java)                                                            
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616
)                                                                                                           
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)                                              
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)                                     
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)                                     
        at java.security.AccessController.doPrivileged(Native Method)                                       
        at javax.security.auth.Subject.doAs(Subject.java:422)                                               
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)             
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)                                       

fyi @roncanepa @mielliott @ialzuru

jhpoelen commented 5 years ago

Also see related past issue #40.

roncanepa commented 5 years ago

I believe hdfs should now be available. Please retry? For reasons still unknown, the namenode service went down on nn1, and zk did not (or was unable to) transition to using nn2, which was still in standby state.

jhpoelen commented 5 years ago

@roncanepa running test in https://github.com/bio-guoda/guoda-services/issues/77#issue-493548206 ...

jhpoelen commented 5 years ago

@roncanepa I am reproducing the results by running https://github.com/bio-guoda/guoda-services/issues/77#issue-493548206 and am observing many lost tasks and errors in the console:

19/09/17 12:02:39 WARN TaskSetManager: Lost task 155.3 in stage 1.0 (TID 311, mesos07.acis.ufl.edu, executor
 19): ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Executor heartbeat
 timed out after 159522 ms                                                                                  
19/09/17 12:02:39 WARN TaskSetManager: Lost task 80.2 in stage 1.0 (TID 314, mesos07.acis.ufl.edu, executor 
19): ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Executor heartbeat 
timed out after 159522 ms                                                                                   
19/09/17 12:02:39 WARN TaskSetManager: Lost task 121.3 in stage 1.0 (TID 305, mesos07.acis.ufl.edu, executor
 19): ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Executor heartbeat
 timed out after 159522 ms 
jhpoelen commented 5 years ago

Please do note that hdfs is still accessible via hdfs dfs -ls

$ hdfs dfs -ls                                                                       
Found 3 items                                                                                               
drwxr-xr-x   - hdfs supergroup          0 2019-06-04 11:53 small_test_dataset                               
drwxr-xr-x   - hdfs supergroup          0 2017-07-24 13:47 tmp                                              
drwxr-xr-x   - hdfs supergroup          0 2017-07-19 01:23 ~

jhpoelen commented 5 years ago

See more evidence of a malfunctioning cluster (see screenshot attached): while the job has only completed 8 out of 200 tasks, 304 attempts have failed.

So, it appears you can reproduce the failures with a two-line spark script.

@roncanepa please let me know how you'd like to proceed to find the root cause of these failures.

Screenshot from 2019-09-17 09-39-36

roncanepa commented 5 years ago

@jhpoelen FYI I'm still investigating, including reviewing the cluster config

jhpoelen commented 5 years ago

@roncanepa Thanks for all the work! If you have any ideas on how to simplify the setup, I'd be curious to know. For instance, it seems that the apache spark + kubernetes combo is getting quite popular: https://kubernetes.io/blog/2018/03/apache-spark-23-with-native-kubernetes/ . Right now, we are using the apache spark + mesos combo.

roncanepa commented 5 years ago

@jhpoelen funny you mention it, as we were just discussing a bit ago whether there's a better setup we can move to, and spark +k8s came up

jhpoelen commented 5 years ago

@roncanepa I don't have experience with k8s yet, but heard good things. Would this help managing HDFS also?

jhpoelen commented 5 years ago

@roncanepa note that I was able to complete a large r/w hdfs apache spark job . . . however, only with 5 parallel tasks at a time.

Here's how to reproduce:

  1. start spark shell using:
#!/bin/bash                                                                                                 
#                                                                                                           
# Starts a spark-shell in the guoda cluster.                                                                
#                                                                                                           
# Please use with care and close when done, because it takes up all the resources.
#                                                                                                           

spark-shell \                                                                                               
  --master mesos://zk://mesos01:2181,mesos02:2181,mesos03:2181/mesos \                                      
  --driver-memory 4G \                                                                                      
  --executor-memory 20G \                                                                                   
  --total-executor-cores 5 \                                                                                
  --conf spark.sql.caseSensitive=true
  2. now, in spark shell, run:
    val corpus = spark.read.parquet("/guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet")                
    corpus.coalesce(200).write.parquet("/user/jhpoelen/dwca-core4.parquet")

This completed in about a day, reading ~400 GB and writing ~400 GB. So, an approximate throughput would be 400 / 24 ~ 17 GB/h ~ 5 MB/s with max parallelism of 5. Is this expected?
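As a rough check of that estimate, here is a minimal shell sketch of the arithmetic. It assumes decimal units (1 GB = 1000 MB) and takes the ~400 GB and ~24 h figures above at face value; the function name `throughput` is made up for illustration.

```shell
# Convert "GB moved over N hours" into GB/h and MB/s.
# Args: gigabytes moved, elapsed hours. Assumes 1 GB = 1000 MB.
throughput() {
  awk -v gb="$1" -v h="$2" \
    'BEGIN { printf "%.1f GB/h, %.1f MB/s\n", gb / h, gb * 1000 / (h * 3600) }'
}

# ~400 GB read in about a day
throughput 400 24   # prints "16.7 GB/h, 4.6 MB/s"
```

This counts one direction only; since the job both read and wrote ~400 GB, the combined I/O rate would be about twice that.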

jhpoelen commented 5 years ago

fwiw - I have re-started a basic word freq count across all of dwca using:

import spark.implicits._
val corpus = spark.read.parquet("/guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet")
corpus.rdd.flatMap(_.toSeq.filter(_ != null).map(x => x.toString.toLowerCase)).map( x=> (x, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).coalesce(20).saveAsTextFile("/user/jhpoelen/dwcafreq.txt")

using spark shell with config:

spark-shell \
  --master mesos://zk://mesos01:2181,mesos02:2181,mesos03:2181/mesos \
  --driver-memory 4G \
  --executor-memory 20G \
  --conf spark.sql.caseSensitive=true
jhpoelen commented 5 years ago

The run so far has produced the following warnings and errors: 216 tasks failed against 525 successful ones, out of an estimated total of 6384 tasks (see spark ui screenshot attached).

Screenshot from 2019-09-18 17-08-43

19/09/18 20:03:59 WARN TaskSetManager: Lost task 634.0 in stage 1.0 (TID 807, mesos06.acis.ufl.edu, executor
 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Executor heartbeat
 timed out after 167003 ms                                                                                  
19/09/18 20:03:59 WARN TaskSetManager: Lost task 604.0 in stage 1.0 (TID 789, mesos06.acis.ufl.edu, executor
 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Executor heartbeat
 timed out after 167003 ms                                                                                  
[Stage 1:==============>                                                                 (595 + -73) / 3192]
19/09/18 20:04:02 ERROR TaskSchedulerImpl: Lost executor 10 on mesos06.acis.ufl.edu: Executor finished with 
state KILLED                                                                                                
19/09/18 20:04:04 WARN MesosCoarseGrainedSchedulerBackend: Unable to parse  into a key:value label for the t
ask.                                                                                                        
[Stage 1:==============>                                                                 (595 + -57) / 3192]
19/09/18 20:04:07 WARN TransportChannelHandler: Exception in connection from /10.13.44.30:51328             
java.io.IOException: Connection reset by peer                                                               
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)                                               
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)                                       
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)                                          
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)                                                          
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)                                    
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)           
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)                             
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)              
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)  
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)                      
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)            
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)                     
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)                                     
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)     
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.j
ava:144)                                                                                                    
        at java.lang.Thread.run(Thread.java:748)                                                            
jhpoelen commented 5 years ago

and some time later, the job crashed:

va:343)                                                                                                     
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java
:336)                                                                                                       
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.ja
va:357)                                                                                                     
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.ja
va:343)                                                                                                     
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)         
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)  
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)                      
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)            
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)                     
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)                                     
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)     
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.j
ava:144)                                                                                                    
        at java.lang.Thread.run(Thread.java:748)                                                            
19/09/18 20:23:10 WARN MesosCoarseGrainedSchedulerBackend: Timed out waiting for 3 remaining executors to te
rminate within 10000 ms. This may leave temporary files on the mesos nodes.                                 
I0918 20:23:10.315367 23677 sched.cpp:1987] Asked to stop the driver                                        
I0918 20:23:10.315531 23752 sched.cpp:1187] Stopping framework '1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0002'

but hdfs still seems to be up and running:

$ hdfs dfs -ls                                                       
Found 3 items                                                                                               
drwxr-xr-x   - hdfs supergroup          0 2019-06-04 11:53 small_test_dataset                               
drwxr-xr-x   - hdfs supergroup          0 2017-07-24 13:47 tmp                                              
drwxr-xr-x   - hdfs supergroup          0 2017-07-19 01:23 ~

jhpoelen commented 5 years ago

Am re-running https://github.com/bio-guoda/guoda-services/issues/77#issuecomment-532905849 with maximum parallelism of 20 using:

spark-shell \                                                                                               
  --master mesos://zk://mesos01:2181,mesos02:2181,mesos03:2181/mesos \                                      
  --driver-memory 4G \                                                                                      
  --executor-memory 20G \                                                                                   
  --total-executor-cores 20 \
  --conf spark.sql.caseSensitive=true

If this run succeeds, it would support the idea that HDFS is the bottleneck in this system and, for some unknown reason, cannot handle the load of a cluster running at full capacity (176 cpus).

jhpoelen commented 5 years ago

Even when running with a parallelism of only 20, tasks are being dropped with familiar messages:

19/09/18 21:32:51 WARN TaskSetManager: Lost task 380.0 in stage 1.0 (TID 132, mesos07.acis.ufl.edu, executor
 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat t
imed out after 121123 ms                                                                                    
19/09/18 21:32:51 WARN TaskSetManager: Lost task 421.0 in stage 1.0 (TID 144, mesos07.acis.ufl.edu, executor
 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat t
imed out after 121123 ms                                                                                    
19/09/18 21:32:51 WARN TaskSetManager: Lost task 382.0 in stage 1.0 (TID 135, mesos07.acis.ufl.edu, executor
 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat t
imed out after 121123 ms                                                                                    
19/09/18 21:32:51 WARN TaskSetManager: Lost task 434.0 in stage 1.0 (TID 152, mesos07.acis.ufl.edu, executor
 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat t
imed out after 121123 ms                                                                                    
19/09/18 21:32:51 WARN TaskSetManager: Lost task 401.0 in stage 1.0 (TID 143, mesos07.acis.ufl.edu, executor
 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat t
imed out after 121123 ms                                                                                    
[Stage 1:===>                                                                            (140 + -77) / 3192]
19/09/18 21:35:04 ERROR TaskSchedulerImpl: Lost executor 0 on mesos07.acis.ufl.edu: Remote RPC client disass
ociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messag
es.                                                                                                         
[Stage 1:===>                                                                            (140 + -79) / 3192]
19/09/18 21:35:06 WARN MesosCoarseGrainedSchedulerBackend: Unable to parse  into a key:value label for the t
ask.                                                                                                        

I suggest that those with admin privileges (@ialzuru @roncanepa) inspect logs and system load to uncover the root cause of these failures.

jhpoelen commented 5 years ago

I repeated the run with a parallelism of 5 and tasks are still being dropped. However, after 13 hours the job is still running. See attached screenshot and logs.

Screenshot from 2019-09-19 08-26-10

In this configuration, 5 cpus are used on a single node, reducing the benefits of parallel processing to that of a single server / laptop.

roncanepa commented 5 years ago

Some initial benchmarking suggests that it's not HDFS itself that's bottlenecking your process to that magnitude. I don't think this cluster will ever be super-performant, as it's running on bladeservers.

19/09/19 11:43:55 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
19/09/19 11:43:55 INFO fs.TestDFSIO:            Date & time: Thu Sep 19 11:43:55 EDT 2019
19/09/19 11:43:55 INFO fs.TestDFSIO:        Number of files: 32
19/09/19 11:43:55 INFO fs.TestDFSIO: Total MBytes processed: 32000.0
19/09/19 11:43:55 INFO fs.TestDFSIO:      Throughput mb/sec: 74.07218797669503
19/09/19 11:43:55 INFO fs.TestDFSIO: Average IO rate mb/sec: 75.5267333984375
19/09/19 11:43:55 INFO fs.TestDFSIO:  IO rate std deviation: 10.273583918786349
19/09/19 11:43:55 INFO fs.TestDFSIO:     Test exec time sec: 451.125
19/09/19 11:43:55 INFO fs.TestDFSIO: 

19/09/19 11:48:45 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
19/09/19 11:48:45 INFO fs.TestDFSIO:            Date & time: Thu Sep 19 11:48:45 EDT 2019
19/09/19 11:48:45 INFO fs.TestDFSIO:        Number of files: 32
19/09/19 11:48:45 INFO fs.TestDFSIO: Total MBytes processed: 32000.0
19/09/19 11:48:45 INFO fs.TestDFSIO:      Throughput mb/sec: 146.32919496078836
19/09/19 11:48:45 INFO fs.TestDFSIO: Average IO rate mb/sec: 149.43663024902344
19/09/19 11:48:45 INFO fs.TestDFSIO:  IO rate std deviation: 25.93200590856121
19/09/19 11:48:45 INFO fs.TestDFSIO:     Test exec time sec: 223.903
19/09/19 11:48:45 INFO fs.TestDFSIO: 

####

19/09/19 12:14:53 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
19/09/19 12:14:53 INFO fs.TestDFSIO:            Date & time: Thu Sep 19 12:14:53 EDT 2019
19/09/19 12:14:53 INFO fs.TestDFSIO:        Number of files: 1
19/09/19 12:14:53 INFO fs.TestDFSIO: Total MBytes processed: 102400.0
19/09/19 12:14:53 INFO fs.TestDFSIO:      Throughput mb/sec: 73.5042354662243
19/09/19 12:14:53 INFO fs.TestDFSIO: Average IO rate mb/sec: 73.50423431396484
19/09/19 12:14:53 INFO fs.TestDFSIO:  IO rate std deviation: 0.009824026833019096
19/09/19 12:14:53 INFO fs.TestDFSIO:     Test exec time sec: 1396.191

19/09/19 12:34:34 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
19/09/19 12:34:34 INFO fs.TestDFSIO:            Date & time: Thu Sep 19 12:34:34 EDT 2019
19/09/19 12:34:34 INFO fs.TestDFSIO:        Number of files: 1
19/09/19 12:34:34 INFO fs.TestDFSIO: Total MBytes processed: 102400.0
19/09/19 12:34:34 INFO fs.TestDFSIO:      Throughput mb/sec: 100.74902571559849
19/09/19 12:34:34 INFO fs.TestDFSIO: Average IO rate mb/sec: 100.7490234375
19/09/19 12:34:34 INFO fs.TestDFSIO:  IO rate std deviation: 0.022075497178627567
19/09/19 12:34:34 INFO fs.TestDFSIO:     Test exec time sec: 1019.037

####

19/09/19 12:49:12 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
19/09/19 12:49:12 INFO fs.TestDFSIO:            Date & time: Thu Sep 19 12:49:12 EDT 2019
19/09/19 12:49:12 INFO fs.TestDFSIO:        Number of files: 1000
19/09/19 12:49:12 INFO fs.TestDFSIO: Total MBytes processed: 10000.0
19/09/19 12:49:12 INFO fs.TestDFSIO:      Throughput mb/sec: 41.768128411933986
19/09/19 12:49:12 INFO fs.TestDFSIO: Average IO rate mb/sec: 57.362693786621094
19/09/19 12:49:12 INFO fs.TestDFSIO:  IO rate std deviation: 19.724398067380072
19/09/19 12:49:12 INFO fs.TestDFSIO:     Test exec time sec: 325.891
19/09/19 12:49:12 INFO fs.TestDFSIO: 

19/09/19 12:52:29 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
19/09/19 12:52:29 INFO fs.TestDFSIO:            Date & time: Thu Sep 19 12:52:29 EDT 2019
19/09/19 12:52:29 INFO fs.TestDFSIO:        Number of files: 1000
19/09/19 12:52:29 INFO fs.TestDFSIO: Total MBytes processed: 10000.0
19/09/19 12:52:29 INFO fs.TestDFSIO:      Throughput mb/sec: 1045.1505016722408
19/09/19 12:52:29 INFO fs.TestDFSIO: Average IO rate mb/sec: 1074.4195556640625
19/09/19 12:52:29 INFO fs.TestDFSIO:  IO rate std deviation: 165.12114766630742
19/09/19 12:52:29 INFO fs.TestDFSIO:     Test exec time sec: 46.132

roncanepa commented 5 years ago

I'm also assuming that the files you're working with are rather large, such as the parquet file read in your example above. hdfs performance drops when the use case is many smaller files (which you can see in the 1000-file benchmark write speeds above).

Note that job/task design will also influence this, such as if intermediate steps write out temp files.

roncanepa commented 5 years ago

I'm running a data rebalance on the hdfs nodes. If it's a storage exhaustion problem that is causing the executors to be shut down, this might help.

roncanepa commented 5 years ago

I'd also suggest trying a lower value for --executor-memory 20G and see if that helps task completion, as these nodes have 24GB of memory.

jhpoelen commented 5 years ago

@roncanepa thanks for info!

  1. the input dataset: 200 parquet files of about 2G each. See $ hdfs dfs -ls -h /guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet . I've coalesced the original 14k files into 200 files in an attempt to avoid the small file problem.

  2. re: executor memory - I started a new experiment with 10G executor memory and 20 total executor cores to test whether lowering the executor memory helps. Previously, with 20G, this crashed:

    spark-shell \                                                                                               
    --master mesos://zk://mesos01:2181,mesos02:2181,mesos03:2181/mesos \                                      
    --driver-memory 4G \                                                                                      
    --executor-memory 10G \                                                                                   
    --total-executor-cores 20 \                                                                               
    --conf spark.sql.caseSensitive=true 

    with

import spark.implicits._                                                                                    
val corpus = spark.read.parquet("/guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet")                
corpus.rdd.flatMap(_.toSeq.filter(_ != null).map(x => x.toString.toLowerCase)).map( x=> (x, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).coalesce(20).saveAsTextFile("/user/jhpoelen/dwcafreq.txt")
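For a sense of scale on item 1, a small sketch of the average file size before and after coalescing. It assumes the dataset is ~400 GB in total (200 files of ~2G), decimal units, and treats "14k" as roughly 14000 files; `avg_file_mb` is a made-up helper.

```shell
# Average file size in MB for a dataset split into n files.
# Args: total size in GB, number of files. Assumes 1 GB = 1000 MB.
avg_file_mb() {
  awk -v gb="$1" -v n="$2" 'BEGIN { printf "%.1f\n", gb * 1000 / n }'
}

echo "14000 files: $(avg_file_mb 400 14000) MB each"   # 28.6
echo "200 files:   $(avg_file_mb 400 200) MB each"     # 2000.0
```

At roughly 29 MB per file, the original layout would sit well below a 128 MB HDFS block, assuming the cluster uses the Hadoop 2.x default block size.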
jhpoelen commented 5 years ago

@roncanepa Looking more closely at the detailed write stats you provided: with 1000 files, write throughput is ~ 40 Mb/s ~ 5 MB/s, which is similar to what was observed in https://github.com/bio-guoda/guoda-services/issues/77#issuecomment-532867255 . Agree?
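One way to sanity-check the unit is to divide "Total MBytes processed" by "Test exec time sec" for the write runs listed above. This is only a sketch: the helper name `mbytes_per_sec` is made up, and wall-clock exec time includes job startup, so it is not necessarily the same quantity TestDFSIO reports as throughput.

```shell
# Wall-clock MBytes per second for a TestDFSIO run.
# Args: total MBytes processed, test exec time in seconds.
mbytes_per_sec() {
  awk -v mb="$1" -v s="$2" 'BEGIN { printf "%.1f\n", mb / s }'
}

echo "32 files:   $(mbytes_per_sec 32000 451.125) (reported 74.07 mb/sec)"
echo "1 file:     $(mbytes_per_sec 102400 1396.191) (reported 73.50 mb/sec)"
echo "1000 files: $(mbytes_per_sec 10000 325.891) (reported 41.77 mb/sec)"
```

For the single-file run the two numbers nearly coincide (73.3 vs 73.50), which hints that the reported mb/sec may be megabytes rather than megabits. If so, the 1000-file write figure would not be directly comparable to the ~5 MB/s above.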

Also, about performance: what would you recommend to improve the performance of the system? Why are the bladeservers currently an issue? Spinning disks?

roncanepa commented 5 years ago

I'll be curious to see what sort of results we get out of this run.

If your data files are ~2GB, it might be worth dropping the executor memory even further. Perhaps another run with executor memory set to, say, 4GB. Spark can run multiple executors on a given node but only up to the limits that are set. For instance, with a 20GB memory setting, it will likely only be able to fit 1 executor. (I'm assuming here that the scheduler doesn't over-allocate)
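To make the executor-fit reasoning concrete, here is a back-of-the-envelope sketch. It assumes 24 GB (24576 MB) of memory per node as mentioned earlier, and that each executor also reserves an overhead of 10% of its memory with a 384 MB floor (my understanding of Spark's default on Mesos, worth double-checking for this version). It ignores memory used by HDFS, ZooKeeper and the OS, so real numbers will be lower. `executors_per_node` is a hypothetical helper.

```shell
# How many executors of a given size fit on one node?
# Args: node memory in MB, executor memory in MB.
executors_per_node() {
  node_mb="$1"
  exec_mb="$2"
  # Assumed per-executor overhead: 10% of executor memory, at least 384 MB.
  overhead_mb=$(( exec_mb / 10 ))
  if [ "$overhead_mb" -lt 384 ]; then
    overhead_mb=384
  fi
  echo $(( node_mb / (exec_mb + overhead_mb) ))
}

for gb in 20 10 4; do
  echo "${gb}G executors per 24G node: $(executors_per_node 24576 $(( gb * 1024 )))"
done
```

Under these assumptions a node fits 1 executor at 20G, 2 at 10G and 5 at 4G.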

Additionally, the driver memory is configurable as well. If the workflow is such that executors are delivering a lot of results back to the driver that need subsequent processing, then a higher value is better; if not, the default of 1G may be enough. (You may have already increased this based on previous runs, but worth mentioning).

roncanepa commented 5 years ago

A few things about the benchmarks. It uses 1 mapper per file.

roncanepa commented 5 years ago

We do have a relatively large (IMHO) stdev in some of the numbers, so it could also be that one or more nodes are performing worse than their peers...

jhpoelen commented 5 years ago

@roncanepa I can see your point about the stddev. Is there any way to check performance for individual nodes?

Also, I was wondering whether mesos is a bit too optimistic about handing out resources . . . especially because the nodes run alongside hdfs / zookeeper etc. In that case, spark executors would try to allocate memory but fail.

jhpoelen commented 5 years ago

related to the 20 cpu / 10GB executor mem run - so far so good: no tasks dropped. See attached screenshot.

Screenshot from 2019-09-19 13-58-55

jhpoelen commented 5 years ago

related to the 20 cpu / 10GB executor mem run - tasks have failed. See attached screenshot and error logs of three associated executors (mesos08 got killed, mesos09 and mesos10 were running at time of writing)

mesos-2019-09-20.stderr.zip

Screenshot from 2019-09-20 10-17-43

Screenshot from 2019-09-20 10-13-04

jhpoelen commented 5 years ago

@roncanepa can you see any reason why mesos08 was killed in the middle of the job? From mesos10 error logs, it appears that errors started around 19/09/20 11:23:32 (eastern time):

I0919 15:11:22.451288 15373 exec.cpp:161] Version: 1.0.0
I0919 15:11:22.453444 15367 exec.cpp:236] Executor registered on agent 88b8d0bf-aef8-4b34-bbc7-df9c1bbcf641-S2
19/09/19 15:11:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/09/20 11:23:32 WARN DFSClient: Slow ReadProcessor read fields took 30167ms (threshold=30000ms); ack: seqno: 2 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 256710473 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[10.13.44.42:50010,DS-186c27a1-551a-4858-b3b5-66d0ca77f100,DISK], DatanodeInfoWithStorage[10.13.44.27:50010,DS-f1bc8c22-35c0-4778-9524-929944941fd9,DISK], DatanodeInfoWithStorage[10.13.44.36:50010,DS-5a322c80-832d-40ab-9dd6-b3df23e8fdaf,DISK]]
19/09/20 11:30:22 WARN TransportChannelHandler: Exception in connection from mesos08.acis.ufl.edu/10.13.44.36:32914
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)
19/09/20 11:30:22 ERROR TransportResponseHandler: Still have 5 requests outstanding when connection from mesos08.acis.ufl.edu/10.13.44.36:32914 is closed
19/09/20 11:30:22 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)
19/09/20 11:30:22 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)

jhpoelen commented 5 years ago

and . . . the job was stopped before completion, no results after >1 day of processing. See attached logs.

@roncanepa mesos08 (stopped first), mesos09 and mesos10 were involved in this run. Can you find the root cause of the failures? I'll restart the same job with identical params and attempt to capture more logging.

19/09/20 22:28:30 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:134)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:644)
        at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:178)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)

19/09/20 22:28:34 WARN MesosCoarseGrainedSchedulerBackend: Timed out waiting for 2 remaining executors to terminate within 10000 ms. This may leave temporary files on the mesos nodes.
I0920 22:28:34.476763 22352 sched.cpp:1987] Asked to stop the driver
I0920 22:28:34.476931 22485 sched.cpp:1187] Stopping framework '1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0006'

roncanepa commented 5 years ago

org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.

is a secondary exception: the process it is trying to find has already been killed off.

Am looking things over on my side as well.

jhpoelen commented 5 years ago

@roncanepa agreed. Curious to see what you'll find. I was wondering whether the mesos nodes are configured to use too many resources, especially because of the co-location of services on mesos01, mesos02, and mesos03.

roncanepa commented 5 years ago

I noticed that there's a core count of 16 in the logs:

Received SUBSCRIBED event
Subscribed executor on mesos09.acis.ufl.edu
Received LAUNCH event
Starting task 3
/usr/libexec/mesos/mesos-containerizer launch --command="{"environment":{"variables":[{"name":"SPARK_EXECUTOR_OPTS","value":""},{"name":"SPARK_USER","value":"hdfs"},{"name":"SPARK_EXECUTOR_MEMORY","value":"10240m"}]},"shell":true,"value":" \"\/opt\/spark\/latest\/.\/bin\/spark-class\" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark:\/\/CoarseGrainedScheduler@10.13.44.50:34212 --executor-id 3 --hostname mesos09.acis.ufl.edu --cores 16 --app-id 1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007"}" --help="false" --unshare_namespace_mnt="false"
Forked command at 10654

Weren't most jobs usually done with 5? If a 5-core-count job succeeds, it would make sense that a 16-core job would fail, in that there's only so much memory to go around. There are other people using the cluster, too, though, so it could be that this is someone else's job.

The structure of the jobs themselves can also play a role. You mentioned that you recently partitioned everything down to a max of 200. This, combined with a higher core count, would explain some OOM errors.

A CoarseGrainedScheduler (which we have; dynamic allocation is turned off) will only run one executor in total per node, and that executor then creates the various tasks. That is probably where the original settings of driver = 4GB and executor = 20GB came from: an attempt to use all the memory on the system, which feels rather brute-force. This could also be biting us.

roncanepa commented 5 years ago

Dropping the executor memory and increasing the # of cores will also contribute to this. Given that coarse-grained mode will only run 1 executor per node, the core count is the number of simultaneous tasks that an executor can run. However, the memory limit is per executor. So the change would have the effect of dropping the amount of memory that's available to the executor while also increasing the number of parallel tasks.
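
To make the arithmetic concrete, here is a rough sketch of the per-task share of executor memory. It assumes memory is split evenly among concurrently running tasks, which is a simplification that ignores Spark's reserved and overhead memory. The 10240m / 16-core pair comes from the launch command above; pairing 20GB with 5 cores as the earlier baseline is my assumption, and the object and method names are made up for illustration.

```scala
// Rough illustration only: assumes executor memory is shared evenly among
// concurrently running tasks, ignoring Spark's reserved/overhead memory.
object ExecutorMemorySketch {
  // With one executor per node, the core count is the number of concurrent tasks.
  def memoryPerTaskMb(executorMemoryMb: Int, cores: Int): Int = {
    require(cores > 0, "cores must be positive")
    executorMemoryMb / cores
  }

  def main(args: Array[String]): Unit = {
    // (executor memory in MB, cores per executor)
    val runs = Seq(
      (20480, 5),  // assumed earlier baseline: 20GB executor, 5 cores
      (10240, 16)  // from the launch command: 10240m executor, 16 cores
    )
    runs.foreach { case (memMb, cores) =>
      println(s"$memMb MB / $cores cores = ${memoryPerTaskMb(memMb, cores)} MB per task")
    }
  }
}
```

Under those assumptions the per-task share goes from about 4 GB down to 640 MB, which is the squeeze described above.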

What was your original baseline on this job in the months prior to now? memory, cores, etc. And were there any other changes besides the max 200 partition?

Something seems to have changed, whether it's the size of the data, simultaneous usage of the cluster, or experiencing some hidden hiccups now that it's no longer using marathon...

roncanepa commented 5 years ago

I also noticed this: it seems a little strange to have a "used" value greater than "allocated".

active-task-memory-used-allocated

jhpoelen commented 5 years ago

@roncanepa thanks for sharing your observations and thoughts.

I'd like to better understand why the executors are getting killed. Out-of-memory issues? Unresponsive? CPU at 100%? Disk failures? As you noticed, I was unable to find the root cause of the task/job failures from what the mesos / spark dashboards provide.

Mesos should be pretty good at distributing resources, provided that the resource settings per node match the actual physical machine. Did you find the place where these resource settings are defined? Perhaps it would be an idea to allocate only 50-75% of cpu/mem resources to mesos tasks, to avoid hitting physical limits.
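
As a sketch of the 50-75% idea, the snippet below computes a capped resource string. The node size (24 CPUs, 32 GB) is a made-up placeholder, not the actual GUODA hardware. The output uses the `cpus:N;mem:M` format (memory in MB) that the Mesos agent's `--resources` option accepts; whether and where that option is set on this cluster is exactly the open question.

```scala
// Hypothetical helper: names and node sizes are placeholders for illustration.
object MesosHeadroomSketch {
  // Returns a resource string offering only `fraction` of the physical CPUs and memory.
  def cappedResources(physicalCpus: Int, physicalMemMb: Int, fraction: Double): String = {
    require(fraction > 0.0 && fraction <= 1.0, "fraction must be in (0, 1]")
    val cpus = math.max(1, math.floor(physicalCpus * fraction).toInt)
    val memMb = math.floor(physicalMemMb * fraction).toInt
    s"cpus:$cpus;mem:$memMb"
  }

  def main(args: Array[String]): Unit = {
    // Assumed node: 24 CPUs, 32 GB RAM; offer 75% to Mesos tasks,
    // leaving the rest for hdfs / zookeeper and the OS.
    println(cappedResources(24, 32768, 0.75))
  }
}
```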

jhpoelen commented 5 years ago

btw, re: your question "What was your original baseline on this job in the months prior to now? memory, cores, etc. And were there any other changes besides the max 200 partition?"

I was able to run similarly complex jobs months ago on the same system, utilizing the entire cluster with 20GB executor / 4GB driver constraints.

roncanepa commented 5 years ago

> @roncanepa thanks for sharing your observations and thoughts.
>
> I'd like to better understand why the executors are getting killed. Out-of-memory issues? Unresponsive? CPU at 100%? Disk failures? As you noticed, I was unable to find the root cause of the task/job failures from what the mesos / spark dashboards provide.

This is what I'm trying to find, as well.

> Mesos should be pretty good at distributing resources, provided that the resource settings per node match the actual physical machine. Did you find the place where these resource settings are defined? Perhaps it would be an idea to allocate only 50-75% of cpu/mem resources to mesos tasks, to avoid hitting physical limits.

Spark loads defaults unless settings are otherwise specified via code or command-line arguments; on the compute side of things, that usually means cores = 4. Options that have been changed from their defaults at the config-file level are visible on the /environment tab; defaults, however, will not show there.
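
For illustration, one way to avoid relying on defaults is to pass the settings explicitly on the command line. The sketch below only renders `--conf key=value` arguments of the kind spark-shell and spark-submit accept. The keys are standard Spark configuration properties; the values (20g executor, 4g driver, 5 cores) are taken from numbers mentioned in this thread and are not a recommendation. The object and method names are hypothetical.

```scala
// Hypothetical helper that renders explicit Spark settings as command-line arguments.
object SparkConfSketch {
  // Renders each (key, value) pair as a `--conf key=value` argument.
  def toConfArgs(settings: Seq[(String, String)]): String =
    settings.map { case (key, value) => s"--conf $key=$value" }.mkString(" ")

  def main(args: Array[String]): Unit = {
    val settings = Seq(
      "spark.executor.memory" -> "20g", // executor memory mentioned in this thread
      "spark.executor.cores" -> "5",    // core count mentioned in this thread
      "spark.driver.memory" -> "4g"     // driver memory mentioned in this thread
    )
    println(toConfArgs(settings))
  }
}
```

Settings passed this way are explicit, so they should also show up on the environment tab.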

roncanepa commented 5 years ago

> I was able to run similarly complex jobs months ago on the same system, utilizing the entire cluster with 20GB executor / 4GB driver constraints.

I had assumed so, but wanted to check. Thank you.

roncanepa commented 5 years ago

So this is interesting. It's likely that a disk issue is forcing the termination of tasks, ~although not yet clear whether it's memory pressure or actual storage that's the culprit.~ I lean strongly towards disk. Details follow.

Note on this spark master info page (this host is idb-jupyter1) for this stage that executor 1 (mesos05) has the "ExecutorLostFailure" error message: http://10.13.44.50:4042/stages/stage/?id=5&attempt=2

Here are some system logs that I dug up:

mesos-slave.INFO:

I0923 02:25:44.230794  1265 slave.cpp:4591] Current disk usage 89.88%. Max allowed age: 11.760921977333334mins
I0923 02:26:44.232152  1261 slave.cpp:4591] Current disk usage 89.89%. Max allowed age: 10.968781757800000mins
I0923 02:27:44.233141  1259 slave.cpp:4591] Current disk usage 89.93%. Max allowed age: 7.189654145483333mins
I0923 02:28:44.234130  1253 slave.cpp:4591] Current disk usage 89.93%. Max allowed age: 6.854727008316667mins
I0923 02:29:44.234627  1256 slave.cpp:4591] Current disk usage 89.95%. Max allowed age: 5.072303640383334mins
I0923 02:30:44.235481  1263 slave.cpp:4591] Current disk usage 89.97%. Max allowed age: 3.216784862716667mins
I0923 02:31:44.236294  1267 slave.cpp:4591] Current disk usage 89.97%. Max allowed age: 3.216784862716667mins
I0923 02:32:44.237443  1263 slave.cpp:4591] Current disk usage 89.97%. Max allowed age: 3.216784862716667mins
I0923 02:33:44.238639  1256 slave.cpp:4591] Current disk usage 89.97%. Max allowed age: 3.216784862716667mins
I0923 02:34:44.239791  1258 slave.cpp:4591] Current disk usage 89.99%. Max allowed age: 54.458065132000002secs
I0923 02:35:44.282454  1255 slave.cpp:4591] Current disk usage 90.01%. Max allowed age: 0ns
I0923 02:36:44.321285  1258 slave.cpp:4591] Current disk usage 90.02%. Max allowed age: 0ns
I0923 02:37:44.321596  1258 slave.cpp:4591] Current disk usage 90.03%. Max allowed age: 0ns
I0923 02:38:44.322706  1266 slave.cpp:4591] Current disk usage 90.04%. Max allowed age: 0ns
I0923 02:39:44.323653  1252 slave.cpp:4591] Current disk usage 90.05%. Max allowed age: 0ns
I0923 02:40:44.324791  1260 slave.cpp:4591] Current disk usage 90.06%. Max allowed age: 0ns
I0923 02:41:44.325728  1252 slave.cpp:4591] Current disk usage 90.08%. Max allowed age: 0ns
I0923 02:42:44.326076  1262 slave.cpp:4591] Current disk usage 90.08%. Max allowed age: 0ns

see "max allowed age" drop to 0?
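
The drop to 0 is consistent with how the Mesos agent documents its sandbox garbage collection: the maximum allowed age is `gc_delay * max(0.0, (1.0 - gc_disk_headroom - disk usage))`. Below is a minimal sketch of that formula, assuming the documented defaults (`gc_delay` of 1 week, `gc_disk_headroom` of 0.1) are in effect on these nodes. The helper name `max_allowed_age_days` is made up for illustration, and the agent's own figures differ slightly because it works with the unrounded disk usage.

```shell
#!/bin/sh
# Sketch of the Mesos sandbox GC age formula, assuming default flags:
#   gc_delay = 7 days, gc_disk_headroom = 0.1
# max_allowed_age_days is a hypothetical helper, not a Mesos command.
# Arg: disk usage in percent, as printed in the agent log.
max_allowed_age_days() {
  awk -v usage="$1" 'BEGIN {
    age = 7 * (1 - 0.1 - usage / 100)   # gc_delay * (1 - headroom - usage)
    if (age < 0) age = 0                # clamp, i.e. max(0.0, ...)
    printf "%.4f\n", age
  }'
}

max_allowed_age_days 66.87   # usage reported after pruning
max_allowed_age_days 90.01   # first log line that reports 0ns
```

The first call prints 1.6191, in line with the 1.619062924066204days the agent logs at 66.87% usage. The second prints 0.0000: once usage passes 90%, every sandbox is eligible for immediate removal.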

just a little later, there's this:

Sep 23 04:51:44 mesos05 mesos-slave[968]: I0923 04:51:44.435835  1255 slave.cpp:4591] Current disk usage 90.61%. Max allowed age: 0ns
Sep 23 04:51:49 mesos05 mesos-slave[968]: I0923 04:51:49.625272  1260 slave.cpp:2035] Asked to kill task 1 of framework 1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007
Sep 23 04:51:49 mesos05 mesos-slave[968]: I0923 04:51:49.977550  1259 slave.cpp:3211] Handling status update TASK_KILLED (UUID: 71f6f5b0-59b7-47cf-8203-a85021db3c91) for task 1 of framework 1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007 from executor(1)@10.13.44.27:37229
Sep 23 04:51:50 mesos05 mesos-slave[968]: I0923 04:51:50.101289  1266 status_update_manager.cpp:320] Received status update TASK_KILLED (UUID: 71f6f5b0-59b7-47cf-8203-a85021db3c91) for task 1 of framework 1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007
Sep 23 04:51:50 mesos05 mesos-slave[968]: I0923 04:51:50.105306  1266 slave.cpp:3604] Forwarding the update TASK_KILLED (UUID: 71f6f5b0-59b7-47cf-8203-a85021db3c91) for task 1 of framework 1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007 to master@10.13.44.18:5050
Sep 23 04:51:50 mesos05 mesos-slave[968]: I0923 04:51:50.154408  1266 slave.cpp:3514] Sending acknowledgement for status update TASK_KILLED (UUID: 71f6f5b0-59b7-47cf-8203-a85021db3c91) for task 1 of framework 1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007 to executor(1)@10.13.44.27:37229

and then you can see it prune directories and get back to ~66% disk utilization:

I0923 04:52:44.437016  1258 slave.cpp:4591] Current disk usage 90.61%. Max allowed age: 0ns
I0923 04:52:44.437103  1258 gc.cpp:170] Pruning directories with remaining removal time 6.99940466451556days
I0923 04:52:44.437136  1258 gc.cpp:170] Pruning directories with remaining removal time 6.99940466438815days
I0923 04:52:44.437160  1258 gc.cpp:170] Pruning directories with remaining removal time 6.9994046643763days
I0923 04:52:44.437191  1258 gc.cpp:133] Deleting /var/lib/mesos/slaves/e97a8d30-e7b4-41aa-ab12-7dedb8eaa21e-S0/frameworks/1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007
I0923 04:53:00.604630  1258 gc.cpp:146] Deleted '/var/lib/mesos/slaves/e97a8d30-e7b4-41aa-ab12-7dedb8eaa21e-S0/frameworks/1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007'
I0923 04:53:00.604724  1258 gc.cpp:133] Deleting /var/lib/mesos/slaves/e97a8d30-e7b4-41aa-ab12-7dedb8eaa21e-S0/frameworks/1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007/executors/1/runs/20f6aec1-ae87-4887-9d91-29265af20111
W0923 04:53:00.604758  1258 gc.cpp:142] Failed to delete '/var/lib/mesos/slaves/e97a8d30-e7b4-41aa-ab12-7dedb8eaa21e-S0/frameworks/1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007/executors/1/runs/20f6aec1-ae87-4887-9d91-29265af20111': No such file or directory
I0923 04:53:00.648247  1258 gc.cpp:133] Deleting /var/lib/mesos/slaves/e97a8d30-e7b4-41aa-ab12-7dedb8eaa21e-S0/frameworks/1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007/executors/1
W0923 04:53:00.648288  1258 gc.cpp:142] Failed to delete '/var/lib/mesos/slaves/e97a8d30-e7b4-41aa-ab12-7dedb8eaa21e-S0/frameworks/1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007/executors/1': No such file or directory
I0923 04:53:44.438127  1259 slave.cpp:4591] Current disk usage 66.87%. Max allowed age: 1.619062924066204days

also on the stage info page, note that there's a CoalescedRDD operation: http://10.13.44.50:4042/stages/stage/?id=5&attempt=2

Tying everything together, it looks like the coalesce caused the drive to hit a threshold "full" value (a coalesce can minimize data movement, but can't prevent it entirely), and spark then unwound everything, causing the task failure.

jhpoelen commented 5 years ago

@roncanepa cool, these logs really tell the story! Thanks for digging into this.

Here's what comes to my mind for next steps:

  1. re-run previous experiment without the coalesce operation (will start this in a minute)
  2. figure out why 66% of disk on mesos is used. What is stored there? Perhaps tmp files that are not getting cleaned? Perhaps related to #60 ?
  3. figure out whether "W0923 04:53:00.604758 1258 gc.cpp:142] Failed to delete '/var/lib/mesos/slaves/e97a8d30-e7b4-41aa-ab12-7dedb8eaa21e-S0/frameworks/1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007/executors/1/runs/20f6aec1-ae87-4887-9d91-29265af20111': No such file or directory" is an actual error, or more like a warning or a known issue.

Curious to hear your thoughts.

jhpoelen commented 5 years ago

fyi started experiment similar to https://github.com/bio-guoda/guoda-services/issues/77#issuecomment-533268883 , but without coalesce:

scala> import spark.implicits._                                                                             
import spark.implicits._                                                                                    

scala> val corpus = spark.read.parquet("/guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet")         
corpus: org.apache.spark.sql.DataFrame = [http://purl.org/dc/terms/contributor: string, http://rs.tdwg.org/dwc/terms/organismRemarks: string ... 266 more fields]                                                       

scala> corpus.rdd.flatMap(_.toSeq.filter(_ != null).map(x => x.toString.toLowerCase)).map( x=> (x, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).saveAsTextFile("/user/jhpoelen/dwcafreq.txt")

roncanepa commented 5 years ago

I'll see if I can examine other previous failures for any similar patterns.
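
One way to look for the same pattern on other nodes is to count the agent log lines where the GC headroom is exhausted, alongside the kill requests. This is only a sketch: `scan_agent_log` is a hypothetical helper name, and it matches on the message text seen in the mesos05 excerpts above.

```shell
#!/bin/sh
# Sketch: summarize disk-pressure signals in Mesos agent log lines read from stdin.
# scan_agent_log is a hypothetical helper name.
scan_agent_log() {
  awk '
    /Current disk usage/ && /Max allowed age: 0ns/ {
      zero++
      if (first == "") first = $0     # remember when headroom first ran out
    }
    /Asked to kill task/ { kills++ }
    END {
      printf "zero_headroom_lines=%d kill_requests=%d\n", zero, kills
      if (first != "") print "first: " first
    }'
}

# Sample input copied from the mesos05 log excerpts above.
scan_agent_log <<'EOF'
I0923 02:34:44.239791  1258 slave.cpp:4591] Current disk usage 89.99%. Max allowed age: 54.458065132000002secs
I0923 02:35:44.282454  1255 slave.cpp:4591] Current disk usage 90.01%. Max allowed age: 0ns
I0923 02:36:44.321285  1258 slave.cpp:4591] Current disk usage 90.02%. Max allowed age: 0ns
I0923 04:51:49.625272  1260 slave.cpp:2035] Asked to kill task 1 of framework 1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007
EOF
```

On a node, the same function could be fed the mesos-slave.INFO file directly; where that file lives depends on the local logging setup.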

roncanepa commented 5 years ago

Thank you for pointing out #60, that's quite helpful.

The disk usage is indeed coming mostly from hdfs (in /srv):

root@mesos09:/# du -h --max-depth=1
0 ./sys
9.7M ./bin
87G ./var
3.3G ./usr
145M ./boot
4.0K ./lib64
8.0M ./sbin
19M ./etc
0 ./dev
4.0K ./media
2.8M ./tmp
4.0K ./mnt
16K ./lost+found
8.0K ./data
494M ./root
929M ./lib
980K ./run
65M ./.sbt
48K ./home
0 ./proc
585G ./srv
1.3G ./opt
678G .

The 87GB usage in /var is for mesos frameworks. As #60 mentions, we can take a look to see whether it has failed to clean any old ones out. Most of the "Failed to delete" errors that I mentioned in a previous comment were because those directories were already empty.

1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007 is the framework I was exploring yesterday, actually.

root@mesos09:/var/lib/mesos/slaves/dabbc59d-84c3-4fb3-8baa-045c55e020fb-S4/frameworks# du -h --max-depth=1
4.4G    ./dabbc59d-84c3-4fb3-8baa-045c55e020fb-0000-driver-20170808140116-0088
32K ./90c501d7-1666-4b40-ad52-2b402a71d2e3-0000
32K ./d712f8f3-f339-4fbb-a403-936306c0c95f-0001
68G ./1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007
3.7G    ./dabbc59d-84c3-4fb3-8baa-045c55e020fb-0000-driver-20170802164902-0001
1.4G    ./dabbc59d-84c3-4fb3-8baa-045c55e020fb-0000-driver-20170808140116-0087
77G .
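
To spot the heaviest framework sandboxes quickly, the `du` output can be ranked by size. A small sketch follows; `largest_dirs` is a hypothetical helper, and `sort -h` assumes GNU coreutils.

```shell
#!/bin/sh
# Sketch: rank `du -h --max-depth=1` output, largest first, dropping the "." total.
# largest_dirs is a hypothetical helper; `sort -h` assumes GNU coreutils.
# Arg: number of entries to show (defaults to 3).
largest_dirs() {
  awk '$2 != "."' | sort -rh | head -n "${1:-3}"
}

# Sample input copied from the mesos09 listing above.
largest_dirs 2 <<'EOF'
4.4G    ./dabbc59d-84c3-4fb3-8baa-045c55e020fb-0000-driver-20170808140116-0088
32K ./90c501d7-1666-4b40-ad52-2b402a71d2e3-0000
68G ./1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007
1.4G    ./dabbc59d-84c3-4fb3-8baa-045c55e020fb-0000-driver-20170808140116-0087
77G .
EOF
```

With this sample, the 68G framework 1d4219cc-7cf5-42bf-bbe0-a82adfdef4b0-0007 comes out on top, followed by the 4.4G driver directory from 2017.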

At the end of the day, these nodes don't have much storage on them in the first place. The loss of node 11 only put more pressure on the remaining nodes.

There's still some imbalance in usage across nodes as well, with DFS free space anywhere between 10% and 35%.

todo from here:

  1. review #60 and do any cleanups there (and script+schedule any that are doable)

  2. continue to run the balancer to help improve utilization

  3. wait to see if your newest job completes to help ensure we're not dealing with multiple issues

roncanepa commented 5 years ago

Looks like that job was terminated as well. We'll have to see what else we can find.

jhpoelen commented 5 years ago

re: 1 + 2 - much agreed!

re: 3 - Success! The last experiment completed successfully after removing the coalesce(20) operation. The result was an ordered list of word frequencies in the entire dwca corpus, spread across about 1000 uncompressed text files in /user/jhpoelen/dwcafreq.txt/ totaling 58.6 GB.

This time it seems that we 1. didn't crash HDFS and 2. had enough space to store intermediate results.

My take-aways are: 1. root cause analysis is extremely useful and @roncanepa was instrumental 2. monitoring disk usage is important 3. spreading / chunking results across many files seems to reduce local cache disk pressure 4. a suspicion that hdfs crashes on highly parallel writes 5. we'd benefit from more/faster node storage capacity, ideally separating hdfs resources from mesos/spark tmp/work files.

Re: 5. Can't we just get a bunch of 1TB SSD/NVMe drives? Might save many hours of troubleshooting in the near future.

And, just for fun . . . the most popular dwca value was . . . drum roll . . . "aves" with 1,824,027,947 occurrences. Here's the top 10:

$ hdfs dfs -cat /user/jhpoelen/dwcafreq.txt/part-00000 | head                               
(1824027947,aves)                                                                                                                  
(1732055899,chordata)                                                                                                              
(1631802656,europe)                                                                                                                
(1628759810,present)                                                                                                               
(1583700199,1)                                                                                                                     
(1472255439,sweden)                                                                                                                
(1435328139,se)                                                                                                                    
(1428784596,occurrence)                                                                                                            
(1420473001,swedish)                                                                                                               
(1420472839,urn:lsid:swedishlifewatch.se:dataprovider:1)

It appears that humans are disproportionately interested in birds (probably due to eBird) . . . and that the Swedes are prolific digitizers.

The next experiment attempts to reproduce the hdfs crash by removing the constraint of using only 20 cpus (dropping --total-executor-cores 20), by running:

spark-shell \                                                                                               
  --master mesos://zk://mesos01:2181,mesos02:2181,mesos03:2181/mesos \                                      
  --driver-memory 4G \                                                                                      
  --executor-memory 10G \                                                                                   
  --conf spark.sql.caseSensitive=true 

with:

import spark.implicits._                                                                                    
val corpus = spark.read.parquet("/guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet")                
corpus.rdd.flatMap(_.toSeq.filter(_ != null).map(x => x.toString.toLowerCase)).map( x=> (x, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).saveAsTextFile("/user/jhpoelen/dwcafreq2.txt")

Although I am expecting that writing to parquet would be more efficient, I'll stick with text files to minimize variations in our experiments.

jhpoelen commented 5 years ago

re: https://github.com/bio-guoda/guoda-services/issues/77#issuecomment-534731133 - looks like the job terminated successfully, see my previous comment.

jhpoelen commented 5 years ago

btw - I've started the next experiment with app id 70150f71-010e-4490-a591-b0c301692efe-0000 .

roncanepa commented 5 years ago

> looks like the job terminated successfully, see my previous comment.

Excellent! I must have read things wrong.

I've implemented cluster-level monitoring of hdfs space as well, but the ultimate issue is the same one you and Matt were discussing in #60: hdfs is 72% full. Somewhere around 75-80% is the maximum before more trouble starts. And apparently the local drives don't always have enough room for temporary tasks.
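
For illustration, a threshold check of this kind could look like the sketch below. It is not the monitoring that was actually deployed. It reads text in the style of `hdfs dfsadmin -report` from stdin and assumes the first `DFS Used%:` line is the cluster-wide summary; `check_dfs_used` is a hypothetical helper name.

```shell
#!/bin/sh
# Sketch: flag HDFS usage above a threshold, reading `hdfs dfsadmin -report`
# style text from stdin. check_dfs_used is a hypothetical helper name.
# Arg: warning threshold in percent (defaults to 75, the low end of the range above).
check_dfs_used() {
  awk -v limit="${1:-75}" '
    /^DFS Used%:/ {
      used = $3; sub(/%/, "", used)     # "72.00%" -> "72.00"
      status = (used + 0 >= limit + 0) ? "WARN" : "OK"
      printf "%s dfs_used=%s%% limit=%s%%\n", status, used, limit
      exit                              # assume the first match is the cluster summary
    }'
}

# Illustrative input only; the 72% figure is the one quoted above.
printf 'DFS Used%%: 72.00%%\n' | check_dfs_used 75
```

On the cluster this would be run as `hdfs dfsadmin -report | check_dfs_used 75`, for example from cron.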

Note that the way the jobs are programmed will affect how much data needs to move, be collected at stage divisions, and so forth, so you can also experiment with tuning jobs, both what data is actually pulled and the order in which operations are done (and the number of partitions used).

Otherwise, you can remove things from hdfs that you no longer need, too.

As I mentioned before, we can also sweep the sandboxes to make sure no data has been left behind by terminated executors.

Re: 4, we'll have to keep an eye on things.

re: 5, that's a conversation between yourself and Jose.

I also started running another storage rebalance. The defaults are mild so it shouldn't have any major impact on further jobs.