snowflakedb / spark-snowflake

Snowflake Data Source for Apache Spark.
http://www.snowflake.net
Apache License 2.0

OutOfMemoryError when uploading large partition to GCP snowflake #518

Open kposborne opened 1 year ago

kposborne commented 1 year ago

In the code linked below, it looks like the GCP path buffers the whole partition in memory: every row is appended to a single ByteArrayOutputStream, whose backing array cannot grow past roughly Integer.MAX_VALUE bytes, so any partition that serializes to more than about 2 GB fails with an OutOfMemoryError. We hit this when attempting to sync a Postgres table to Snowflake. The workaround was to reduce the partition size by setting partitionColumn, lowerBound, and upperBound in the JDBC reader options (see the sketch below), but this is not always convenient.
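For reference, a minimal sketch of that workaround on the read side. The connection URL, table, column name, and bounds here are all placeholders; the idea is just that splitting the JDBC read into more, smaller partitions keeps each task's upload buffer under the limit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pg-to-snowflake").getOrCreate()

// Hypothetical Postgres connection; "id" stands in for any numeric column
// with a known range. More partitions => smaller per-task uploads.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://pg-host:5432/mydb")
  .option("dbtable", "public.big_table")
  .option("user", "reader")
  .option("password", sys.env("PG_PASSWORD"))
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "100000000")
  .option("numPartitions", "200")
  .load()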

https://github.com/snowflakedb/spark-snowflake/blob/fedb7a3c93a6f59c966f81197ea2df34928b24bf/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala#L717

    // For GCP, the rows are cached and then uploaded.
    else if (fileTransferMetadata.isDefined) {
      // cache the data in buffer
      val outputStream = new ByteArrayOutputStream(4 * 1024 * 1024)
      while (rows.hasNext) {
        outputStream.write(rows.next.getBytes("UTF-8"))
        outputStream.write('\n')
        rowCount += 1
      }
      val data = outputStream.toByteArray
      dataSize = data.size
      outputStream.close()
2023-07-13T16:05:54,485 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 34)
java.lang.OutOfMemoryError: null
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:125) ~[?:?]
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:119) ~[?:?]
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[?:?]
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[?:?]
    at java.io.OutputStream.write(OutputStream.java:122) ~[?:?]
    at net.snowflake.spark.snowflake.io.CloudStorage.doUploadPartition(CloudStorageOperations.scala:717) ~[net_snowflake_spark_snowflake_2.12_2.12.0_spark_3.4_spark_snowflake_2.12_2.12.0_spark_3.4.jar:2.12.0-spark_3.4]
    at net.snowflake.spark.snowflake.io.CloudStorage.uploadPartition(CloudStorageOperations.scala:611) ~[net_snowflake_spark_snowflake_2.12_2.12.0_spark_3.4_spark_snowflake_2.12_2.12.0_spark_3.4.jar:2.12.0-spark_3.4]
    at net.snowflake.spark.snowflake.io.CloudStorage.uploadPartition$(CloudStorageOperations.scala:594) ~[net_snowflake_spark_snowflake_2.12_2.12.0_spark_3.4_spark_snowflake_2.12_2.12.0_spark_3.4.jar:2.12.0-spark_3.4]
    at net.snowflake.spark.snowflake.io.InternalGcsStorage.uploadPartition(CloudStorageOperations.scala:1726) ~[net_snowflake_spark_snowflake_2.12_2.12.0_spark_3.4_spark_snowflake_2.12_2.12.0_spark_3.4.jar:2.12.0-spark_3.4]
    at net.snowflake.spark.snowflake.io.InternalGcsStorage.$anonfun$upload$2(CloudStorageOperations.scala:1855) ~[net_snowflake_spark_snowflake_2.12_2.12.0_spark_3.4_spark_snowflake_2.12_2.12.0_spark_3.4.jar:2.12.0-spark_3.4]
    at net.snowflake.spark.snowflake.io.InternalGcsStorage.$anonfun$upload$2$adapted(CloudStorageOperations.scala:1839) ~[net_snowflake_spark_snowflake_2.12_2.12.0_spark_3.4_spark_snowflake_2.12_2.12.0_spark_3.4.jar:2.12.0-spark_3.4]
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905) ~[spark_core_core.jar:?]
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905) ~[spark_core_core.jar:?]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark_core_core.jar:?]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark_core_core.jar:?]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark_core_core.jar:?]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark_core_core.jar:?]
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark_core_core.jar:?]
    at org.apache.spark.scheduler.Task.run(Task.scala:139) ~[spark_core_core.jar:?]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) ~[spark_core_core.jar:?]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) ~[spark_core_core.jar:?]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) [spark_core_core.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
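Until the buffering is addressed in the connector, a more general mitigation is to shrink the partitions right before the write, independent of the source. This is only a sketch: the repartition count and all of the sfOptions values are placeholders, though the option names are the standard spark-snowflake connector options.

// Repartition so no single task has to buffer anywhere near 2 GB.
val sfOptions = Map(
  "sfURL" -> "myaccount.snowflakecomputing.com", // placeholder account
  "sfUser" -> "user",
  "sfPassword" -> sys.env("SF_PASSWORD"),
  "sfDatabase" -> "DB",
  "sfSchema" -> "PUBLIC",
  "sfWarehouse" -> "WH"
)

df.repartition(400) // raise until each partition stays well under 2 GB
  .write
  .format("net.snowflake.spark.snowflake")
  .options(sfOptions)
  .option("dbtable", "BIG_TABLE")
  .mode("append")
  .save()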
sixdimensionalarray commented 1 year ago

Any update on the status of this issue?