apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/

dynamic allocation not working #461

Closed: erikerlandson closed this issue 7 years ago

erikerlandson commented 7 years ago

@ssaavedra is having problems using the shuffle service and dynamic allocation; I'm creating this issue to track it.

He is running against kube 1.7.2

sahilprasad commented 7 years ago

I'm concerned this might be due to my changes in #460, but I ran a manual test just now and it appears to be working fine on my end.

erikerlandson commented 7 years ago

@sahilprasad I don't think so; @ssaavedra updated the images on his local copies.

ssaavedra commented 7 years ago

For easier reference, I'll paste here the logs I've provided so far.

Kubernetes version: 1.7.2
Spark on Kubernetes images version: kubespark/[image]:v2.2.0-kubernetes-0.3.0

I created the shuffle service from conf/ with kubectl -nssaavedraspark apply -f shuffle.yaml, modifying the images so that they are at version v2.2.0-kubernetes-0.3.0.

The stacktrace on the driver:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 13, 100.96.4.249, executor 16): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Unable to create executor due to Unable to register with external shuffle server due to : Failed to connect to /100.96.4.249:7337
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.runDummySparkJob(ReceiverTracker.scala:434)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.launchReceivers(ReceiverTracker.scala:450)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.start(ReceiverTracker.scala:160)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:102)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
        at StreamingSA$.main(StreamingSA.scala:43)
        at StreamingSA.main(StreamingSA.scala)
2017-08-23 10:51:15 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2017-08-23 10:51:15 INFO  AbstractConnector:310 - Stopped Spark@24675ca9{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2017-08-23 10:51:15 INFO  SparkUI:54 - Stopped Spark web UI at http://100.96.4.246:4040
2017-08-23 10:51:15 INFO  KubernetesClusterSchedulerBackend:54 - Shutting down all executors
2017-08-23 10:51:15 INFO  KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint:54 - Asking each executor to shut down
2017-08-23 10:51:15 INFO  KubernetesClusterSchedulerBackend:54 - Received delete pod spark-pi-4-1503485234043-exec-8 event. Reason: null
2017-08-23 10:51:15 INFO  KubernetesClusterSchedulerBackend:54 - Received delete pod spark-pi-4-1503485234043-exec-7 event. Reason: null
2017-08-23 10:51:15 WARN  TransportChannelHandler:78 - Exception in connection from /100.96.7.248:56728
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)
2017-08-23 10:51:15 INFO  KubernetesClusterSchedulerBackend:54 - Received delete pod spark-pi-4-1503485234043-exec-11 event. Reason: null
2017-08-23 10:51:15 INFO  KubernetesClusterSchedulerBackend:54 - Received delete pod spark-pi-4-1503485234043-exec-9 event. Reason: null
2017-08-23 10:51:15 INFO  KubernetesClusterSchedulerBackend:54 - Closing kubernetes client
2017-08-23 10:51:15 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2017-08-23 10:51:15 INFO  MemoryStore:54 - MemoryStore cleared
2017-08-23 10:51:15 INFO  BlockManager:54 - BlockManager stopped
2017-08-23 10:51:15 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2017-08-23 10:51:15 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2017-08-23 10:51:15 INFO  SparkContext:54 - Successfully stopped SparkContext
2017-08-23 10:51:15 INFO  ShutdownHookManager:54 - Shutdown hook called
2017-08-23 10:51:15 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-7a089e4c-46cc-4855-8cd2-2c6e2ed2b990

The 100.96.4.249 IP seems to be the driver's pod IP instead of the shuffle service's IP, but I can't understand why.
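
A generic way to check which pod, if any, actually owns that address (just a kubectl sketch, not something from the original report) is to list pod IPs and the shuffle-service pods side by side:

# Which pod currently holds the IP the executor tried to reach?
kubectl get pods --all-namespaces -o wide | grep 100.96.4.249
# Where are the shuffle-service pods, according to the labels spark-submit selects on?
kubectl -nssaavedraspark get pods -l app=spark-shuffle-service,spark-version=2.2.0 -o wide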

The script I used to launch the Spark Driver is the following:

KUBEMASTER=${KUBEMASTER:-https://my-api-server.internal}
DRIVER_IMAGE=kubespark/spark-driver:v2.2.0-kubernetes-0.3.0
EXECUTOR_IMAGE=kubespark/spark-executor:v2.2.0-kubernetes-0.3.0
INITCONTAINER_IMAGE=kubespark/spark-init:v2.2.0-kubernetes-0.3.0
RSSURI=http://my-api-server.internal:31000
# RSSURI=$KUBEMASTER/api/v1/namespaces/ssaavedraspark/services/spark-resource-staging-service/proxy/ # That does not work
file=`pwd`/target/scala-2.11/twitter-sentiment-assembly-*.jar
file=$(echo $file)
exec spark-submit --class StreamingSA --master k8s://$KUBEMASTER \
  --kubernetes-namespace ssaavedraspark \
  `#--conf spark.master=$KUBEMASTER2` \
  `#--conf spark.executor.instances=1` \
  --conf spark.app.name=spark-pi-4 \
  --conf spark.kubernetes.driver.docker.image=$DRIVER_IMAGE \
  --conf spark.kubernetes.executor.docker.image=$EXECUTOR_IMAGE \
  --conf spark.kubernetes.initcontainer.docker.image=$INITCONTAINER_IMAGE \
  --conf spark.kubernetes.resourceStagingServer.uri=$RSSURI \
  --conf spark.kubernetes.resourceStagingServer.internal.uri=http://spark-resource-staging-service:10000 \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.2.0" \
  --conf spark.kubernetes.shuffle.namespace=ssaavedraspark \
  --conf spark.kubernetes.shuffle.dir=/tmp \
  --deploy-mode cluster \
file://$file

The shuffle service pod seems to include the appropriate labels in the retrieved YAML:

# kubectl -nssaavedraspark get pod shuffle-8ks4v -o yaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    kubernetes.io/created-by: |
      {"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"DaemonSet","namespace":"ssaavedraspark","name":"shuffle","uid":"b790b4e0-8365-11e7-9942-06609768f4c5","apiVersion":"extensions","resourceVersion":"7284498"}}
  creationTimestamp: 2017-08-17T16:04:15Z
  generateName: shuffle-
  labels:
    app: spark-shuffle-service
    controller-revision-hash: "228216293"
    pod-template-generation: "1"
    spark-version: 2.2.0
  name: shuffle-8ks4v
  namespace: ssaavedraspark
  ownerReferences:
  - apiVersion: extensions/v1beta1
    blockOwnerDeletion: true
    controller: true
    kind: DaemonSet
    name: shuffle
    uid: b790b4e0-8365-11e7-9942-06609768f4c5
  resourceVersion: "7284543"
  selfLink: /api/v1/namespaces/ssaavedraspark/pods/shuffle-8ks4v
  uid: b7926fc9-8365-11e7-9942-06609768f4c5
spec:
  containers:
  - image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.3.0
    imagePullPolicy: IfNotPresent
    name: shuffle
    resources:
      limits:
        cpu: "1"
      requests:
        cpu: "1"
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/tmp
      name: temp-volume
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: default-token-dh76f
      readOnly: true
  dnsPolicy: ClusterFirst
  nodeName: ip-x-x-x-x.eu-west-2.compute.internal
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: default
  serviceAccountName: default
  terminationGracePeriodSeconds: 30
  tolerations:
  - effect: NoExecute
    key: node.alpha.kubernetes.io/notReady
    operator: Exists
  - effect: NoExecute
    key: node.alpha.kubernetes.io/unreachable
    operator: Exists
  volumes:
  - hostPath:
      path: /var/tmp
    name: temp-volume
  - name: default-token-dh76f
    secret:
      defaultMode: 420
      secretName: default-token-dh76f
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: 2017-08-17T16:04:15Z
    status: "True"
    type: Initialized
  - lastProbeTime: null
    lastTransitionTime: 2017-08-17T16:04:18Z
    status: "True"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: 2017-08-17T16:04:18Z
    status: "True"
    type: PodScheduled
  containerStatuses:
  - containerID: docker://aa917c7536abe14ddd1e13d7c735f73156196f666cd8a4348bf918b9d1263417
    image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.3.0
    imageID: docker-pullable://kubespark/spark-shuffle@sha256:52e2546d8b39f3e72430089c001664d2cfa9603f1362d0c84cf1cd9bc0fe52da
    lastState: {}
    name: shuffle
    ready: true
    restartCount: 0
    state:
      running:
        startedAt: 2017-08-17T16:04:18Z
  hostIP: 172.20.35.40
  phase: Running
  podIP: 100.96.2.67
  qosClass: Burstable
  startTime: 2017-08-17T16:04:15Z
ssaavedra commented 7 years ago

I've just spun up a new cluster with kube 1.6.6, and similar problems are showing up.

Also, it seems that 256MB is too little memory for the resource staging server for my toy use case, so I bumped it to 1024MB in my installation.
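
As an aside, a rough sketch of how such a bump could be applied to a running installation; it assumes the staging server runs as a Deployment named spark-resource-staging-service (that name and kind are assumptions, not taken from this thread), and editing the memory values in the YAML it was created from and re-applying works just as well:

# Assumed Deployment name and kind; adjust to match the actual resource staging server object
kubectl set resources deployment spark-resource-staging-service \
  --requests=memory=1024Mi --limits=memory=1024Mi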

For some reason, in this cluster, the executors actually boot but then fail immediately. Perhaps it was the same in the 1.7 cluster and I just wasn't quick enough to catch it.

This time I had the scripting ready, and I found the following on executor 1:

Result of kubectl describe pods spark-pi-1-1503676660202-exec-1:

Name:           spark-pi-1-1503676660202-exec-1
Namespace:      default
Node:           ip-172-20-82-247.eu-west-2.compute.internal/172.20.82.247
Start Time:     Fri, 25 Aug 2017 17:59:44 +0200
Labels:         spark-app-selector=spark-ed0df0f15af948dab087261f0ba07e9a
                spark-exec-id=1
                spark-role=executor
Annotations:    kubernetes.io/limit-ranger=LimitRanger plugin set: cpu request for init container spark-init
Status:         Running
IP:             100.96.6.73
Controlled By:  Pod/spark-pi-1-1503676660202-driver
Init Containers:
  spark-init:
    Container ID:       docker://21802a75a6cc2a4a0cae9d30354e3c2d8747f69eaddb225590a7bb75068440eb
    Image:              kubespark/spark-init:v2.2.0-kubernetes-0.3.0
    Image ID:           docker-pullable://kubespark/spark-init@sha256:82c238bd88d6a3dc3a207bda61153024bf2b8a4ecccd05c1243c23181258445d
    Port:               <none>
    Args:
      /etc/spark-init/spark-init.properties
    State:              Terminated
      Reason:           Completed
      Exit Code:        0
      Started:          Fri, 25 Aug 2017 17:59:45 +0200
      Finished:         Fri, 25 Aug 2017 17:59:47 +0200
    Ready:              True
    Restart Count:      0
    Requests:
      cpu:              100m
    Environment:        <none>
    Mounts:
      /etc/spark-init from spark-init-properties (rw)
      /mnt/secrets/spark-init from spark-init-secret (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-xnp2p (ro)
      /var/spark-data/spark-files from download-files (rw)
      /var/spark-data/spark-jars from download-jars-volume (rw)
Containers:
  executor:
    Container ID:       docker://c80d2d08a9beaf43193066329fb1b920481dcc9b68cb1261d5efdfd98857bc13
    Image:              kubespark/spark-executor:v2.2.0-kubernetes-0.3.0
    Image ID:           docker-pullable://kubespark/spark-executor@sha256:ff34a0a16978fa3d855abb6839fa5b25c177ac48b7ce90c549b14dfeecfde382
    Ports:              10000/TCP, 7079/TCP
    State:              Running
      Started:          Fri, 25 Aug 2017 17:59:48 +0200
    Ready:              True
    Restart Count:      0
    Limits:
      memory:   1408M
    Requests:
      cpu:      1
      memory:   1024M
    Environment:
      SPARK_EXECUTOR_PORT:      10000
      SPARK_DRIVER_URL:         spark://CoarseGrainedScheduler@100.96.6.72:38588
      SPARK_EXECUTOR_CORES:     1
      SPARK_EXECUTOR_MEMORY:    1g
      SPARK_APPLICATION_ID:     spark-ed0df0f15af948dab087261f0ba07e9a
      SPARK_EXECUTOR_ID:        1
      SPARK_MOUNTED_CLASSPATH:  /var/spark-data/spark-jars/*
      SPARK_USER:               root
      SPARK_EXECUTOR_MEMORY:    1024m
      SPARK_EXECUTOR_POD_IP:     (v1:status.podIP)
      SPARK_MOUNTED_FILES_DIR:  /var/spark-data/spark-files
    Mounts:
      /tmp from tmp (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-xnp2p (ro)
      /var/spark-data/spark-files from download-files (rw)
      /var/spark-data/spark-jars from download-jars-volume (rw)
Conditions:
  Type          Status
  Initialized   True
  Ready         True
  PodScheduled  True
Volumes:
  tmp:
    Type:       HostPath (bare host directory volume)
    Path:       /tmp
  spark-init-properties:
    Type:       ConfigMap (a volume populated by a ConfigMap)
    Name:       spark-pi-1-1503676660202-init-config
    Optional:   false
  download-jars-volume:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:
  download-files:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:
  spark-init-secret:
    Type:       Secret (a volume populated by a Secret)
    SecretName: spark-pi-1-1503676660202-init-secret
    Optional:   false
  default-token-xnp2p:
    Type:       Secret (a volume populated by a Secret)
    SecretName: default-token-xnp2p
    Optional:   false
QoS Class:      Burstable
Node-Selectors: <none>
Tolerations:    node.alpha.kubernetes.io/notReady:NoExecute for 300s
                node.alpha.kubernetes.io/unreachable:NoExecute for 300s
Events:
  FirstSeen     LastSeen        Count   From                                                    SubObjectPath                   Type            Reason          Message
  ---------     --------        -----   ----                                                    -------------                   --------        ------          -------
  9s            9s              1       default-scheduler                                                                       Normal          Scheduled       Successfully assigned spark-pi-1-1503676660202-exec-1 to ip-172-20-82-247.eu-west-2.compute.internal
  8s            8s              1       kubelet, ip-172-20-82-247.eu-west-2.compute.internal    spec.initContainers{spark-init} Normal          Pulled          Container image "kubespark/spark-init:v2.2.0-kubernetes-0.3.0" already present on machine
  8s            8s              1       kubelet, ip-172-20-82-247.eu-west-2.compute.internal    spec.initContainers{spark-init} Normal          Created         Created container with id 21802a75a6cc2a4a0cae9d30354e3c2d8747f69eaddb225590a7bb75068440eb
  8s            8s              1       kubelet, ip-172-20-82-247.eu-west-2.compute.internal    spec.initContainers{spark-init} Normal          Started         Started container with id 21802a75a6cc2a4a0cae9d30354e3c2d8747f69eaddb225590a7bb75068440eb
  5s            5s              1       kubelet, ip-172-20-82-247.eu-west-2.compute.internal    spec.containers{executor}       Normal          Pulled          Container image "kubespark/spark-executor:v2.2.0-kubernetes-0.3.0" already present on machine
  5s            5s              1       kubelet, ip-172-20-82-247.eu-west-2.compute.internal    spec.containers{executor}       Normal          Created         Created container with id c80d2d08a9beaf43193066329fb1b920481dcc9b68cb1261d5efdfd98857bc13
  5s            5s              1       kubelet, ip-172-20-82-247.eu-west-2.compute.internal    spec.containers{executor}       Normal          Started         Started container with id c80d2d08a9beaf43193066329fb1b920481dcc9b68cb1261d5efdfd98857bc13

The logs have the following stacktrace (I can provide the full log if required):

[...]
2017-08-25 15:59:49 INFO  CoarseGrainedExecutorBackend:54 - Successfully registered with driver
2017-08-25 15:59:49 INFO  Executor:54 - Starting executor ID 1 on host 100.96.6.73
2017-08-25 15:59:49 INFO  Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33336.
2017-08-25 15:59:49 INFO  NettyBlockTransferService:54 - Server created on 100.96.6.73:33336
2017-08-25 15:59:49 INFO  BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2017-08-25 15:59:49 INFO  BlockManagerMaster:54 - Registering BlockManager BlockManagerId(1, 100.96.6.73, 33336, None)
2017-08-25 15:59:49 INFO  BlockManagerMaster:54 - Registered BlockManager BlockManagerId(1, 100.96.6.73, 33336, None)
2017-08-25 15:59:49 INFO  BlockManager:54 - external shuffle service port = 7337
2017-08-25 15:59:49 INFO  BlockManager:54 - Registering executor with local external shuffle service.
2017-08-25 15:59:49 ERROR BlockManager:91 - Failed to connect to external shuffle server, will retry 2 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to /100.96.6.73:7337
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
        at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:196)
        at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:133)
        at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:263)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:260)
        at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:244)
        at org.apache.spark.executor.Executor.<init>(Executor.scala:117)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /100.96.6.73:7337
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more
2017-08-25 15:59:54 ERROR BlockManager:91 - Failed to connect to external shuffle server, will retry 1 more times after waiting 5 seconds...
[...]
erikerlandson commented 7 years ago

cc @foxish

erikerlandson commented 7 years ago

@ssaavedra can you attach console log output from the shuffle service pod(s)? Event-logs might also be interesting.

ssaavedra commented 7 years ago

Sorry, I didn't see this message before. Are you referring to the event logs of the driver, the executors, or the shuffle-service pods?

None of the shuffle-service pods notice anything. These are the logs of one of them, as a representative example:

2017-08-28 08:26:38 INFO  ExternalShuffleService:2574 - Started daemon with process name: 11@shuffle-rv965
2017-08-28 08:26:38 INFO  SignalUtils:54 - Registered signal handler for TERM
2017-08-28 08:26:38 INFO  SignalUtils:54 - Registered signal handler for HUP
2017-08-28 08:26:38 INFO  SignalUtils:54 - Registered signal handler for INT
2017-08-28 08:26:38 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-08-28 08:26:39 INFO  SecurityManager:54 - Changing view acls to: root
2017-08-28 08:26:39 INFO  SecurityManager:54 - Changing modify acls to: root
2017-08-28 08:26:39 INFO  SecurityManager:54 - Changing view acls groups to: 
2017-08-28 08:26:39 INFO  SecurityManager:54 - Changing modify acls groups to: 
2017-08-28 08:26:39 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
2017-08-28 08:26:41 INFO  KubernetesExternalShuffleService:54 - Starting shuffle service on port 7337 (auth enabled = false)

kubectl describe driver pod: https://gist.github.com/ssaavedra/c6e5795e8be47195d2f51879d2a78988

I couldn't get at the executor for that particular run, because the driver deletes the pod when it fails; I'll try another run.

ssaavedra commented 7 years ago

Logs for everything I could think of: https://gist.github.com/ssaavedra/e6d9805f5ac0e61e709ab19b2911c0cf

Includes:

foxish commented 7 years ago

I think it could be RBAC here as well, similar to https://github.com/apache-spark-on-k8s/spark/issues/448. The service account used with the shuffle service may not have the requisite permissions to watch all pods that it needs to watch.

ssaavedra commented 7 years ago

The serviceaccount was created with:

kubectl create clusterrolebinding default-default-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default

This is the result of describing the clusterrolebinding:

kubectl describe clusterrolebinding default-default-cluster-admin
Name:           default-default-cluster-admin
Labels:         <none>
Annotations:    <none>
Role:
  Kind: ClusterRole
  Name: cluster-admin
Subjects:
  Kind                  Name    Namespace
  ----                  ----    ---------
  ServiceAccount        default default
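
If there is any doubt about what that binding actually grants, one generic sanity check (a sketch; it assumes RBAC is enabled and that the caller is allowed to impersonate service accounts, and is not taken from this thread) is kubectl auth can-i against the default/default service account:

# Check whether the default service account can read and watch pods
kubectl auth can-i get pods --as=system:serviceaccount:default:default
kubectl auth can-i list pods --as=system:serviceaccount:default:default
kubectl auth can-i watch pods --as=system:serviceaccount:default:default
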
ssaavedra commented 7 years ago

@foxish I think the clusterrolebinding should provide the container with enough permissions on the default namespace (and it's the only namespace I'm using in this test cluster). It should not be related to the cluster roles; I'm using this same ServiceAccount for other services in this cluster (kube v1.6.6) and in another dev cluster (kube v1.7.2), and they seem to be able to access the Kubernetes API and list all pods and endpoints in the namespace.

foxish commented 7 years ago

This is very odd. I'm unable to repro this case at all. I just tried with the 2.2 release and dynamic allocation on GKE with Kubernetes v1.7.3, and it all seems fine. Looking into the code, the only way it should point back to the driver's pod IP is if the labels somehow select the driver pod.

I'm posting all the config I used: https://gist.github.com/foxish/e9a7c496ef0fed54b23c1c37bf4b5a62. I used a vanilla release of 2.2.0 for this test, downloaded from https://github.com/apache-spark-on-k8s/spark/releases/tag/v2.2.0-kubernetes-0.3.0.

Can you test this particular setup, just to remove any other variables from the mix?

ssaavedra commented 7 years ago

I'm sorry, I couldn't get back to this today.

For clarification (given our apparent timezone difference), I'd like to ask the following so that I can take action tomorrow:

Regarding removing variables: do you mean something else besides the RBAC?

Looking into the code, the only way it should point back to the driver's pod IP is if the labels somehow select the driver pod.

It seems it does not point back to the driver pod; instead it is some non-existent IP in the pod IP range. But I can't really imagine such an IP coming from nowhere, so I'll try to pin it down.

I used a vanilla release of 2.2.0 for this test, downloaded from https://github.com/apache-spark-on-k8s/spark/releases/tag/v2.2.0-kubernetes-0.3.0.

You mean you're pushing your own Docker images instead of using the ones at https://hub.docker.com/kubespark?

I'm also going to try my setup on GKE, to further isolate whatever differences there might be between my environment and yours.

ssaavedra commented 7 years ago

I tested your example on both AWS and GKE, and it ran successfully in both.

While trying to isolate the differences, I found that I could reproduce the same problem I've been describing in this issue when running my setup on GKE.

It seems that, other variables aside, this is related to the combination of using both the shuffle service and the resource staging server.

If, instead of using a local: URL, I configure resourceStagingServer.uri (and .internal.uri) and use a file: URL, then I can reproduce the same problems on both AWS and GKE with similar error paths.
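
To make the distinction concrete, here is a rough sketch of the two submission modes; the jar paths are placeholders (not taken from this thread), and the other flags mirror the launch script earlier in the issue:

# Worked: the jar is already baked into the driver/executor images, so the staging server is bypassed
spark-submit --deploy-mode cluster --master k8s://$KUBEMASTER ... \
  local:///path/inside/image/to/app.jar

# Reproduces the problem: the jar is uploaded through the resource staging server
spark-submit --deploy-mode cluster --master k8s://$KUBEMASTER ... \
  --conf spark.kubernetes.resourceStagingServer.uri=$RSSURI \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  file:///path/on/submitting/machine/to/app.jar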

I'll try to use the Spark Examples instead of my own code, so that I can provide a reproducible scenario for you to test.

foxish commented 7 years ago

I see! That is very helpful. Thanks @ssaavedra for investigating further.

ssaavedra commented 7 years ago

OK, after watching myself fail to reproduce this issue with the Spark examples jar, it dawned on me. The job I was launching was built as a single jar with all dependencies embedded (using sbt-assembly). That way, I only had to upload one jar to the resource staging server and everything was easy to run; I was used to being able to do this on Mesos and in standalone mode.

However, in sbt I'm pulling the Spark dependencies from the released 2.2.0 version of Spark (which has no Kubernetes support) and adding them to the classpath, and I think that may be causing some havoc.

For this test I've been using an open-source Twitter sentiment analysis example. If I list the dependencies as --jars file:///path/to/dependent.jar,file:///path/to/dependent-2.jar file:///path/to/main.jar (making sure I don't pull in any jar from org.apache.spark, since those should already be on the executor and driver classpath), instead of submitting an assembly jar, it works.
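
For illustration, the working submission looked roughly like the sketch below (jar paths are placeholders; the real dependency list comes from the application's build, and the image and staging-server flags are the same as in the launch script earlier in this thread):

# Thin main jar plus explicit non-Spark dependencies, instead of one fat assembly jar
exec spark-submit --class StreamingSA --master k8s://$KUBEMASTER \
  --deploy-mode cluster \
  --conf spark.kubernetes.resourceStagingServer.uri=$RSSURI \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --jars file:///path/to/dependent.jar,file:///path/to/dependent-2.jar \
  file:///path/to/main.jar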

So it seems this issue is, after all, related to the classpath interactions when assembling jars together with their dependencies. I would expect more users to be affected by this.

I will try adding the libraries in spark/jars to the sbt classpath, so that they are included in the fat jar from there instead of being pulled from Maven.

The other thing I could do is, of course, build an image FROM kubespark/spark-driver, but that would render the resource staging server pretty much useless.

Anyway, hopefully once this gets merged upstream this will be resolved as a consequence, since the dependency-downloaded jars would exactly match the ones on the executors and driver.

foxish commented 7 years ago

Makes sense, and seems to be one of the perils of building a fat jar. We should document this somewhere, but as you said, once it's upstreamed, this issue should pretty much vanish.

foxish commented 7 years ago

Closing this, as it was not an issue with the integration.

ssaavedra commented 7 years ago

Sorry about the lack of engagement lately. I can add this to the documentation if you think that's OK; I can send you a PR about it next week.