CODAIT / stocator

Stocator is a high-performance connector to object storage for Apache Spark, achieving its performance by leveraging object storage semantics.
Apache License 2.0

Fail to read parquet data #185

Closed minbaev closed 6 years ago

minbaev commented 6 years ago

We have data stored on COS, serialized as Parquet. We read it hourly using the following method:

def read_parquet_func_with_input(y, m, d, h):
    # Read the partitioned dataset with schema merging and keep only the needed columns.
    CRaccesslogTmp_df = spark.read.option("mergeSchema", "true").parquet(BucketAccessName) \
        .select("Year", "Month", "Day", "Hour", "Minute", "Location", "DC", "request_type",
                "status", "container_name", "storage_account_id", "storage_location_id",
                "remote_user", "interface_type")
    # Restrict to the requested hour.
    CRaccesslog_df = CRaccesslogTmp_df.filter(CRaccesslogTmp_df.Year == y).filter(
        CRaccesslogTmp_df.Month == m).filter(CRaccesslogTmp_df.Day == d).filter(
        CRaccesslogTmp_df.Hour == h)

    return CRaccesslog_df
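
A typical hourly invocation looks roughly like the following (the arguments and the follow-up aggregation are illustrative; the real values come from whatever drives the hourly job):

# Illustrative call for a single hour; the actual y/m/d/h values are
# supplied by the hourly scheduler (the example uses the hour from the
# failing path below).
hourly_df = read_parquet_func_with_input(2018, 3, 31, 0)
hourly_df.groupBy("status").count().show()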

From time to time it throws the following error:

Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=cos://cos-analytics-parquet.appSpark/AccessLogs/Location=us-cr/DC=sjc03/gz_log_write_time=2018-03-31-01/Year=2018/Month=3/Day=31/Hour=0/part-00001-4711ed6c-8305-420b-8ffc-db611dee6327.snappy.parquet-attempt_20180401092333_0006_m_000001_0; isDirectory=false; length=17133527; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:515)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:502)
    at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
    at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
    at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:169)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
    at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
    at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
    at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
    at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
    at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.amazonaws.shade.SdkClientException: Data read has a different length than the expected: dataLength=16384; expectedLength=17794; includeSkipped=true; in.getClass()=class com.amazonaws.shade.services.s3.AmazonS3Client$2; markedSupported=false; marked=0; resetSinceLastMarked=false; markCount=0; resetCount=0
    at com.amazonaws.shade.util.LengthCheckInputStream.checkLength(LengthCheckInputStream.java:151)
    at com.amazonaws.shade.util.LengthCheckInputStream.read(LengthCheckInputStream.java:93)
    at com.amazonaws.shade.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:66)
    at com.ibm.stocator.fs.cos.COSInputStream.closeStream(COSInputStream.java:399)
    at com.ibm.stocator.fs.cos.COSInputStream.close(COSInputStream.java:363)
    at java.io.FilterInputStream.close(FilterInputStream.java:181)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:433)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:508)
    ... 34 more

Could you please have a look at a possible solution for this issue? Thank you.

gilv commented 6 years ago

@minbaev thank you for reporting this. If I understand correctly, it fails on a random basis, sometimes working and sometimes not, right? Do you know which Stocator version you are using?

From the exception it looks like it comes from the Amazon SDK, which received data of a length it didn't expect. I suggest you also search the web and see whether users have reported similar issues with the Amazon SDK. Did you try the same with the Stocator branch based on the COS SDK? I wonder if the same issue exists there.
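
If it helps narrow this down, one quick check (a minimal sketch, assuming the standard Stocator-for-COS configuration keys) is to print the Hadoop configuration that tells Spark which Stocator client handles the cos:// scheme; the exact Stocator version itself usually has to be read from the Stocator jar installed on the cluster:

# Sketch: inspect which Stocator implementation is wired for the cos:// scheme.
# The key names assume the standard Stocator COS setup and may differ per cluster.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
for key in ("fs.stocator.scheme.list", "fs.cos.impl",
            "fs.stocator.cos.impl", "fs.stocator.cos.scheme"):
    print(key, "=", hadoop_conf.get(key))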

minbaev commented 6 years ago

@gilv thank you for your response. The failures do seem random, that's right. As we are running Spark jobs on the Analytics Engine, we are not providing the Stocator jar directly; we are using the version IBM Analytics Engine has configured.

The actual data length is always smaller than expected, so I tend to believe it might be an issue with the input stream.

Research online suggests specifying the exact file I want to read instead of using an input file stream, so that there is no inconsistency between the expected and actual data length:

https://stackoverflow.com/questions/27798825/amazonclientexception-data-read-has-a-different-length-than-the-expected

But I'm not sure how this can be applied to our use case (the PySpark DataFrame API).
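
What I had in mind, as a rough and untested sketch, is to narrow each read to the hourly partition directories instead of scanning the whole dataset with mergeSchema. This assumes BucketAccessName points at the directory containing the Location= partitions (the layout is taken from the failing path in the stack trace), and the hypothetical read_parquet_hour below would replace the function above:

# Rough sketch (untested): read only the directories for one hour.
# BucketAccessName and the glob pattern are assumptions based on the
# failing path in the stack trace and may need adjusting.
def read_parquet_hour(y, m, d, h):
    base = BucketAccessName.rstrip("/")
    hourly_glob = ("{b}/Location=*/DC=*/gz_log_write_time=*/"
                   "Year={y}/Month={m}/Day={d}/Hour={h}"
                   .format(b=base, y=y, m=m, d=d, h=h))
    # basePath keeps the partition columns (Year, Month, ...) in the schema
    return (spark.read.option("basePath", base)
            .parquet(hourly_glob)
            .select("Year", "Month", "Day", "Hour", "Minute", "Location", "DC",
                    "request_type", "status", "container_name", "storage_account_id",
                    "storage_location_id", "remote_user", "interface_type"))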