apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

badMessage: java.lang.IllegalStateException: too much data after closed for HttpChannelOverHttp@2830dea8{r=1,c=false,a=IDLE,uri=} #318

Closed duyanghao closed 7 years ago

duyanghao commented 7 years ago

spark-submit logs:

2017-05-27T02:47:28.058794414Z feign.RetryableException: Connection reset executing POST http://x.x.x.x:30976/v1/submissions/create
2017-05-27T02:47:28.058798205Z at feign.FeignException.errorExecuting(FeignException.java:67)
2017-05-27T02:47:28.058802027Z at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:102)
2017-05-27T02:47:28.058805214Z at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:76)
2017-05-27T02:47:28.058808233Z at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:103)
2017-05-27T02:47:28.058811266Z at com.sun.proxy.$Proxy31.submitApplication(Unknown Source)
2017-05-27T02:47:28.058814367Z at org.apache.spark.deploy.kubernetes.Client.org$apache$spark$deploy$kubernetes$Client$$submitApplicationToDriverServer(Client.scala:277)
2017-05-27T02:47:28.058817479Z at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$9$$anonfun$apply$8.apply(Client.scala:192)
2017-05-27T02:47:28.058832872Z at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$9$$anonfun$apply$8.apply(Client.scala:155)
2017-05-27T02:47:28.058835985Z at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2494)
2017-05-27T02:47:28.058838739Z at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$9.apply(Client.scala:155)
2017-05-27T02:47:28.058841638Z at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$9.apply(Client.scala:144)
2017-05-27T02:47:28.058844532Z at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2494)
2017-05-27T02:47:28.058847406Z at org.apache.spark.deploy.kubernetes.Client.run(Client.scala:144)
2017-05-27T02:47:28.058850184Z at org.apache.spark.deploy.kubernetes.Client$.main(Client.scala:736)
2017-05-27T02:47:28.058854016Z at org.apache.spark.deploy.kubernetes.Client.main(Client.scala)
2017-05-27T02:47:28.058857541Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2017-05-27T02:47:28.058860352Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2017-05-27T02:47:28.058863246Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2017-05-27T02:47:28.058866262Z at java.lang.reflect.Method.invoke(Method.java:498)
2017-05-27T02:47:28.058869217Z at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:757)
2017-05-27T02:47:28.058872266Z at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:188)
2017-05-27T02:47:28.058875252Z at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:213)
2017-05-27T02:47:28.058877991Z at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
2017-05-27T02:47:28.058880835Z at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2017-05-27T02:47:28.058887456Z Caused by: java.net.SocketException: Connection reset
2017-05-27T02:47:28.058890386Z at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
2017-05-27T02:47:28.058893124Z at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
2017-05-27T02:47:28.058896078Z at okio.Okio$1.write(Okio.java:78)
2017-05-27T02:47:28.058898746Z at okio.AsyncTimeout$1.write(AsyncTimeout.java:179)
2017-05-27T02:47:28.058901482Z at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.java:171)
2017-05-27T02:47:28.058904314Z at okio.RealBufferedSink.write(RealBufferedSink.java:41)
2017-05-27T02:47:28.058907099Z at okhttp3.internal.http1.Http1Codec$FixedLengthSink.write(Http1Codec.java:286)
2017-05-27T02:47:28.058909875Z at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.java:171)
2017-05-27T02:47:28.058912669Z at okio.RealBufferedSink.write(RealBufferedSink.java:91)
2017-05-27T02:47:28.058915406Z at okhttp3.RequestBody$2.writeTo(RequestBody.java:96)
2017-05-27T02:47:28.058918127Z at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:59)
2017-05-27T02:47:28.058920986Z at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
2017-05-27T02:47:28.058923848Z at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
2017-05-27T02:47:28.058926613Z at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
2017-05-27T02:47:28.058929718Z at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
2017-05-27T02:47:28.058932498Z at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
2017-05-27T02:47:28.058939102Z at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
2017-05-27T02:47:28.058942108Z at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
2017-05-27T02:47:28.058944964Z at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
2017-05-27T02:47:28.058947820Z at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
2017-05-27T02:47:28.058950836Z at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
2017-05-27T02:47:28.058953768Z at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
2017-05-27T02:47:28.058956644Z at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
2017-05-27T02:47:28.058959638Z at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:179)
2017-05-27T02:47:28.058962386Z at okhttp3.RealCall.execute(RealCall.java:63)
2017-05-27T02:47:28.058965797Z at feign.okhttp.OkHttpClient.execute(OkHttpClient.java:153)
2017-05-27T02:47:28.058968600Z at org.apache.spark.deploy.rest.kubernetes.HttpClientUtil$$anon$1.execute(HttpClientUtil.scala:53)
2017-05-27T02:47:28.058971646Z at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:97)
2017-05-27T02:47:28.058974604Z ... 22 more

spark driver logs:

2017-05-26T11:58:58.943112117Z 2017-05-26 11:58:58 WARN HttpParser:1355 - badMessage: java.lang.IllegalStateException: too much data after closed for HttpChannelOverHttp@2830dea8{r=1,c=false,a=IDLE,uri=}
2017-05-26T11:59:00.735405817Z 2017-05-26 11:59:00 WARN ServletHandler:667 - Error for /v1/submissions/create
2017-05-26T11:59:00.735423127Z java.lang.OutOfMemoryError: Java heap space

jar file:

bash-4.3# ls examples/jars/tsp_rt_v2-1.0.0-jar-with-dependencies.jar  -l
-rw-r--r--    1 root     root     117944155 May 23 08:08 examples/jars/tsp_rt_v2-1.0.0-jar-with-dependencies.jar

And if I submit this file instead:

bash-4.3# ls examples/jars/spark-examples-1.0-SNAPSHOT.jar -l
-rw-r--r--    9 root     root          7405 Apr 20 07:22 examples/jars/spark-examples-1.0-SNAPSHOT.jar

the submission will succeed.

So I guess the bug above has something to do with the file size.

mccheah commented 7 years ago

This is using submission v1 which has now been replaced by an entirely different submission workflow. Can you take a look at the updated documentation, pull from branch-2.1-kubernetes, and try with the new submission workflow? The resource staging server streams the data to disk directly instead of holding it in memory, so that should help this case.
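
For reference, a minimal sketch of one way to fetch and build that branch (the make-distribution profiles here are assumptions based on the fork's build docs, not something stated in this thread):

git clone https://github.com/apache-spark-on-k8s/spark.git
cd spark
git checkout branch-2.1-kubernetes
# Build a runnable distribution; profile names may differ for your setup.
dev/make-distribution.sh --tgz -Phadoop-2.7 -Pkubernetes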

duyanghao commented 7 years ago

After googling around, the bug above may have something to do with Jetty's max form size setting in Spark. But I still don't know how to change that setting in Spark.

Is submission v2 different from submission v1 in how Spark uses Jetty, especially regarding the max form size setting?

mccheah commented 7 years ago

Submission v2 streams the jars and the files to the resource staging server, as opposed to using a single JSON blob all at once. We also use multipart form data uploads there.

duyanghao commented 7 years ago

@mccheah As you suggested, I pulled from branch-2.1-kubernetes and tried the new submission workflow, but the questions below came up:

question 1: Judging from the Kubernetes pods, the driver resource configuration from --conf spark.driver.memory and --conf spark.driver.cores is effectively ignored.

question 2: If I start Spark with the parameter --conf spark.kubernetes.driver.pod.name=xxx, the driver pod name is indeed xxx, but the executor pods (whose names should be something like xxx-exec-1) cannot be found.

question 3: What is the function of the init-container? Why does it exist in both the driver and executor pods?

question 4: Maybe related to question 3. What is the function of the resourceStagingServer? I am wondering what the resource limits (CPU and memory) of the resourceStagingServer should be.

Suppose user A sends a jar file of a few KB while user B sends one of several GB; should the resource allocation of the resourceStagingServer be sized for the larger of the two?

question 5: How can I configure Spark to fetch the jar file if it is stored in HDFS?

question 6: Are there any detailed documents about the design of submission v2? And what is the advantage of this approach compared with submission v1?

duyanghao commented 7 years ago

@mccheah When I submit with the submission v2 workflow, the error below appears:

Exception in thread "main" java.net.SocketTimeoutException: timeout
    at okio.Okio$4.newTimeoutException(Okio.java:227)
    at okio.AsyncTimeout.exit(AsyncTimeout.java:284)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:240)
    at okio.RealBufferedSource.indexOf(RealBufferedSource.java:325)
    at okio.RealBufferedSource.indexOf(RealBufferedSource.java:314)
    at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:210)
    at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
    at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:67)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:179)
    at okhttp3.RealCall.execute(RealCall.java:63)
    at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
    at org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl.getTypedResponseResult(SubmittedDependencyUploaderImpl.scala:102)
    at org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl.doUpload(SubmittedDependencyUploaderImpl.scala:98)
    at org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl.uploadJars(SubmittedDependencyUploaderImpl.scala:71)
    at org.apache.spark.deploy.kubernetes.submit.Client$$anonfun$6.apply(Client.scala:152)
    at org.apache.spark.deploy.kubernetes.submit.Client$$anonfun$6.apply(Client.scala:151)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.deploy.kubernetes.submit.Client.run(Client.scala:151)
    at org.apache.spark.deploy.kubernetes.submit.Client$$anonfun$run$6.apply(Client.scala:323)
    at org.apache.spark.deploy.kubernetes.submit.Client$$anonfun$run$6.apply(Client.scala:303)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2494)
    at org.apache.spark.deploy.kubernetes.submit.Client$.run(Client.scala:303)
    at org.apache.spark.deploy.kubernetes.submit.Client$.main(Client.scala:264)
    at org.apache.spark.deploy.kubernetes.submit.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:757)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:188)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:213)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketInputStream.read(SocketInputStream.java:203)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at okio.Okio$2.read(Okio.java:138)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:236)
    ... 42 more

mccheah commented 7 years ago

@duyanghao thanks a lot for the feedback and questions. We ought to make some of these details clearer in our documentation, and this brought a number of issues to light that I've since filed tickets for.

question 1: Judging from the Kubernetes pods, the driver resource configuration from --conf spark.driver.memory and --conf spark.driver.cores is effectively ignored.

This is correct - looks like we failed to port over the driver resource requests and limits from submission v1. Filed as #334.

question 2: If I start Spark with the parameter --conf spark.kubernetes.driver.pod.name=xxx, the driver pod name is indeed xxx, but the executor pods (whose names should be something like xxx-exec-1) cannot be found.

Can you set the application name via spark.app.name? That will make the prefixes consistent everywhere. Separately, I think there's a discussion that should be had around how users should be able to configure the names of things. I filed #335 to discuss the matter further.
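
For illustration, a hedged sketch of a submission that sets the application name (the master URL, image names, and class are placeholders, and the configuration keys follow this fork's documented k8s:// submission style; the intent is that the driver and executor pod names then share a consistent prefix):

bin/spark-submit \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --master k8s://https://my-apiserver:6443 \
  --conf spark.app.name=xxx \
  --conf spark.kubernetes.driver.docker.image=registry.example.com/spark-driver:latest \
  --conf spark.kubernetes.executor.docker.image=registry.example.com/spark-executor:latest \
  local:///opt/spark/examples/jars/my-app.jar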

question 3: What is the function of the init-container? Why does it exist in both the driver and executor pods?

question 4: Maybe related to question 3. What is the function of the resourceStagingServer? I am wondering what the resource limits (CPU and memory) of the resourceStagingServer should be. Suppose user A sends a jar file of a few KB while user B sends one of several GB; should the resource allocation of the resourceStagingServer be sized for the larger of the two?

Regarding both of these questions, the init-container downloads dependencies for the user's applications. There are two types of dependencies the init-container is responsible for downloading.

The first is jars that are stored in remote locations like hdfs or on HTTP servers. In such cases, if you add jars to spark.jars that have URIs like hdfs://my-cluster.spark.dev:9000/apps/app.jar and http://spark-app.com:8080/apps/app.jar, the init-containers would be responsible for downloading these jars to the containers before they execute.
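
As a concrete sketch of that first case (reusing the hypothetical URIs from the paragraph above; the other flags are placeholders as in the earlier sketch):

# The init-containers fetch these remote jars before the main containers start.
bin/spark-submit \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --master k8s://https://my-apiserver:6443 \
  --jars hdfs://my-cluster.spark.dev:9000/apps/app.jar,http://spark-app.com:8080/apps/app.jar \
  local:///opt/spark/examples/jars/my-app.jar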

The second is jars that are submitted from the user's local disk. This is where the resource staging server comes in. Suppose you are working on an application that is changing quickly, and you're producing your artifacts on your local disk with your local Maven commands or equivalent build scripts. One could conceivably want to avoid needing to re-host these dependencies repeatedly; it would be overkill for a user to manually upload these to a distributed filesystem (or perhaps one is not available), and it could also be heavyweight for the user to build a custom Docker image and push it to their registry. Of course, the user would still want to work against their dev cluster. In this case, the dev cluster admin would install the resource staging server, and you would set spark.kubernetes.resourceStagingServer.uri in your application's configuration. In doing this, spark-submit sends your dependencies that you've specified with either no URI scheme or a URI with scheme file:// (e.g. /Users/johndoe/apps/app.jar, file:///Users/janedoe/workspace/dev/app.jar) up to the resource staging server. The init-container will then download these dependencies from the resource staging server as well.
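
And a sketch of that second case (the staging server URI is a placeholder for wherever the cluster admin exposed the service):

# The submission client uploads the local jar to the staging server; the
# init-containers later download it from there.
bin/spark-submit \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --master k8s://https://my-apiserver:6443 \
  --conf spark.kubernetes.resourceStagingServer.uri=http://staging.example.com:10000 \
  /Users/johndoe/apps/app.jar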

The init-container's execution in the driver pod is necessary to run the user's code in the first place - typically the user's code won't be in the Spark distribution unless they're running an example class. The init-container's execution in the executor pod is necessary in particular scenarios where you want dependencies mounted on the executors' classpath at boot time rather than after the JVM has started - think agents that need to be present at JVM startup, or overriding Spark's versions of third-party classes with submitted versions of your own.

Since the resource staging server only streams bytes from the request input stream to the disk and never holds an entire jar in memory at once, its memory and CPU footprint should be minimal. If you're sending many jars or large jars to this resource staging server, I would recommend using a different hosting format such as HDFS or installing it in a custom Docker image, since you'll save download times using those other methods and gain the benefits of Docker image caching in the latter method. We intend to have a guide on writing custom Docker images that comply with the submission client's contract - I filed #336 for this. If you intend on using the resource staging server still for these larger dependencies, consider modifying the YML we provide to use a persistent volume. You will be primarily disk-bound for this service, despite the fact that we implemented resource cleanup in #305.
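
A hedged sketch of the persistent-volume idea (the claim name, size, and mount path are assumptions; the exact field layout depends on the Deployment in the provided YML):

kubectl create -f - <<'EOF'
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-server-storage
spec:
  accessModes: ["ReadWriteOnce"]
  resources:
    requests:
      storage: 20Gi
EOF
# Then, in the staging server's pod spec, add a volume backed by this claim,
# mount it, and point the server's dependency directory at the mount path:
#   volumes:
#   - name: staging-storage
#     persistentVolumeClaim:
#       claimName: staging-server-storage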

question 5: How can I configure Spark to fetch the jar file if it is stored in HDFS?

Adding the URI in spark.jars should suffice, e.g. bin/spark-submit ... --jars hdfs://my-test-cluster.spark.dev:9000/apps/app1/app1.jar .... We haven't tested this with complex HDFS setups - Kerberos authentication comes to mind. We don't ship a core-site.xml to the resource staging server by default, but it wouldn't be hard to extend the existing YAML to mount a core-site.xml file as a ConfigMap that is mounted on the resource staging server, and then to set the HADOOP_CONF_DIR environment variable on the resource staging server pod. This is all possible by modifying the PodSpec and creating a ConfigMap object, and should require no change in the resource staging server code itself.
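
A sketch of that ConfigMap approach (names and paths are placeholders):

# Create a ConfigMap from your cluster's core-site.xml.
kubectl create configmap hadoop-conf --from-file=core-site.xml
# Then, in the resource staging server's pod spec, mount it and point
# HADOOP_CONF_DIR at the mount path:
#   env:
#   - name: HADOOP_CONF_DIR
#     value: /etc/hadoop/conf
#   volumeMounts:
#   - name: hadoop-conf
#     mountPath: /etc/hadoop/conf
#   volumes:
#   - name: hadoop-conf
#     configMap:
#       name: hadoop-conf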

question 6: Are there any detailed documents about the design of submission v2? And what is the advantage of this approach compared with submission v1?

The technical details of the new submission process need to be documented. I filed #337 to track this - we should be updating our proposal upstream in general anyway, as it has been a while since we attached the draft that's on the ticket. The reasons we switched approaches are discussed in detail on #167, but the short version is that:

  1. The main reason was that the old version required every driver pod to be accessible from the submission client so that the driver and executors could receive the application's dependencies. This is a difficult problem to solve generically. For example, the initial approach created a service that exposed the driver pod on a Service's NodePort, but NodePorts are limited resources, and not all environments will be able to expose services in this manner, a situation that I highlighted in #70 for a use case that @ash211 and I are working on. We could use an Ingress instead, but Ingress resources have very different semantics and behavior across different environments (GCE vs. GKE vs. simple Nginx etc.). We got a solution that could work for any environment in #140, but it requires application writers to provide a custom implementation of an interface which would be service loaded. This then requires all submitters to have the jar containing the implementation of the interface that is applicable for their organization and environment, not to mention that it places the burden on the cluster administrator to write an implementation of that service if the defaults did not suffice. Instead, we chose to let the cluster administrator decide how to expose a given service once using typical Kubernetes primitives. The endpoint could be customized for their environment - though reasonable defaults are provided in the YML spec that we bundle in the distribution - and then all drivers from that point forward need not be exposed to the submission clients.

  2. We wanted the driver's Docker command to just be a Java command directly into the user's main class, as opposed to a command that runs a server which then forks the user's code.

  3. Server-side TLS is a lot easier to set up for a single service, rather than dynamically handling TLS for every Spark job that's submitted, to secure the transfer of jars from the user's local disk up to the driver.

  4. The driver and executor pods can be restarted and re-fetch the submitted dependencies repeatedly. This wasn't possible in submission v1: when the driver crashed, it lost all of the dependencies that were submitted to it, and the new driver would have nowhere to fetch them from.

I hope these responses helped. I'll follow up on the stack trace in a second comment shortly.

mccheah commented 7 years ago

@duyanghao regarding your stack trace, it would seem that the resource staging server URI that was provided was not reachable. How did you start your resource staging server? Are you using the provided YML in the distribution?

mccheah commented 7 years ago

question 4: Maybe related to question 3. What is the function of the resourceStagingServer? I am wondering what the resource limits (CPU and memory) of the resourceStagingServer should be. Suppose user A sends a jar file of a few KB while user B sends one of several GB; should the resource allocation of the resourceStagingServer be sized for the larger of the two?

It's also worth noting that the resource staging server is only required if any application dependencies are being sent from the user's local disk. One could conceive of a production environment where all Spark applications are bundled into Docker images. In such scenarios the resource staging server does not have to be deployed, hence why the URL is optional. The init-container will still always run to fetch remotely-hosted files, if any.

It's perhaps worth bypassing attaching the init-container entirely if spark.jars and spark.files are empty. I filed #338 for that.

duyanghao commented 7 years ago

@mccheah Yeah, I am using the provided conf/kubernetes-resource-staging-server.yaml, and I start the resource staging server with the command: kubectl create -f conf/kubernetes-resource-staging-server.yaml.

When I submit the small file spark-examples-1.0-SNAPSHOT.jar, it succeeds, but it sometimes fails with the error below when I submit the big file tsp_rt_v2-1.0.0-jar-with-dependencies.jar:

Exception in thread "main" java.net.SocketTimeoutException: timeout
    at okio.Okio$4.newTimeoutException(Okio.java:227)
    at okio.AsyncTimeout.exit(AsyncTimeout.java:284)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:240)
    at okio.RealBufferedSource.indexOf(RealBufferedSource.java:325)
    at okio.RealBufferedSource.indexOf(RealBufferedSource.java:314)
    at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:210)
    at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
    at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:67)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:179)
    at okhttp3.RealCall.execute(RealCall.java:63)
    at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
    at org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl.getTypedResponseResult(SubmittedDependencyUploaderImpl.scala:102)
    at org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl.doUpload(SubmittedDependencyUploaderImpl.scala:98)
    at org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl.uploadJars(SubmittedDependencyUploaderImpl.scala:71)
    at org.apache.spark.deploy.kubernetes.submit.Client$$anonfun$6.apply(Client.scala:152)
    at org.apache.spark.deploy.kubernetes.submit.Client$$anonfun$6.apply(Client.scala:151)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.deploy.kubernetes.submit.Client.run(Client.scala:151)
    at org.apache.spark.deploy.kubernetes.submit.Client$$anonfun$run$6.apply(Client.scala:323)
    at org.apache.spark.deploy.kubernetes.submit.Client$$anonfun$run$6.apply(Client.scala:303)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2494)
    at org.apache.spark.deploy.kubernetes.submit.Client$.run(Client.scala:303)
    at org.apache.spark.deploy.kubernetes.submit.Client$.main(Client.scala:264)
    at org.apache.spark.deploy.kubernetes.submit.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:757)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:188)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:213)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketInputStream.read(SocketInputStream.java:203)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at okio.Okio$2.read(Okio.java:138)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:236)
    ... 42 more

Note: it will also sometimes succeed when I submit the big file tsp_rt_v2-1.0.0-jar-with-dependencies.jar.

bash-4.3# ls examples/jars/spark-examples-1.0-SNAPSHOT.jar -l
-rw-r--r--    9 root     root          7405 Apr 20 07:22 examples/jars/spark-examples-1.0-SNAPSHOT.jar
bash-4.3# ls examples/jars/tsp_rt_v2-1.0.0-jar-with-dependencies.jar  -l
-rw-r--r--    1 root     root     117944155 May 23 08:08 examples/jars/tsp_rt_v2-1.0.0-jar-with-dependencies.jar

duyanghao commented 7 years ago

@mccheah For a local test, I dropped the resource staging server and built custom driver and executor images with the jar dependency stored at /opt/spark/examples/jars/tsp_rt_v2-1.0.0-jar-with-dependencies.jar. When I submit with the URI local:///opt/spark/examples/jars/tsp_rt_v2-1.0.0-jar-with-dependencies.jar, the error below happens:

kubectl get pods -n=xxx -a
NAME                       READY     STATUS    RESTARTS   AGE
spark-secure-xyzu          1/1       Running   0          3m
spark-secure-xyzu-exec-1   0/1       Error     0          2m
spark-secure-xyzu-exec-2   0/1       Error     0          2m

executor container error log:

2017-06-08 10:00:47 WARN  NettyRpcEndpointRef:87 - Error sending message [message = RetrieveSparkAppConfig(1)] in 3 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:203)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Unsupported message RpcMessage(xxx,RetrieveSparkAppConfig(1),org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@29fc646c) from xxx
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:106)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:150)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:105)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Error sending message [message = RetrieveSparkAppConfig(1)]
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:203)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    ... 4 more
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    ... 11 more
Caused by: org.apache.spark.SparkException: Unsupported message RpcMessage(xxx,RetrieveSparkAppConfig(1),org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@29fc646c) from xxx
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:106)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:150)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:105)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

But when I replace tsp_rt_v2-1.0.0-jar-with-dependencies.jar with spark-examples-1.0-SNAPSHOT.jar, it succeeds again!

duyanghao commented 7 years ago

In my tests, there is no stable way to submit an application with big dependencies - neither the local custom image nor the resource staging server worked, and each failed with different strange errors. I hope there will be one stable way to do it; after all, it is the prerequisite and foundation of running a Spark application.

duyanghao commented 7 years ago

@mccheah I have created a PR to support specifying CPU core and memory limits for the driver. Look here.

mccheah commented 7 years ago

@duyanghao It's possible that with large files we're hitting timeouts on the upload to the resource staging server. That wouldn't be too surprising, since we don't set timeouts on the Retrofit client, so it's likely using some default. Keep in mind that this is an HTTP upload, so performance will reflect that - which is why I suggested the custom Docker image.

Can you instead try mounting your custom jar into the Docker images? In your custom Dockerfile, also include the line below, and don't set spark.jars:

ENV SPARK_EXTRA_CLASSPATH /opt/spark/examples/jars/tsp_rt_v2-1.0.0-jar-with-dependencies.jar

Do this in both the driver and executor Docker images.
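
A hedged sketch of one such Dockerfile build (the base image names are placeholders - use whatever driver and executor images your distribution builds - and the jar is assumed to sit in the build context):

cat > Dockerfile.driver <<'EOF'
FROM registry.example.com/spark-driver:latest
COPY tsp_rt_v2-1.0.0-jar-with-dependencies.jar /opt/spark/examples/jars/
ENV SPARK_EXTRA_CLASSPATH /opt/spark/examples/jars/tsp_rt_v2-1.0.0-jar-with-dependencies.jar
EOF
docker build -t registry.example.com/spark-driver:with-deps -f Dockerfile.driver .
# Repeat with the executor base image so both pods have the jar at that path.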

mccheah commented 7 years ago

For the above, the jar needs to be on both the driver and executor images, at that location.

foxish commented 7 years ago

2017-06-08 10:00:47 WARN NettyRpcEndpointRef:87 - Error sending message [message = RetrieveSparkAppConfig(1)] in 3 attempts

I have seen this before when the Spark core version expects a different message from the one we send from KubernetesSchedulerBackend. We modified that message for the shuffle service, so the explanation I can think of is that the fat jar with dependencies that you built includes a conflicting version of the Spark libs.

erikerlandson commented 7 years ago

Related: it's generally good practice to mark Spark as provided in uber-jar assemblies, for example in sbt using the % Provided modifier: https://github.com/erikerlandson/spark-kafka-sink/blob/master/build.sbt#L22

Other libs may also be marked provided in some cases to reduce the jar size further: https://github.com/erikerlandson/spark-kafka-sink/blob/master/build.sbt#L34
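
A minimal sketch of the pattern from those links (the version is a placeholder; the Provided scope keeps Spark out of the assembled jar so it can't conflict with the cluster's copy):

cat >> build.sbt <<'EOF'
// Spark is provided by the cluster at runtime; exclude it from the uber jar.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" % Provided
EOF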

duyanghao commented 7 years ago

@mccheah @foxish Thank you for your advice. I rebuilt the jar with the latest Spark libs, and it works now.

I will still follow this issue, as there are still many problems with the resource staging server.

duyanghao commented 7 years ago

@mccheah Maybe not related to this issue, but I am curious about support for Python applications. Is there any plan for this part? Anyway, I have tried to make it work recently and already have a demo.

erikerlandson commented 7 years ago

@duyanghao Python is not yet supported, but there is a PR in the works, so hopefully soon.

mccheah commented 7 years ago

@duyanghao I think we can close this issue and follow up on subsequent issues for the more precise matters you flagged. I opened #342 to track supporting large files, or what the proper messaging should be there. Does that sound ok to you?

duyanghao commented 7 years ago

@mccheah That's ok. And thank you for the help.

@erikerlandson Could you give a link to the PR for the Python support? Maybe there is something I can do.