AbsaOSS / cobrix

A COBOL parser and Mainframe/EBCDIC data source for Apache Spark
Apache License 2.0

java.lang.AssertionError: assertion failed: Byte array does not have correct length #702

Closed by devoplib 1 month ago

devoplib commented 3 months ago

Question

First of all, thank you very much for all your work and support. We are using Cobrix to convert mainframe files from EBCDIC to ASCII, and it works perfectly fine in Databricks. To increase throughput, we run the same job in parallel, i.e. instead of sending all files to one task, we process multiple files with the same program by passing a different subset of files to each task. For example, where we previously had a single file-load task for all 42 files, we now run the same Cobrix conversion module 6 times with 7 files each. We are getting the following error, "java.lang.AssertionError: assertion failed: Byte array does not have correct length", when writing the dataframe to a Databricks UC table (saveAsTable) or when running transformations on the dataframe such as df.count() or df.rdd.isEmpty().

Note: when the failed task is resubmitted it completes, so it seems like some kind of memory contention on the Databricks driver, since it never fails when run as a single task.

df.write.mode("overWrite").format("delta").saveAsTable(f"{table_name}")

Looking for any advice on where to look and how to debug the error. Any help is appreciated.
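
For context, here is a minimal sketch of the per-task notebook flow described above. The paths, copybook location, and table name are hypothetical placeholders, and the exact reader options we use appear later in this thread; `spark` is the active Databricks SparkSession.

# Minimal sketch of the per-task flow (hypothetical paths, copybook, table name).
df = (
    spark.read.format("cobol")                          # Cobrix EBCDIC data source
    .option("copybook", "/dbfs/copybooks/table_a.cpy")  # hypothetical copybook path
    .load("/mnt/landing/table_a/*")                     # hypothetical EBCDIC input files
)

df.count()  # validation; the assertion error surfaces on actions like this or on the write below
df.write.mode("overwrite").format("delta").saveAsTable("main.bronze.table_a")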

Please find the complete error below. Py4JJavaError: An error occurred while calling o2411.saveAsTable. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 3238.0 failed 4 times, most recent failure: Lost task 22.3 in stage 3238.0 (TID 50850) (100.126.48.51 executor 27): java.lang.AssertionError: assertion failed: Byte array does not have correct length at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1603) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage24.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32) at com.google.common.collect.Iterators$PeekingImpl.hasNext(Iterators.java:1139) at com.databricks.photon.NativeRowBatchIterator.hasNext(NativeRowBatchIterator.java:44) at 0xa37b947 .HasNext(external/workspace_spark_3_5/photon/jni-wrappers/jni-row-batch-iterator.cc:50) at com.databricks.photon.JniApiImpl.hasNext(Native Method) at com.databricks.photon.JniApi.hasNext(JniApi.scala) at com.databricks.photon.JniExecNode.hasNext(JniExecNode.java:76) at com.databricks.photon.BasePhotonResultHandler$$anon$1.hasNext(PhotonExec.scala:862) at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.$anonfun$hasNext$1(PhotonBasicEvaluatorFactory.scala:211) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at com.databricks.photon.PhotonResultHandler.timeit(PhotonResultHandler.scala:30) at com.databricks.photon.PhotonResultHandler.timeit$(PhotonResultHandler.scala:28) at com.databricks.photon.BasePhotonResultHandler.timeit(PhotonExec.scala:849) at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.hasNext(PhotonBasicEvaluatorFactory.scala:211) at com.databricks.photon.CloseableIterator$$anon$10.hasNext(CloseableIterator.scala:211) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.columnartorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.hashAgg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92) at 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103) at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108) at scala.util.Using$.resource(Using.scala:269) at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3908) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3830) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3817) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3817) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1695) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1680) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1680) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4154) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4066) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4054) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54) Caused by: java.lang.AssertionError: assertion failed: Byte array does not have correct length at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1603) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage24.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32) at com.google.common.collect.Iterators$PeekingImpl.hasNext(Iterators.java:1139) at com.databricks.photon.NativeRowBatchIterator.hasNext(NativeRowBatchIterator.java:44) at 0xa37b947 .HasNext(external/workspace_spark_3_5/photon/jni-wrappers/jni-row-batch-iterator.cc:50) at com.databricks.photon.JniApiImpl.hasNext(Native Method) at com.databricks.photon.JniApi.hasNext(JniApi.scala) at com.databricks.photon.JniExecNode.hasNext(JniExecNode.java:76) at com.databricks.photon.BasePhotonResultHandler$$anon$1.hasNext(PhotonExec.scala:862) at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.$anonfun$hasNext$1(PhotonBasicEvaluatorFactory.scala:211) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at com.databricks.photon.PhotonResultHandler.timeit(PhotonResultHandler.scala:30) at com.databricks.photon.PhotonResultHandler.timeit$(PhotonResultHandler.scala:28) at com.databricks.photon.BasePhotonResultHandler.timeit(PhotonExec.scala:849) at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.hasNext(PhotonBasicEvaluatorFactory.scala:211) at com.databricks.photon.CloseableIterator$$anon$10.hasNext(CloseableIterator.scala:211) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.columnartorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.hashAgg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103) at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108) at scala.util.Using$.resource(Using.scala:269) at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

yruslan commented 3 months ago

Hi @devoplib,

Thanks for your interest in Cobrix!

I've looked at the stack trace. It seems like the issue is with the shuffling or the writer, since there are no za.co.absa.cobrix classes in the stack trace.

Could you elaborate on how you run tasks in parallel? Do you execute multiple threads within executors, or do you run multiple Spark actions in a multi-threaded fashion? Could you share some example code?

devoplib commented 3 months ago

Initially we created one task, and that task runs a notebook that reads the EBCDIC files, converts them to ASCII, and creates a dataframe. We save the dataframe to a Databricks Unity Catalog table. We have many files, so to increase throughput we created multiple tasks, passed a subset of the files to each one, and each task runs the same notebook. When we run a single task with all the files we don't see any issues, but when we run the notebook across multiple tasks it gives the error. The only difference between the two runs is that in the first we send all the files to one task, while in the second we split the files across multiple tasks. Say we have 10 files: we run 5 tasks and send 2 files, chosen randomly, to each of the 5 tasks. When we run multiple tasks it fails while doing transformations on the dataframe, for example df.count(). Please let me know if you have any questions. The notebook mainly does df.write.mode("overWrite").format("delta").saveAsTable(f"{table_name}"), and we loaded the Cobrix bundle as a library on the cluster.

yruslan commented 3 months ago

Actually, Cobrix already runs the conversion in parallel. You don't need to process each file separately. You can just use '*' in the path name:

spark.read.format("cobol").load("s3://bucket/path/path/*")

or just

spark.read.format("cobol").load("s3://bucket/path/path")

devoplib commented 3 months ago

The EBCDIC to ASCII conversion already runs in parallel within each table, since the layout is the same, as you mentioned above. We have a couple of tables that need to be processed with different schemas, so those have to run as separate jobs, and they run in parallel with each other. The parallel run I mentioned is loading different tables using the same Python notebook, i.e. multiple tasks where the only difference is the input files; for each table we process multiple files, and those are read in parallel. Please let me know if you need more details.

yruslan commented 3 months ago

I see, makes sense. Thanks for the explanation. From what I can see, you are processing files properly, and there should not be any issues. I suspect this is related to the environment or an issue on the writer's side, not spark-cobol (the reader side). The stack trace does not contain any call chain that leads to the Cobrix codebase.

devoplib commented 3 months ago

Thank you very much. When we retry the task it completes, or it fails a couple of times and then finally completes. Databricks is saying that since we are using Cobrix the issue is with Cobrix; I believe it is not the Cobrix module, but I want to make sure. I will check with them again, and I really appreciate your quick response.

pinakigit commented 2 months ago

Hi @devoplib, let me know the solution if you hear anything back from Databricks. We have a similar kind of issue and we think it's Databricks, but it doesn't throw any proper error.

devoplib commented 2 months ago

I will definitely share if we find anything related to this error. We have been working with the Databricks team (sharing logs), and they are saying it is due to the external module we are using (Cobrix), but we pointed out that it runs fine after a couple of reruns. We will continue to work with the Databricks team and get additional resources to pursue the root cause of the issue.

vinodkc commented 2 months ago

We are getting the following "java.lang.AssertionError: assertion failed: Byte array does not have correct length" when writing the dataframe to a Databricks UC table (saveAsTable) or when running transformations on the dataframe such as df.count() or df.rdd.isEmpty().

If the same assertion error occurs on df.count(), the issue may be in the read flow. The same issue during the saveAsTable/write operation is a red herring, as the write follows the read.

@yruslan, during the read flow of the shuffle map task execution in the executor, the assertion failure comes from the sqlContext.sparkContext.binaryRecords object created here.

I see the recordSize passed from CobolScanners to sqlContext.sparkContext.binaryRecords is what gets asserted in this Spark code, i.e.:

val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
      classOf[FixedLengthBinaryInputFormat],
      classOf[LongWritable],
      classOf[BytesWritable],
      conf = conf)
    br.map { case (k, v) =>
      val bytes = v.copyBytes()
      assert(bytes.length == recordLength, "Byte array does not have correct length")
      bytes
    }

Is there any chance that val recordSize = reader.getRecordSize gives a different value than v.copyBytes().length?
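
One quick way to sanity-check that hypothesis outside of Spark (a sketch, assuming fixed-length records and hypothetical file paths and record length): for a fixed record length, every input file's size should be an exact multiple of that length, otherwise the last record handed to the reader will be short and the assertion above fires.

# Hypothetical sanity check, not part of Cobrix: for fixed-length records, each
# input file's size should be an exact multiple of the expected record length,
# otherwise the final record will be short and trip the assertion above.
import os

record_length = 120  # assumed value derived from the copybook
paths = ["/dbfs/mnt/landing/file1.dat", "/dbfs/mnt/landing/file2.dat"]  # hypothetical input files

for path in paths:
    size = os.path.getsize(path)
    if size % record_length != 0:
        print(f"{path}: {size} bytes is not a multiple of {record_length}")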

@devoplib To isolate the issue, please disable Photon and try to reproduce it.

@pinakigit Are you also following this approach and seeing the issue?

To increase throughput, we run the same job in parallel, i.e. instead of sending all files to one task, we process multiple files with the same program by passing a different subset of files to each task.

pinakigit commented 2 months ago

We have tried with and without Photon, as suggested by Databricks, and it doesn't help. Since disabling autoscaling we haven't seen the issue; we will keep observing for a couple more days. We are running multiple pipelines for hundreds of files in parallel.

vinodkc commented 2 months ago

@pinakigit, I didn't mean that Photon is causing this. Avoiding Photon is a way to isolate the issue to the executor JVM and the CobolScanner path. Could you please share the exception stack trace from a non-Photon executor?

devoplib commented 2 months ago

Thank you very much for looking into it.

Note: in order to complete the job we are retrying it multiple times. Since we have many streams running in parallel, they all eventually complete after a couple of retries. The issue is that this consumes extra resources and ultimately costs more. If it ran without failures it would run faster and help us overall.

Please note that I tried the following two options and it is still failing on my end with the same error:

1. Disable autoscaling, keep the Photon accelerator enabled
2. Disable autoscaling and disable the Photon accelerator

Here is the error: Py4JJavaError: An error occurred while calling o648.count. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 351.0 failed 4 times, most recent failure: Lost task 0.3 in stage 351.0 (TID 350) (100.126.10.219 executor 3): java.lang.AssertionError: assertion failed: Byte array does not have correct length at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1614) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.hashAgg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104) at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109) at scala.util.Using$.resource(Using.scala:269) at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3908) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3830) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3817) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3817) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1695) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1680) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1680) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4154) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4066) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4054) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54) Caused by: java.lang.AssertionError: assertion failed: Byte array does not have correct length at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1614) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.hashAgg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201) at 
org.apache.spark.scheduler.Task.doRunTask(Task.scala:186) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104) at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109) at scala.util.Using$.resource(Using.scala:269) at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

vinodkc commented 1 month ago

@devoplib, thanks for the stack trace. Are you using variable-record-length mainframe files? If so, could you please share how the dataframe was created? E.g.:

val df = spark
      .read
      .format("cobol")                                     
      .option("copybook_contents", copybook)               
      .option("record_format", "V")                        
      //.option("copybook", "data/companies_copybook.cpy") 
      //.option("generate_record_id", true)                
      .option("schema_retention_policy", "collapse_root")  
      .option("segment_field", "SEGMENT_ID")               
      .option("segment_id_level0", "C")                    
      .option("segment_id_prefix", "ID")                   
      .load("../example_data/companies_data")  

devoplib commented 1 month ago

Please find below the code that creates the dataframe:

code_page_name = 'cp037_extended'
cobolDataFrame = spark \
      .read.format("za.co.absa.cobrix.spark.cobol.source") \
      .option("ebcdic_code_page", code_page_name) \
      .option("schema_retention_policy", "collapse_root") \
      .option("string_trimming_policy", "none") \
      .option("pedantic", "true") \
      .option("improved_null_detection", "false") \
      .option("copybook", tmp_copybook_file) \
      .load(ebcdic_table_data_file)

We do df.count() and df.rdd.isEmpty(), etc., as part of validation.

Please let me know if you need additional information.

vinodkc commented 1 month ago

@devoplib, @pinakigit, thanks for sharing the details. It seems you're encountering a race condition when multiple Spark-Cobrix connectors share the same Spark driver instance.
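
For illustration only, here is a sketch of the kind of race I mean; it bypasses Cobrix and calls sparkContext.binaryRecords directly, with hypothetical paths and record lengths. As far as I can tell, binaryRecords stores the expected record length in the driver's shared Hadoop configuration, so two concurrent reads with different record lengths can overwrite each other's value and trigger exactly this assertion. The PR has the actual details of the fix.

# Simplified illustration of the suspected race (hypothetical paths and record
# lengths; the real workload goes through the Cobrix reader rather than calling
# binaryRecords directly; `spark` is the active SparkSession). Both calls default
# to the driver's shared Hadoop configuration, where the expected record length
# is stored, so concurrent jobs with different lengths can clobber each other's setting.
from concurrent.futures import ThreadPoolExecutor

def count_records(path, record_length):
    # binaryRecords asserts that every record has exactly `record_length` bytes
    return spark.sparkContext.binaryRecords(path, record_length).count()

with ThreadPoolExecutor(max_workers=2) as pool:
    table_a = pool.submit(count_records, "/mnt/landing/table_a.dat", 120)
    table_b = pool.submit(count_records, "/mnt/landing/table_b.dat", 250)
    print(table_a.result(), table_b.result())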

I tested the fix locally and then raised PR https://github.com/AbsaOSS/cobrix/pull/714. @yruslan, could you please review it?

devoplib commented 1 month ago

Excellent, thank you very much @vinodkc. @yruslan: please advise. Thank you.

yruslan commented 1 month ago

@vinodkc, Thank you very much for the fix! 🚀

yruslan commented 1 month ago

Merged. @devoplib, @pinakigit, please test. I will release a new version tomorrow.

pinakigit commented 1 month ago

Sure. We are testing and will let you know the results after the batch is complete.

devoplib commented 1 month ago

Our testing was successful. @pinakigit: how did your batch testing go? Thank you.

pinakigit commented 1 month ago

Our testing was successful across two days of batch runs, so it seems the fix is working. Let us know when the new version is available.

devoplib commented 1 month ago

@pinakigit: Thank you very much for the testing. @yruslan: Please let us know when the new version with this fix is available. Thank you very much to all.

yruslan commented 1 month ago

Cobrix 2.7.7 has been released to Maven. It should be visible in a couple of hours. Thanks a lot again for the fix, @vinodkc, and to @pinakigit for testing!