intel-analytics / ipex-llm

Accelerate local LLM inference and finetuning (LLaMA, Mistral, ChatGLM, Qwen, Baichuan, Mixtral, Gemma, Phi, MiniCPM, etc.) on Intel XPU (e.g., local PC with iGPU and NPU, discrete GPU such as Arc, Flex and Max); seamlessly integrate with llama.cpp, Ollama, HuggingFace, LangChain, LlamaIndex, GraphRAG, DeepSpeed, vLLM, FastChat, Axolotl, etc.

[PPML] HDFS CSV read error with E2E encrypted IO under a specific partition number #6232

Open CharleneHu-42 opened 1 year ago

CharleneHu-42 commented 1 year ago

After encrypting the data with EncryptWithRepartition, setting the partition number to 371 (the data block count), and uploading the output to HDFS, the GBT training job fails with an exception when reading the csv:

Job 2 failed: csv at EncryptedDataFrameReader.scala:44, took 27.318223 s
...
java.lang.ArrayIndexOutOfBoundsException: 65536
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:222)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:186)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
        at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
        at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
        at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
        at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1207)
        at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2290)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2291)
        at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1209)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1202)
        at org.apache.spark.sql.catalyst.csv.CSVInferSchema.infer(CSVInferSchema.scala:59)
        at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$inferFromDataset$4(CSVDataSource.scala:138)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.inferFromDataset(CSVDataSource.scala:138)
        at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:113)
        at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:65)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:62)
        at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:209)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:206)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
        at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:795)
        at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:595)
        at com.intel.analytics.bigdl.ppml.crypto.dataframe.EncryptedDataFrameReader.csv(EncryptedDataFrameReader.scala:44)
        at com.intel.analytics.bigdl.ppml.examples.gbtClassifierTrainingExampleOnCriteoClickLogsDataset$.main(gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala:121)
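For reference, here is a minimal sketch of the write/read path involved, assuming the PPMLContext API from BigDL PPML. The application name, HDFS paths, and crypto mode below are illustrative placeholders rather than the exact job configuration, and key management is assumed to be configured through the usual BigDL KMS Spark conf properties.

import com.intel.analytics.bigdl.ppml.PPMLContext
import com.intel.analytics.bigdl.ppml.crypto.{AES_CBC_PKCS5PADDING, PLAIN_TEXT}

object EncryptedCsvRepro {
  def main(args: Array[String]): Unit = {
    // KMS type and primary/data key material are assumed to be supplied
    // through Spark conf, as in the standard PPML setup.
    val sc = PPMLContext.initPPMLContext("EncryptedCsvRepro")

    // Write side: read the plain CSV, repartition to 371 (the data block
    // count mentioned above), and write it back encrypted, roughly what
    // the EncryptWithRepartition example does.
    val plain = sc.read(cryptoMode = PLAIN_TEXT)
      .option("header", "true")
      .csv("hdfs://host:9000/criteo/plain")        // illustrative path
    sc.write(plain.repartition(371), AES_CBC_PKCS5PADDING)
      .mode("overwrite")
      .option("header", "true")
      .csv("hdfs://host:9000/criteo/encrypted")    // illustrative path

    // Read side: this is the call that lands in EncryptedDataFrameReader.csv
    // (line 44 in the trace above) and fails with
    // ArrayIndexOutOfBoundsException: 65536 during CSV schema inference.
    val df = sc.read(cryptoMode = AES_CBC_PKCS5PADDING)
      .option("header", "true")
      .csv("hdfs://host:9000/criteo/encrypted")
    df.show(5)
  }
}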
gc-fu commented 1 year ago

Sorry, I mistyped in the description of my PR. Please ignore the reference.