AbsaOSS / hyperdrive

Extensible streaming ingestion pipeline on top of Apache Spark
Apache License 2.0

Spark Tests fail on Mac M1 chip #276

Closed: kevinwallimann closed this issue 1 year ago

kevinwallimann commented 1 year ago

Description

On a Mac with an M1 chip, running the Spark tests fails because the snappy native library is missing for that architecture.

How to reproduce

On a Mac M1 chip, execute mvn clean package. The Spark tests then fail with an error like the following:

- should write to partition version=2 when version=1 already exists for the same date *** FAILED ***
  org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 5f41332f-f6a0-4298-82c5-741549cc55d2, runId = 0cf2b14a-206d-4d5e-8004-4eb5224fc3aa] terminated with exception: Job aborted.
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:302)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
  ...
  Cause: org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
  at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:141)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:537)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:536)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  ...
  Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 7.0 failed 1 times, most recent failure: Lost task 1.0 in stage 7.0 (TID 26, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:177)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
    at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
    at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
    at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:165)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
    at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
    at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:247)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:242)
    ... 9 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
  at scala.Option.foreach(Option.scala:407)
  ...
  Cause: org.apache.spark.SparkException: Task failed while writing rows.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:257)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:177)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
  ...
  Cause: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
  at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
  at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
  at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
  at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:165)
  at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
  at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
  at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
  at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
  at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
  at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)

Expected behaviour

Tests should pass without error.

Suggested fixes

Explicitly add the latest snappy-java library, which ships a native binary for Mac M1 (aarch64), as sketched below.
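
A minimal sketch of what that could look like, assuming the version is pinned through dependencyManagement in the project pom (the exact version and placement here are untested assumptions, not a verified fix):

    <!-- Hypothetical pin of snappy-java to a release that includes an aarch64 macOS binary -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.xerial.snappy</groupId>
                <artifactId>snappy-java</artifactId>
                <version>1.1.8.4</version>
            </dependency>
        </dependencies>
    </dependencyManagement>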

Workarounds

Run with Spark 3, which depends on snappy-java 1.1.8.4 (as of Spark 3.2.2):

    mvn clean package -Pspark-3,scala-2.12

lsulak commented 1 year ago

Hi @kevinwallimann, so the question here is whether to explicitly add the snappy library v1.1.8.4 to the project's dependencies (regardless of processor type), or to provide it on the local machine's side?

I'm a bit curious about the solution because I've had this issue before.

kevinwallimann commented 1 year ago

Hi @lsulak No, I tried that solution: it worked in IntelliJ, but some tests (which actively use snappy) still failed when I ran them with Maven. Also, this only affects Spark 2.4, because Spark 3.2 already ships snappy v1.1.8.4. The exception is this:

- should return the filenames that are missing in the metadata log *** FAILED ***
  org.apache.spark.sql.streaming.StreamingQueryException: Query [id = cbb7c2ef-2704-446e-9512-c87ced411728, runId = 6560e75c-56fa-45c4-99b5-42078693f0a0] terminated with exception: Job aborted.
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:302)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
  ...
  Cause: org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
  at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:141)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:537)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:536)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  ...
  Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:177)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException
        at java.nio.Buffer.limit(Buffer.java:275)
        at org.xerial.snappy.Snappy.compress(Snappy.java:156)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:76)
        at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
        at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:165)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
        at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
        at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
        at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:247)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:242)

Btw, I added the dependency like this in the profiles section:

        <!-- Activates automatically on Apple Silicon (aarch64) macOS and adds a
             snappy-java version with a matching native binary to the test classpath -->
        <profile>
            <id>apple-m1</id>
            <activation>
                <os>
                    <name>mac os x</name>
                    <family>mac</family>
                    <arch>aarch64</arch>
                </os>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.xerial.snappy</groupId>
                    <artifactId>snappy-java</artifactId>
                    <version>1.1.8.4</version>
                    <scope>test</scope>
                </dependency>
            </dependencies>
        </profile>
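
If it helps with debugging, whether the profile actually activates on a given machine can be checked with the Maven help plugin (just a suggestion, I haven't verified it against this project):

    mvn help:active-profiles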

It should be easy to figure out the problem, since you just need to work out what differs between running the tests in IntelliJ and running them via Maven. However, I couldn't, and I gave up after a few hours, since the issue doesn't actually block us.
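
For reference, one way to compare the two setups could be to check which snappy-java version Maven actually resolves onto the test classpath, for example (the filter is only an illustration):

    mvn dependency:tree -Dincludes=org.xerial.snappy:snappy-java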