Azure / azure-storage-java

Microsoft Azure Storage Library for Java
https://docs.microsoft.com/en-us/java/api/overview/azure/storage
MIT License

IOException: Stream is already closed #537

Closed · jomach closed this issue 4 years ago

jomach commented 4 years ago

What problem was encountered?

When running a Spark job on AKS with wasbs:// storage, starting the application fails with:

20/03/20 07:45:22 INFO KubernetesUtils: Uploading file: ............/config_example/job.yaml to dest: wasbs://...@.....blob.core.windows.net/spark-upload-21d5cf2c-2ad2-440d-9166-a8cca3636862/job.yaml...
Exception in thread "main" org.apache.spark.SparkException: Uploading file .....config_example/job.yaml failed...
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:287)
    at org.apache.spark.deploy.k8s.KubernetesUtils$.$anonfun$uploadAndTransformFileUris$1(KubernetesUtils.scala:246)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadAndTransformFileUris(KubernetesUtils.scala:245)
    at org.apache.spark.deploy.k8s.features.BasicDriverFeatureStep.$anonfun$getAdditionalPodSystemProperties$1(BasicDriverFeatureStep.scala:165)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.deploy.k8s.features.BasicDriverFeatureStep.getAdditionalPodSystemProperties(BasicDriverFeatureStep.scala:163)
    at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.$anonfun$buildFromFeatures$3(KubernetesDriverBuilder.scala:60)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.buildFromFeatures(KubernetesDriverBuilder.scala:58)
    at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:98)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$4(KubernetesClientApplication.scala:221)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$4$adapted(KubernetesClientApplication.scala:215)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2539)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:215)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:188)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalArgumentException: Self-suppression not permitted
    at java.lang.Throwable.addSuppressed(Throwable.java:1043)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:818)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:245)
    at org.apache.hadoop.io.IOUtils.closeStream(IOUtils.java:262)
    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:69)
    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:120)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1969)
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileToHadoopCompatibleFS(KubernetesUtils.scala:307)
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:283)
    ... 30 more
Caused by: java.io.IOException: Stream is already closed.
    at com.microsoft.azure.storage.blob.BlobOutputStreamInternal.close(BlobOutputStreamInternal.java:332)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:818)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:62)
    ... 36 more
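
The two-layer trace is worth decoding, because the "Self-suppression not permitted" layer is a JDK artifact that hides the real failure underneath. Reading the frames: Hadoop's IOUtils.copyBytes closes the output stream once (IOUtils.java:62), and its finally block then closes it again via closeStream/cleanup. On that second pass the legacy SDK's BlobOutputStreamInternal appears to rethrow the same cached IOException("Stream is already closed.") instance from both flush() and close(). FilterOutputStream.close() on Java 8 flushes and then closes the wrapped stream in a try-with-resources, so it ends up calling e.addSuppressed(e) on that one instance, which the JDK rejects. A minimal, self-contained sketch of the JDK side (ClosedStream below is a hypothetical stand-in, not SDK code):

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class SelfSuppressionDemo {

    // Hypothetical stand-in for a stream that, once closed, rethrows the
    // SAME cached IOException from every later call, the pattern the
    // legacy SDK's checkStreamState() appears to follow.
    static class ClosedStream extends OutputStream {
        private final IOException lastError = new IOException("Stream is already closed.");

        @Override public void write(int b) throws IOException { throw lastError; }
        @Override public void flush() throws IOException { throw lastError; }
        @Override public void close() throws IOException { throw lastError; }
    }

    public static void main(String[] args) throws IOException {
        // FilterOutputStream.close() (Java 8) flushes, then closes the
        // wrapped stream via try-with-resources. Both calls throw the
        // identical exception object, so the generated cleanup code calls
        // e.addSuppressed(e), and Throwable.addSuppressed rejects that with
        // "IllegalArgumentException: Self-suppression not permitted".
        new FilterOutputStream(new ClosedStream()).close();
    }
}

Running this reproduces the same IllegalArgumentException-caused-by-IOException shape as the trace above; the takeaway is that the IOException at the bottom, not the self-suppression, is the error to chase.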

Which version of the SDK was used?

Command:

spark-submit \
       --master k8s://https://akscluster.eastus.azmk8s.io:443 \
       --deploy-mode cluster \
       --name rda-fal \
       --conf "spark.executor.instances=1" \
       --conf "spark.kubernetes.pyspark.pythonVersion=3" \
       --conf "spark.pyspark.python=python3.8" \
       --conf "spark.kubernetes.container.image.pullPolicy=Always" \
       --conf "spark.kubernetes.container.image=someImage" \
       --conf "spark.kubernetes.executor.podTemplateFile=./config_example/k8s/spark_template.yml" \
       --conf "spark.kubernetes.executor.podTemplateContainerName=spark-executor" \
       --conf "spark.hadoop.fs.azure.account.auth.type=OAuth" \
       --conf "spark.hadoop.fs.azure.account.oauth.provider.type=org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider" \
       --conf "spark.hadoop.fs.azure.account.oauth2.client.id=id" \
       --conf "spark.hadoop.fs.azure.account.oauth2.client.secret=secret"   \
       --conf "spark.hadoop.fs.azure.account.oauth2.client.endpoint=https://login.microsoftonline.com/id_endpoint/oauth2/token"  \
       --conf "spark.hadoop.fs.azure=org.apache.hadoop.fs.azure.NativeAzureFileSystem" \
       --conf "spark.hadoop.fs.azure.account.key.storage_name.blob.core.windows.net=secret" \
       --conf "spark.kubernetes.file.upload.path=wasbs://container_name@storage_name.blob.core.windows.net"\
       --files config_example/job.yaml \
       /app/cli.py --config_uri job.yaml

Spark jars for Azure: azure-storage-8.6.2.jar, azure-storage-blob-11.0.1.jar, hadoop-azure-2.7.7.jar
Spark version: 3.0.0-preview2

rickle-msft commented 4 years ago

Hi, @jomach. Thank you for posting this issue. The first thing I notice is that you're using azure-storage-8.6.2 and azure-storage-blob-11.0.1. Is there a reason why you're trying to load both these versions side-by-side?
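
For context on why loading both is suspect: the two artifacts appear to ship classes under the same com.microsoft.azure.storage.blob package, the legacy v8 azure-storage jar with CloudBlockBlob and the BlobOutputStreamInternal seen in the trace, and the v11 azure-storage-blob jar with BlockBlobURL and friends, so with both on the classpath it is easy to end up running a mix of the two generations. A quick hypothetical diagnostic, not part of either SDK, to see which jar each class resolves from:

public class WhichJar {
    public static void main(String[] args) throws ClassNotFoundException {
        String[] names = {
            "com.microsoft.azure.storage.blob.CloudBlockBlob",           // legacy v8 SDK (azure-storage)
            "com.microsoft.azure.storage.blob.BlobOutputStreamInternal", // class from the trace above
            "com.microsoft.azure.storage.blob.BlockBlobURL"              // v11 SDK (azure-storage-blob)
        };
        for (String name : names) {
            Class<?> c = Class.forName(name);
            // CodeSource can be null for bootstrap classes; these should
            // all resolve to one of the application jars.
            Object from = c.getProtectionDomain().getCodeSource() == null
                    ? "bootstrap/unknown"
                    : c.getProtectionDomain().getCodeSource().getLocation();
            System.out.println(name + " -> " + from);
        }
    }
}

Since the trace shows hadoop-azure driving the legacy BlobOutputStreamInternal, keeping only the SDK generation that hadoop-azure actually consumes and dropping the other jar is the usual way out.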

jomach commented 4 years ago

Not really, but this is true: I'm loading both libs. I was trying to get at least one of them working... So I assume this is the problem. Closing. Thanks for the tip.