Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream across multiple iceberg versions on 14.3 LTS databricks, Spark 3.5 Scala 2.12 #10808

Open rphadke-trinity opened 2 months ago

rphadke-trinity commented 2 months ago

Apache Iceberg version

1.6.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

Error:

    UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.139.64.20 executor 1): java.lang.UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream
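
For context, Hadoop's FSDataInputStream only serves read(ByteBuffer) when the stream it wraps implements ByteBufferReadable; BufferedFSInputStream does not, which is why the call fails inside FSDataInputStream.read. A minimal Scala sketch of that delegation (the readInto helper is illustrative, not part of Hadoop or Iceberg):

    import java.nio.ByteBuffer
    import org.apache.hadoop.fs.{ByteBufferReadable, FSDataInputStream}

    // Delegate to the byte-buffer read only when the wrapped stream supports it;
    // otherwise fall back to a plain array read and copy into the buffer.
    def readInto(in: FSDataInputStream, buf: ByteBuffer): Int =
      in.getWrappedStream match {
        case _: ByteBufferReadable =>
          in.read(buf) // the direct path Parquet's H2SeekableInputStream relies on
        case _ =>
          val tmp = new Array[Byte](buf.remaining())
          val n = in.read(tmp, 0, tmp.length)
          if (n > 0) buf.put(tmp, 0, n)
          n
      }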

Using the following runtime configuration; a sketch of a typical Iceberg catalog setup on this stack follows the list:

  1. Azure databricks 14.3 LTS runtime
  2. spark 3.5.0
  3. scala 2.12
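
For reproduction context, this is a minimal sketch of how an Iceberg catalog is typically attached to a Spark 3.5 session on such a cluster; the catalog name, warehouse URI, and table name are placeholders, not taken from this report:

    import org.apache.spark.sql.SparkSession

    // Placeholder catalog ("demo"), ABFS warehouse path, and table name.
    val spark = SparkSession.builder()
      .appName("iceberg-read-repro")
      .config("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.demo.type", "hadoop")
      .config("spark.sql.catalog.demo.warehouse",
        "abfss://container@account.dfs.core.windows.net/warehouse")
      .getOrCreate()

    // A read like this triggers the failure above; the exception surfaces on the
    // executors while the Parquet footer is being read.
    spark.read.table("demo.db.events").show()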

The same error occurs across multiple Iceberg versions; the following versions were tested:

  1. 1.6.0
  2. 1.5.2
  3. 1.5.1
  4. 1.5.0
  5. 1.4.3

Willingness to contribute

Fokko commented 2 months ago

@rphadke-trinity Thanks for reporting this. Do you also have a full stacktrace? That would help localize the problem.

rphadke-trinity commented 2 months ago

Adding the full stacktrace, @Fokko:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 205) (10.139.64.20 executor 1): java.lang.UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream
    at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:160)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.$anonfun$read$1(FileSystemWithMetrics.scala:77)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.withTimeAndBytesReadMetric(FileSystemWithMetrics.scala:67)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.read(FileSystemWithMetrics.scala:77)
    at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:156)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:82)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:91)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:76)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:584)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
    at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:238)
    at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
    at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
    at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
    at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:109)
    at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:41)
    at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:143)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:122)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:160)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:64)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:64)
    at scala.Option.exists(Option.scala:376)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:64)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:99)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:64)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
    at com.google.common.collect.Iterators$PeekingImpl.hasNext(Iterators.java:1139)
    at com.databricks.photon.NativeRowBatchIterator.hasNext(NativeRowBatchIterator.java:44)
    at 0xaa81a92 <photon>.HasNext(external/workspace_spark_3_5/photon/jni-wrappers/jni-row-batch-iterator.cc:50)
    at 0x5f51b8e <photon>.OpenImpl(external/workspace_spark_3_5/photon/exec-nodes/shuffle-sink-node.cc:171)
    at com.databricks.photon.JniApiImpl.open(Native Method)
    at com.databricks.photon.JniApi.open(JniApi.scala)
    at com.databricks.photon.JniExecNode.open(JniExecNode.java:71)
    at com.databricks.photon.PhotonPreShuffleResultHandler.$anonfun$getResult$1(PhotonExec.scala:825)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.photon.PhotonResultHandler.timeit(PhotonResultHandler.scala:30)
    at com.databricks.photon.PhotonResultHandler.timeit$(PhotonResultHandler.scala:28)
    at com.databricks.photon.PhotonPreShuffleResultHandler.timeit(PhotonExec.scala:818)
    at com.databricks.photon.PhotonPreShuffleResultHandler.getResult(PhotonExec.scala:825)
    at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.hasNext(PhotonBasicEvaluatorFactory.scala:207)
    at com.databricks.photon.CloseableIterator$$anon$10.hasNext(CloseableIterator.scala:211)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at com.databricks.photon.MetadataOnlyShuffleWriter.write(MetadataOnlyShuffleWriter.scala:50)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3908)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3830)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3817)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3817)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1695)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1680)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1680)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4154)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4066)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4054)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54)
Caused by: java.lang.UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream
    at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:160)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.$anonfun$read$1(FileSystemWithMetrics.scala:77)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.withTimeAndBytesReadMetric(FileSystemWithMetrics.scala:67)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.read(FileSystemWithMetrics.scala:77)
    at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:156)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:82)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:91)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:76)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:584)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
    at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:238)
    at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
    at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
    at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
    at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:109)
    at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:41)
    at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:143)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:122)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:160)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:64)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:64)
    at scala.Option.exists(Option.scala:376)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:64)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:99)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:64)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
    at com.google.common.collect.Iterators$PeekingImpl.hasNext(Iterators.java:1139)
    at com.databricks.photon.NativeRowBatchIterator.hasNext(NativeRowBatchIterator.java:44)
    at 0xaa81a92 <photon>.HasNext(external/workspace_spark_3_5/photon/jni-wrappers/jni-row-batch-iterator.cc:50)
    at 0x5f51b8e <photon>.OpenImpl(external/workspace_spark_3_5/photon/exec-nodes/shuffle-sink-node.cc:171)
    at com.databricks.photon.JniApiImpl.open(Native Method)
    at com.databricks.photon.JniApi.open(JniApi.scala)
    at com.databricks.photon.JniExecNode.open(JniExecNode.java:71)
    at com.databricks.photon.PhotonPreShuffleResultHandler.$anonfun$getResult$1(PhotonExec.scala:825)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.photon.PhotonResultHandler.timeit(PhotonResultHandler.scala:30)
    at com.databricks.photon.PhotonResultHandler.timeit$(PhotonResultHandler.scala:28)
    at com.databricks.photon.PhotonPreShuffleResultHandler.timeit(PhotonExec.scala:818)
    at com.databricks.photon.PhotonPreShuffleResultHandler.getResult(PhotonExec.scala:825)
    at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.hasNext(PhotonBasicEvaluatorFactory.scala:207)
    at com.databricks.photon.CloseableIterator$$anon$10.hasNext(CloseableIterator.scala:211)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at com.databricks.photon.MetadataOnlyShuffleWriter.write(MetadataOnlyShuffleWriter.scala:50)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

rphadke-trinity commented 2 months ago

@Fokko I'm able to write to Iceberg using 14.3 LTS (Spark 3.5, Scala 2.12, Iceberg 1.5.2), but not read. The same Iceberg table, written on the above config and runtime, can be read using 12.2 LTS (Spark 3.3, Scala 2.12, Iceberg 1.4.3).

Fokko commented 2 months ago

@rphadke-trinity Thanks for the stacktrace. I'll dig into it 👍

db-trin-life commented 2 months ago

@Fokko the root cause is that the Loki input stream does not currently support byte-buffer reads, which Iceberg relies on. We got this from the Databricks support team: The error you're encountering, UnsupportedOperationException: Byte-buffer read unsupported by LokiAbfsInputStream, suggests that the LokiAbfsInputStream class does not support the read(ByteBuffer buffer) method, which is being called by Apache Iceberg. This method reads data into a ByteBuffer, a container for data of a specific primitive type. As a result, when Iceberg tries to read data from Azure Blob Storage using this method, it encounters an error because LokiAbfsInputStream does not support it.
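
For anyone hitting the same thing, a small probe (a hypothetical helper, not part of Iceberg or Hadoop) can confirm whether the stream returned for a given path supports byte-buffer reads; on the affected runtime it would be expected to return false for the table's data files, matching the exception above:

    import java.nio.ByteBuffer
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ByteBufferReadable, Path}

    // Opens the path and checks whether the wrapped stream can serve read(ByteBuffer):
    // first by type, then by attempting an actual 1-byte buffered read.
    def supportsByteBufferRead(pathStr: String, conf: Configuration): Boolean = {
      val path = new Path(pathStr)
      val fs = path.getFileSystem(conf)
      val in = fs.open(path)
      try {
        in.getWrappedStream.isInstanceOf[ByteBufferReadable] && {
          try { in.read(ByteBuffer.allocate(1)); true }
          catch { case _: UnsupportedOperationException => false }
        }
      } finally in.close()
    }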

We are able to work around it for now, but this support will probably need to be built in. Loki: https://grafana.com/docs/loki/latest/configure/examples/configuration-examples/