apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]Slow Performance With Spark Structured Streaming #3324

Open conanxjp opened 3 years ago

conanxjp commented 3 years ago

Describe the problem you faced

We are trying to POC Hudi with our existing streaming job, which consumes from an AWS Kinesis Data Stream and delivers Parquet to S3. The job runs continuously on an AWS EMR cluster. Given the cluster size and an input rate of 70 records/s, the streaming job should be able to process over 200 records/s, but when writing the output as Hudi it only processes ~15 records/s.

What have I tried:

- turn off compaction with "hoodie.parquet.small.file.limit" -> "0"
- use insert instead of upsert
- use SIMPLE index with "hoodie.index.type" -> "SIMPLE"
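Expressed as extra write options, those overrides would look roughly like this (a sketch for illustration only; the keys are the standard Hudi config names, not the exact code used):

// Sketch only: the overrides listed above expressed as an options map
// (these would be merged on top of the hudiOptions shown at the end).
val triedOverrides: Map[String, String] = Map(
  "hoodie.parquet.small.file.limit" -> "0",
  "hoodie.datasource.write.operation" -> "insert",
  "hoodie.index.type" -> "SIMPLE"
)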

None of these settings had any noticeable effect, so I went back to the desired options (listed at the end) and did some investigation to isolate the operation that causes the delay in my code.

I am using Structured Streaming with a batch function (1 min interval); the batch function does the following:

df.withColumn("parsed_data", from_json(col("data"), schema))
  .select("parsed_data.*")
  .withColumn("date", col("timestamp").cast(DateType))
  .write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .options(compactOptions)
  .mode(Append)
  .save(outputPath)

parsed_data contains ~1000 columns, and the select("parsed_data.*") just lifts all of those columns up one level.

But in my tests, the number of columns projected is clearly the bottleneck. For example, if I do this, I get over 200 records/s:

df.withColumn("parsed_data", from_json(col("data"), schema))
  .withColumn("date", col("parsed_data.timestamp").cast(DateType))
  .select("parsed_data.uuid", "parsed_data.timestamp", "date")
  .write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .options(compactOptions)
  .mode(Append)
  .save(outputPath)

If I do an explicit select and control the number of columns selected, I can clearly observe the difference in performance:

- 500 columns: ~37 records/s
- 250 columns: ~68 records/s
- 128 columns: ~104 records/s
- 64 columns: ~160 records/s

The per-record cost grows roughly linearly (O(n)) with the number of columns selected; the deviation from an ideal linear correlation is presumably due to the constant test input rate and fixed overhead.
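A quick back-of-the-envelope check of that claim (my own arithmetic from the throughput numbers above, treating the pipeline as roughly serial):

// Convert the observed throughput to an approximate per-record latency and see
// whether it grows linearly with the number of selected columns.
val samples = Seq(500 -> 37.0, 250 -> 68.0, 128 -> 104.0, 64 -> 160.0) // (columns, records/s)
samples.foreach { case (cols, recordsPerSec) =>
  println(f"$cols%4d columns -> ${1000.0 / recordsPerSec}%5.1f ms per record")
}
// Prints roughly 27.0, 14.7, 9.6 and 6.3 ms per record, which fits a line of about
// 3 ms fixed overhead + ~0.05 ms per column, i.e. the near-linear scaling described above.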

This behavior does not look normal, and I also tried this:

df.withColumn("parsed_data", from_json(col("data"), schema))
  .withColumn("date", col("parsed_data.timestamp").cast(DateType))
  .select("parsed_data.uuid", "parsed_data.timestamp", "date", "parsed_data")
  .write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .options(compactOptions)
  .mode(Append)
  .save(outputPath)

It also performs well even with all the columns kept nested inside the parsed_data column. And the existing version of the job without Hudi does the select * without any performance issue as well.

Here is the DAG of the slow stage (5): (screenshot attached)

And the slow job (13): (screenshot attached)

I have gone through most of the performance-related issues and tried the suggestions, but so far nothing has fixed this. Please advise on what could be the reason for this performance issue.

Thank you!

Here are some configs:

val hudiOptions: Map[String, String] = Map(
  TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  RECORDKEY_FIELD_OPT_KEY -> "uuid",
  PARTITIONPATH_FIELD_OPT_KEY -> "date",
  PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
  OPERATION_OPT_KEY -> "upsert",
  TABLE_NAME -> "hudi_test",
  "hoodie.upsert.shuffle.parallelism"->  "6",
  "hoodie.insert.shuffle.parallelism"-> "6",
  "hoodie.bulkinsert.shuffle.parallelism"-> "6",
  "hoodie.parquet.small.file.limit" -> "104857600",
  "hoodie.index.type" -> "BLOOM"
)
val compactOptions: Map[String, String] = Map(
  "hoodie.compact.inline.max.delta.commits" -> "5",
  "hoodie.cleaner.commits.retained" -> "4",
  "hoodie.cleaner.fileversions.retained" -> "4",
  "hoodie.keep.min.commits" -> "5",
  "hoodie.keep.max.commits" -> "6",
  "hoodie.parquet.compression.codec" -> "snappy"
)
--deploy-mode cluster \
--num-executors 7 \
--executor-cores 1 \
--executor-memory 2g \
--conf spark.memory.fraction=0.1 \
--conf spark.executor.memoryOverhead=1024 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.executorAllocationRatio=1 \
--conf spark.dynamicAllocation.executorIdleTimeout=200s \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=200s \
--conf spark.sql.hive.convertMetastoreParquet=false \

Environment Description

nsivabalan commented 2 years ago

Hey @conanxjp : sorry for the late turnaround. I would recommend using a MOR table, as it is write-optimized. Especially for streaming use cases, COW might not work out, since it takes the previous snapshot, merges in the incoming records, and rewrites a new version of the data files. With a MOR (Merge on Read) table, writes are simply appended as log files and merged later at read time. You can schedule compaction, which will compact the base data files and log files into a single compacted base file (a newer version of the data file).

Once you have similar data with the MOR table type, we can investigate further into index configs and so on:

hoodie.datasource.write.table.type: MERGE_ON_READ
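For illustration, a minimal sketch of what that might look like against the options posted in this issue (untested; it just reuses the hudiOptions map from the description):

// Sketch: switch the existing options from COPY_ON_WRITE to MERGE_ON_READ
// and turn on inline compaction.
val morOptions: Map[String, String] = hudiOptions ++ Map(
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.compact.inline" -> "true" // pairs with hoodie.compact.inline.max.delta.commits above
)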

nsivabalan commented 2 years ago

With MOR, there are 3 types of queries that could be of benefit to you (config: https://hudi.apache.org/docs/configurations#query_type_opt_key).

Snapshot/Realtime query: reads the entire data set at the latest snapshot.

Read-optimized query ("read_optimized"): as I mentioned earlier, for a given data file there could be some delta log files, depending on your compaction schedule. For snapshot reads, these are merged with the base data files and then served, whereas a read-optimized query reads only the base data files. If you can give up some freshness, your queries will be much faster since there is no real-time merge involved.

And then there is the incremental query, which gives you the delta records between commits.
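For illustration, a rough sketch of how each query type is selected on the read side (standard datasource read options, reusing the spark session and outputPath from the issue; the instant time below is just a placeholder):

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder().getOrCreate()

// Snapshot (default): merges base files with any pending log files at read time.
val snapshotDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "snapshot")
  .load(outputPath)

// Read-optimized: reads only the compacted base files, trading freshness for speed.
val readOptimizedDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "read_optimized")
  .load(outputPath)

// Incremental: only records changed after a given commit instant (placeholder value).
val incrementalDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20210721000000")
  .load(outputPath)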

nsivabalan commented 2 years ago

@conanxjp : Gentle ping. Did you get a chance to try with the MOR table?

nsivabalan commented 2 years ago

@conanxjp : Did you get a chance to try the above suggestion?

conanxjp commented 2 years ago

@nsivabalan Sorry for the delay, here are some updates.

The weird behavior I reported may be caused by the Amazon build of Spark, but depending on the versions, it can sometimes be triggered by a combination of Hudi and Amazon's Spark.

As for the MOR table, I did give it a try, as well as the clustering feature. Compaction for MOR doesn't fit our streaming app well, since the app deduplicates on the fly and records, once delivered, are never modified by the streaming app itself. We do have external batch modification jobs that run occasionally, but we have a requirement not to interrupt the running streaming app, since the freshest data is always being used. With Hudi commits, it seems we can't run parallel Hudi jobs committing to the same table location, even though the streaming app and the external modification jobs touch different partitions of the table. Not sure whether there is any way we can achieve this.

vinothchandar commented 2 years ago

I am gathering these issues under perf and will give it a go.

nsivabalan commented 1 year ago

@conanxjp : we have made a lot of improvements around performance in Hudi: https://hudi.apache.org/blog/2022/06/29/Apache-Hudi-vs-Delta-Lake-transparent-tpc-ds-lakehouse-performance-benchmarks

Can you try out 0.12 and let us know if you are still facing the issue, and we can investigate further.

nsivabalan commented 1 year ago

@conanxjp : I see you have asked about concurrent writers. We do have multi-writer support from 0.11 onwards that you can try: https://hudi.apache.org/docs/concurrency_control. Feel free to close out the issue if it has been resolved.
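For the parallel streaming + batch writer case described above, a rough sketch of the kind of options the concurrency control docs describe (optimistic concurrency with an external lock provider; the ZooKeeper values are placeholders, and both writers would need these set):

// Sketch based on the concurrency control docs linked above; all values are examples.
val multiWriterOptions: Map[String, String] = Map(
  "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
  "hoodie.cleaner.policy.failed.writes" -> "LAZY",
  "hoodie.write.lock.provider" -> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
  "hoodie.write.lock.zookeeper.url" -> "zk-host",          // placeholder
  "hoodie.write.lock.zookeeper.port" -> "2181",
  "hoodie.write.lock.zookeeper.lock_key" -> "hudi_test",
  "hoodie.write.lock.zookeeper.base_path" -> "/hudi_locks" // placeholder
)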