yahoo / CaffeOnSpark

Distributed deep learning on Hadoop and Spark clusters.
Apache License 2.0
1.27k stars 358 forks source link

java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1 #305

Open yuanwuab opened 4 years ago

yuanwuab commented 4 years ago

I got an error when I was trying to join two datasets which are from postgres and csv file the error message is like this: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 14, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1 staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, targetString), StringType), true, false) AS targetString#205 staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, deviceName), StringType), true, false) AS deviceName#206 staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, alarmDetectionCode), StringType), true, false) AS alarmDetectionCode#207 at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292) at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593) at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174) at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191) at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_1$(generated.java:94) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:36) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289) ... 15 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2073) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2113) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544) at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363) at org.apache.spark.sql.Dataset.head(Dataset.scala:2544) at org.apache.spark.sql.Dataset.take(Dataset.scala:2758) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254) at org.apache.spark.sql.Dataset.showString(Dataset.scala:291) at org.apache.spark.sql.Dataset.show(Dataset.scala:745) at org.apache.spark.sql.Dataset.show(Dataset.scala:704) at org.apache.spark.sql.Dataset.show(Dataset.scala:713) at jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic.process(ALMTriggerProcessLogic.java:295) at jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic.execute(ALMTriggerProcessLogic.java:179) at jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic.execute(ALMTriggerProcessLogic.java:1) at jp.co.nec.necdas.commons.service.StandardExecutorService.execute(StandardExecutorService.java:65) at jp.co.nec.necdas.commons.service.StandardExecutorServiceTest.testExecute(StandardExecutorServiceTest.java:26) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:73) at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:83) at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190) at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206) Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1 staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, targetString), StringType), true, false) AS targetString#205 staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, deviceName), StringType), true, false) AS deviceName#206 staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, alarmDetectionCode), StringType), true, false) AS alarmDetectionCode#207 at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292) at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593) at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174) at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191) at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_1$(generated.java:94) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:36) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289) ... 15 more

It looked like mismatch happened when spark application was joining two dataset with different schema , but I don't know how it happened.

My java code: Dataset result = null; result = deviceInfoDataset.join(searchInfo,deviceInfoDataset.col("deviceName").equalTo(searchInfo.col("deviceName"))); result.show();

Dataset schema: device +--------+----------+----------+ |ctgry_cd|deviceInfo|deviceName| +--------+----------+----------+

searchinfo

+------------+----------+------------------+ |targetString|deviceName|alarmDetectionCode| +------------+----------+------------------+