CODAIT / stocator

Stocator is a high-performing connector to object storage for Apache Spark, achieving performance by leveraging object storage semantics.
Apache License 2.0

Spark 2.3 app in k8s cluster: parquet->COS throws exception #200

Closed dkazakevich closed 5 years ago

dkazakevich commented 6 years ago

I created a sample Spark 2.3 application that runs in a k8s cluster. The application creates a sample Dataset and tries to write it to Bluemix COS using Stocator:

       StructType schema = DataTypes.createStructType(
                new StructField[]{
                        createStructField("NAME", StringType, false),
                        createStructField("STRING_VALUE", StringType, false),
                        createStructField("NUM_VALUE", IntegerType, false),
                });
        Row r1 = RowFactory.create("name1", "value1", 1);
        Row r2 = RowFactory.create("name2", "value2", 2);
        List<Row> rowList = ImmutableList.of(r1, r2);
        Dataset<Row> rows = spark.createDataFrame(rowList, schema);

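        // Configure Stocator: register the cos:// scheme and supply the COS IAM credentials and endpoint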
        spark.sparkContext().hadoopConfiguration().set("fs.stocator.scheme.list", "cos");
        spark.sparkContext().hadoopConfiguration().set("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem");
        spark.sparkContext().hadoopConfiguration().set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient");
        spark.sparkContext().hadoopConfiguration().set("fs.stocator.cos.scheme", "cos");
        spark.sparkContext().hadoopConfiguration().set("fs.cos.service.iam.api.key", spark.sparkContext().getConf().get("spark.COS_API_KEY"));
        spark.sparkContext().hadoopConfiguration().set("fs.cos.service.iam.service.id", spark.sparkContext().getConf().get("spark.COS_SERVICE_ID"));
        spark.sparkContext().hadoopConfiguration().set("fs.cos.service.endpoint", spark.sparkContext().getConf().get("spark.COS_ENDPOINT"));

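        // Write the Dataset as Parquet to cos://<bucket>.service/<object>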
        String path = "cos://" + spark.sparkContext().getConf().get("spark.COS_BUCKET") + ".service/" +
                spark.sparkContext().getConf().get("spark.COS_OUTPUT_FILENAME");
        rows.write().mode(SaveMode.Overwrite).parquet(path);

But I'm getting an exception:

...
2018-08-09 20:18:44 INFO  TaskSchedulerImpl:54 - Adding task set 0.0 with 2 tasks
2018-08-09 20:18:44 INFO  TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, 172.17.0.6, executor 2, partition 0, PROCESS_LOCAL, 8140 bytes)
2018-08-09 20:18:44 INFO  TaskSetManager:54 - Starting task 1.0 in stage 0.0 (TID 1, 172.17.0.5, executor 1, partition 1, PROCESS_LOCAL, 8205 bytes)
2018-08-09 20:18:45 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 172.17.0.5:43525 (size: 48.4 KB, free: 408.9 MB)
2018-08-09 20:18:45 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 172.17.0.6:39377 (size: 48.4 KB, free: 408.9 MB)
2018-08-09 20:18:47 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, 172.17.0.6, executor 2): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output plants.parquet/part-00000-8e5efd57-0e4a-4bed-95f9-2178e9b9dedd-c000-attempt_20180809201845_0000_m_000000_0.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: null
    at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1369)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 8 more

2018-08-09 20:18:47 INFO  TaskSetManager:54 - Starting task 0.1 in stage 0.0 (TID 2, 172.17.0.6, executor 2, partition 0, PROCESS_LOCAL, 8140 bytes)
2018-08-09 20:18:47 INFO  TaskSetManager:54 - Lost task 0.1 in stage 0.0 (TID 2) on 172.17.0.6, executor 2: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 1]
2018-08-09 20:18:47 INFO  TaskSetManager:54 - Starting task 0.2 in stage 0.0 (TID 3, 172.17.0.6, executor 2, partition 0, PROCESS_LOCAL, 8140 bytes)
2018-08-09 20:18:47 INFO  TaskSetManager:54 - Lost task 1.0 in stage 0.0 (TID 1) on 172.17.0.5, executor 1: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 2]
2018-08-09 20:18:47 INFO  TaskSetManager:54 - Starting task 1.1 in stage 0.0 (TID 4, 172.17.0.5, executor 1, partition 1, PROCESS_LOCAL, 8205 bytes)
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Lost task 0.2 in stage 0.0 (TID 3) on 172.17.0.6, executor 2: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 3]
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Starting task 0.3 in stage 0.0 (TID 5, 172.17.0.6, executor 2, partition 0, PROCESS_LOCAL, 8140 bytes)
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Lost task 1.1 in stage 0.0 (TID 4) on 172.17.0.5, executor 1: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 4]
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Starting task 1.2 in stage 0.0 (TID 6, 172.17.0.5, executor 1, partition 1, PROCESS_LOCAL, 8205 bytes)
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Lost task 1.2 in stage 0.0 (TID 6) on 172.17.0.5, executor 1: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 5]
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Starting task 1.3 in stage 0.0 (TID 7, 172.17.0.5, executor 1, partition 1, PROCESS_LOCAL, 8205 bytes)
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Lost task 0.3 in stage 0.0 (TID 5) on 172.17.0.6, executor 2: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 6]
2018-08-09 20:18:48 ERROR TaskSetManager:70 - Task 0 in stage 0.0 failed 4 times; aborting job
2018-08-09 20:18:48 INFO  TaskSchedulerImpl:54 - Cancelling stage 0
2018-08-09 20:18:48 INFO  TaskSchedulerImpl:54 - Stage 0 was cancelled
2018-08-09 20:18:48 INFO  DAGScheduler:54 - ResultStage 0 (parquet at App.java:75) failed in 3.923 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 5, 172.17.0.6, executor 2): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output plants.parquet/part-00000-8e5efd57-0e4a-4bed-95f9-2178e9b9dedd-c000-attempt_20180809201848_0000_m_000000_3.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: Could not initialize class sun.security.ssl.SSLSessionImpl
    at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1369)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 8 more

Driver stacktrace:
2018-08-09 20:18:48 INFO  DAGScheduler:54 - Job 0 failed: parquet at App.java:75, took 3.973086 s
2018-08-09 20:18:48 ERROR FileFormatWriter:91 - Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 5, 172.17.0.6, executor 2): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output plants.parquet/part-00000-8e5efd57-0e4a-4bed-95f9-2178e9b9dedd-c000-attempt_20180809201848_0000_m_000000_3.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: Could not initialize class sun.security.ssl.SSLSessionImpl
    at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1369)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 8 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1607)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1595)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1594)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1594)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1828)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1777)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1766)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:662)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:554)
    at com.ibm.mpw.App.main(App.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:846)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:921)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:932)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output plants.parquet/part-00000-8e5efd57-0e4a-4bed-95f9-2178e9b9dedd-c000-attempt_20180809201848_0000_m_000000_3.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: Could not initialize class sun.security.ssl.SSLSessionImpl
    at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1369)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 8 more
2018-08-09 20:18:48 WARN  TaskSetManager:66 - Lost task 1.3 in stage 0.0 (TID 7, 172.17.0.5, executor 1): TaskKilled (Stage cancelled)
2018-08-09 20:18:48 INFO  TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool 
2018-08-09 20:18:49 WARN  COSAPIClient:532 - Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 97de1087-4d62-4b33-a118-e627aa5943b7)
Error: Job aborted.

Writing the same data to COS as a JSON file works fine:

    rows.write().mode(SaveMode.Overwrite).json(path);

gilv commented 6 years ago

@dkazakevich thanks. A bit strange; are you sure the same code works for CSV and fails for Parquet? The exception in the log is "Could not initialize class sun.security.ssl.SSLSessionImpl", which is related to SSL and has nothing to do with Stocator. Which Stocator version (and branch) are you using?

gilv commented 6 years ago

@dkazakevich a short update: I tested your code on Spark 2.3 and the same code works perfectly for me, so I need more details (see my previous message) in order to proceed.

dkazakevich commented 6 years ago

@gilv Thank you for the response. It works for JSON but fails for Parquet. I haven't checked CSV; I will try it tomorrow. I tried the 1.0.21-ibm-sdk, 1.0.22-ibm-sdk and 1.0.23-Snapshot-ibm-sdk branches. Also, this works in Spark client/local mode but fails in cluster mode with a master and executor(s). I use local minikube and Bluemix k8s clusters to submit this Spark job.

gilv commented 6 years ago

@dkazakevich I want to be sure I understand correctly. Is the following correct?

Do you use exactly the same code and the same path when accessing JSON and Parquet? SSL issues can't be related to file types, so I'm trying to figure out what else is different in your code.

bahdzevich commented 6 years ago

@gilv, yes, you are correct. I have the same behavior:

gilv commented 6 years ago

@dkazakevich @bahdzevich thanks. What JDK version are you using? I also wonder, what is the value of "fs.cos.service.endpoint"?

dkazakevich commented 6 years ago

@gilv We are using JDK 1.8.0_172, and fs.cos.service.endpoint=s3-api.dal-us-geo.objectstorage.softlayer.net. We are using the same code and the same COS path for Parquet and JSON; we just changed rows.write().mode(SaveMode.Overwrite).parquet(path) to rows.write().mode(SaveMode.Overwrite).json(path). It also looks strange to us, and we don't know the source of the problem.

paul-carron commented 6 years ago

@dkazakevich have you installed or do you need to install a cert on Kubernetes?

dkazakevich commented 6 years ago

@paul-carron I haven't installed certs on the minikube or Bluemix k8s clusters. I also don't know whether a cert is needed to put data into COS. I only put the provided k8s cluster config yaml file, with the cluster certificate-authority .pem file, into the local ~/.kube directory. I also created a Spark k8s serviceaccount and clusterrolebinding and use them to run the Spark job as described here: https://spark.apache.org/docs/2.3.0/running-on-kubernetes.html#rbac

gilv commented 6 years ago

@dkazakevich can you read an existing Parquet file from COS on this cluster?

dkazakevich commented 6 years ago

@gilv Reading a Parquet file from COS and writing it back into COS works fine.
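
For reference, the round trip that works is essentially the following (a minimal sketch in the style of the Java app above, reusing the same Hadoop configuration; existing.parquet and copy.parquet are placeholder object names):

    // Read an existing Parquet object from COS and write a copy back,
    // reusing the cos:// Stocator configuration already set on hadoopConfiguration().
    String bucket = spark.sparkContext().getConf().get("spark.COS_BUCKET");
    Dataset<Row> existing = spark.read().parquet("cos://" + bucket + ".service/existing.parquet");
    existing.write().mode(SaveMode.Overwrite).parquet("cos://" + bucket + ".service/copy.parquet");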

gilv commented 6 years ago

@dkazakevich So everything works except when you create a two-record Parquet file in Spark and write it into COS? This sounds a bit strange, and it fails with SSL. I wonder if you can experiment with some other ways to create Parquet files in Spark. Try the Scala shell and write something like:

    import spark.implicits._

    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")

    squaresDF.write.format("parquet").save("cos://yourbucket.myCOS/data.parquet")

Will this work?

dkazakevich commented 6 years ago

@gilv Initially we faced the problem while reading data from DB2 and writing it into COS. The two-record example is a simplified version that reproduces the same problem we have with the DB2 source. I think the Scala shell example runs the Spark job in local/client mode, which executes the job using one master (one node). As I described above, we also don't have problems in Spark local/client mode. The problem occurs in Spark cluster mode, which runs the job using a master and one or more executors (several nodes).

gilv commented 6 years ago

@dkazakevich you can connect spark-shell to the existing cluster by using --master spark://master_host:master_port

dkazakevich commented 6 years ago

@gilv We don't have a k8s Spark cluster with a running master and executors (which is necessary for spark-shell); instead we use the spark-submit tool, which automatically creates the master and the indicated number of executors for a Spark job (a Spark 2.3 feature for k8s). Is it ok to create a Scala application, generate a .jar file for spark-submit, and run that for the above experiment?

gilv commented 6 years ago

@dkazakevich I think creating a jar with Scala and using spark-submit is the same thing.

dkazakevich commented 6 years ago

@gilv I tried the experiment and got the same exception:

import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.appName("COS spark").getOrCreate()
    import sparkSession.sqlContext.implicits._

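    // Same Stocator/COS configuration as in the Java application (credentials redacted)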
    sparkSession.sparkContext.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.cos.service.iam.api.key", "***")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.cos.service.iam.service.id", "***")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.cos.service.endpoint", "s3-api.us-geo.objectstorage.softlayer.net")

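    // Create a small DataFrame and write it to COS as Parquet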
    val squaresDF = sparkSession.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.format("parquet").save("cos://mpw-plants.service/data.parquet")
  }
}

Also FYI: I'm not sure whether this could be the source of the problem, but I tried to load data from some DB2 databases and write Parquet into COS, and found out that:

gilv commented 6 years ago

@dkazakevich If you can't write a simple Parquet file with this Scala example, then I guess it's not related to DB2. The exception you are getting is related to an SSL connection, yet you can read from COS and write JSON without hitting SSL issues.

gilv commented 6 years ago

@dkazakevich Please also open a ticket with IBM Bluemix Support; you can link them to this issue that you opened against Stocator. You should see Support in console.bluemix.net, where you can create a ticket.

paul-carron commented 6 years ago

@dkazakevich At the moment I don't believe this is an issue with the Java SDK, as I'm unable to recreate it in a standalone or cluster environment. My hunch at the minute is that it's some sort of cert issue between Spark and Kubernetes, although you haven't installed certs on your minikube cluster. Unfortunately I'm not familiar with Kubernetes, so I don't know what might be required. It might be worth having somebody with Kubernetes knowledge look at this.