apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Process killed with no additional info when loading large parquet files in Spark #10697

Closed: alberttwong closed this issue 5 months ago

alberttwong commented 5 months ago

Trying to load a 1.1 GB parquet file and it just exits with no message. You can get the parquet file from https://forum.starrocks.io/t/retail-ecommerce-funnel-analysis-demo-with-1-million-members-and-87-million-record-dataset-using-starrocks/269

Use case: I'd like to keep duplicate entries. If I have 1,000 rows in my parquet file, I want 1,000 rows in my table even if some of them are duplicates. No updates to existing entries, only appends: if 2 identical records are loaded, Hudi should retain them as two records rather than one.
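
For reference, Hudi's default write operation is upsert, which deduplicates on the record key, so keeping both copies of two identical records would need the insert (append) operation. Below is a minimal sketch under that assumption, reusing the df, tableName, and basePath defined in the code that follows; the hoodie.datasource.write.operation and hoodie.combine.before.insert settings are not part of the original run.

// Sketch only: "insert" appends rows without key-based dedup, and
// combine-before-insert stays off so two identical records stay two records.
df.write.format("hudi").
  option("hoodie.table.name", tableName).
  option("hoodie.datasource.write.recordkey.field", "UserID").
  option("hoodie.datasource.write.precombine.field", "UserID").
  option("hoodie.datasource.write.operation", "insert").
  option("hoodie.combine.before.insert", "false").
  mode("append").
  save(basePath)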

Using ./spark-shell --driver-memory 4G with the following code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._

val df = spark.read.parquet("s3a://huditest/user_behavior_sample_data.parquet")

val databaseName = "hudi_sample"
val tableName = "hudi_coders_hive"
val basePath = "s3a://huditest/hudi_coders"

df.write.format("hudi").
  option(org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME, tableName).
  option(RECORDKEY_FIELD_OPT_KEY, "UserID").
  option(PRECOMBINE_FIELD_OPT_KEY, "UserID").  
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.hive_sync.mode", "hms").
  option("hoodie.datasource.hive_sync.database", databaseName).
  option("hoodie.datasource.hive_sync.table", tableName).
  option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
  option("fs.defaultFS", "s3://huditest/").  
  mode(Overwrite).
  save(basePath)

The error message is:

warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
29623 [main] WARN  org.apache.hudi.common.config.DFSPropertiesConfiguration  - Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
29641 [main] WARN  org.apache.hudi.common.config.DFSPropertiesConfiguration  - Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
29666 [main] WARN  org.apache.hudi.HoodieSparkSqlWriter$  - hoodie table at s3a://huditest/hudi_coders already exists. Deleting existing data & overwriting with new data.
31627 [main] WARN  org.apache.hudi.metadata.HoodieBackedTableMetadata  - Metadata table was not found at path s3a://huditest/hudi_coders/.hoodie/metadata
32069 [main] WARN  org.apache.hadoop.fs.s3a.S3ABlockOutputStream  - Application invoked the Syncable API against stream writing to hudi_coders/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0. This is unsupported
35905 [Executor task launch worker for task 0.0 in stage 7.0 (TID 7)] WARN  org.apache.hadoop.metrics2.impl.MetricsConfig  - Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
./spark-shell: line 47:  2169 Killed                  "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
ad1happy2go commented 5 months ago

@alberttwong Are you running in local mode? Parquet files are compressed, so the uncompressed data can be much larger and will probably need more memory. Can you try increasing driver memory?

ad1happy2go commented 5 months ago

@alberttwong Also, can you try reading the 1 GB parquet file by itself with spark.read.parquet and see if Spark is able to process it with 4 GB of memory?
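
For example, something like this; the count() just forces a full scan, since spark.read.parquet by itself is lazy and won't tell you much on its own:

// Read the file without Hudi and force a full scan to see whether plain Spark
// can handle it with the current driver memory.
val testDf = spark.read.parquet("s3a://huditest/user_behavior_sample_data.parquet")
println(testDf.count())
testDf.printSchema()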

alberttwong commented 5 months ago

Tried ./spark-shell --driver-memory 8G --executor-memory 8G and got the same error. The process is simply killed.

ad1happy2go commented 5 months ago

@alberttwong Are you running Spark in local mode? Could you read this parquet file using spark.read.parquet?

alberttwong commented 5 months ago

I am running it in local mode. When you say "read this parquet file using spark.read.parquet", what do you mean?

[root@spark-hudi bin]# ./spark-shell --driver-memory 8G --executor-memory 8G
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/spark-3.2.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
8003 [main] WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
9045 [main] WARN  org.apache.spark.util.Utils  - Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Spark context Web UI available at http://spark-hudi:4042
Spark context available as 'sc' (master = local[*], app id = local-1708364567333).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.16.1)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> 

scala> val df = spark.read.parquet("s3a://huditest/user_behavior_sample_data.parquet")
df: org.apache.spark.sql.DataFrame = [UserID: bigint, ItemID: bigint ... 3 more fields]

scala> 

scala> val databaseName = "hudi_sample"
databaseName: String = hudi_sample

scala> val tableName = "hudi_coders_hive"
tableName: String = hudi_coders_hive

scala> val basePath = "s3a://huditest/hudi_coders"
basePath: String = s3a://huditest/hudi_coders

scala> 

scala> df.write.format("hudi").
     |   option(org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME, tableName).
     |   option(RECORDKEY_FIELD_OPT_KEY, "UserID").
     |   option(PRECOMBINE_FIELD_OPT_KEY, "UserID").  
     |   option("hoodie.datasource.hive_sync.enable", "true").
     |   option("hoodie.datasource.hive_sync.mode", "hms").
     |   option("hoodie.datasource.hive_sync.database", databaseName).
     |   option("hoodie.datasource.hive_sync.table", tableName).
     |   option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
     |   option("fs.defaultFS", "s3://huditest/").  
     |   mode(Overwrite).
     |   save(basePath)
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
405045 [main] WARN  org.apache.hudi.metadata.HoodieBackedTableMetadata  - Metadata table was not found at path s3a://huditest/hudi_coders/.hoodie/metadata
405550 [main] WARN  org.apache.hadoop.fs.s3a.S3ABlockOutputStream  - Application invoked the Syncable API against stream writing to hudi_coders/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0. This is unsupported
./spark-shell: line 47:  4872 Killed                  "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
alberttwong commented 5 months ago

Upgrading from Hudi 0.11 to 0.14.1:

[root@spark-hudi bin]# spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' --driver-memory 4G
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/spark-3.2.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hudi#hudi-spark3.2-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9b4a8c4b-e4e2-4b55-b29b-cacc399b9481;1.0
        confs: [default]
        found org.apache.hudi#hudi-spark3.2-bundle_2.12;0.14.1 in central
:: resolution report :: resolve 202ms :: artifacts dl 2ms
        :: modules in use:
        org.apache.hudi#hudi-spark3.2-bundle_2.12;0.14.1 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-9b4a8c4b-e4e2-4b55-b29b-cacc399b9481
        confs: [default]
        0 artifacts copied, 1 already retrieved (0kB/7ms)
24/02/19 18:15:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/19 18:15:57 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Spark context Web UI available at http://spark-hudi:4042
Spark context available as 'sc' (master = local[*], app id = local-1708366558050).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.16.1)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> 

scala> val df = spark.read.parquet("s3a://huditest/user_behavior_sample_data.parquet")
df: org.apache.spark.sql.DataFrame = [UserID: bigint, ItemID: bigint ... 3 more fields]

scala> 

scala> val databaseName = "hudi_sample"
databaseName: String = hudi_sample

scala> val tableName = "hudi_coders_hive"
tableName: String = hudi_coders_hive

scala> val basePath = "s3a://huditest/hudi_coders"
basePath: String = s3a://huditest/hudi_coders

scala> 

scala> df.write.format("hudi").
     |   option(org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME, tableName).
     |   option(RECORDKEY_FIELD_OPT_KEY, "UserID").
     |   option(PRECOMBINE_FIELD_OPT_KEY, "UserID").  
     |   option("hoodie.datasource.hive_sync.enable", "true").
     |   option("hoodie.datasource.hive_sync.mode", "hms").
     |   option("hoodie.datasource.hive_sync.database", databaseName).
     |   option("hoodie.datasource.hive_sync.table", tableName).
     |   option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
     |   option("fs.defaultFS", "s3://huditest/").  
     |   mode(Overwrite).
     |   save(basePath)
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
24/02/19 18:16:18 WARN HoodieSparkSqlWriterInternal: hoodie table at s3a://huditest/hudi_coders already exists. Deleting existing data & overwriting with new data.
24/02/19 18:16:21 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to hudi_coders/.hoodie/metadata/files/.files-0000-0_00000000000000010.log.1_0-0-0. This is unsupported
/spark/bin/spark-shell: line 47: 12322 Killed                  "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
alberttwong commented 5 months ago

I also tried upgrading Spark from 3.2 to 3.4 while still using Hudi 0.14.1, and the process was still killed.

alberttwong commented 5 months ago

[root@spark-hudi jars]# ls *aws*
aws-java-sdk-bundle-1.12.48.jar  hadoop-aws-3.3.1.jar
alberttwong commented 5 months ago

I tried writing the Scala code another way and still have the same process-killed issue.

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "s3a://hudi/hudi_trips_cow"
val df = spark.read.parquet("s3a://huditest/user_behavior_sample_data.parquet")
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "UserID").
option(RECORDKEY_FIELD_OPT_KEY, "UserID").
option(PARTITIONPATH_FIELD_OPT_KEY, "CategoryID").
option(TABLE_NAME, tableName).
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.hive_sync.mode", "hms").
  option("hoodie.datasource.hive_sync.database", "hudi_sample").
  option("hoodie.datasource.hive_sync.table", tableName).
  option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
mode(Overwrite).
save(basePath)
ad1happy2go commented 5 months ago

@alberttwong Thanks for trying all that. The issue here is not the code but memory; I suspect you would be able to run the same code with a smaller parquet file.

Can you try df.write.format("parquet").save("some test parquet path") and see if that works?

If that also fails, then the file is simply too big to process with the available resources.
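
A minimal sketch of that test, with a placeholder output path:

// Plain parquet round trip, no Hudi involved. If this also gets killed,
// the problem is available memory, not the Hudi writer.
val df = spark.read.parquet("s3a://huditest/user_behavior_sample_data.parquet")
df.write.format("parquet").
  mode("overwrite").
  save("s3a://huditest/parquet_write_test")   // placeholder test path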

alberttwong commented 5 months ago

I have confirmed that ingestion of the 1.1 GB file works fine on onehouse.ai and I was able to query the data.

alberttwong commented 5 months ago

I had to allocate more memory to Docker Desktop. That fixed the issue.

[Screenshot 2024-02-20 at 9.50.49 PM: Docker Desktop memory settings]

ad1happy2go commented 5 months ago

Had a working session with @alberttwong. The issue was that the Docker instance was not given enough memory. We were able to read the file with 8 GB of memory. Closing this out. Thanks again @alberttwong.

alberttwong commented 5 months ago

I would also add that because the parquet file was compressed with Snappy, with about 90% compression on some columns, Spark had a tough time processing it. You can either 1. increase memory or 2. break up the parquet file into smaller files (a sketch of option 2 is at the end of this comment).

I also had to give the driver a lot of memory. Executor memory was not needed since Spark is running locally, where the driver does all the work.

spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --driver-memory 24G
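
For option 2 above (breaking the parquet file into smaller files), a minimal sketch; the partition count and output path are placeholders:

// Rewrite the single 1.1 GB parquet file as several smaller files so that
// each read task handles less data at once; 16 output files is just a guess.
val df = spark.read.parquet("s3a://huditest/user_behavior_sample_data.parquet")
df.repartition(16).
  write.mode("overwrite").
  parquet("s3a://huditest/user_behavior_sample_data_split/")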