
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

How to write to a bucket-partitioned table using PySpark? #5977

Closed RLashofRegas closed 2 months ago

RLashofRegas commented 2 years ago

Query engine

Spark

Question

In the documentation for Spark writes, under the section for Writing to partitioned tables, there are two Spark Jira issues linked that explain why (1) pre-write sort is required (SPARK-23889), and (2) manually registering the iceberg bucket function as a UDF is required for bucket-partitioned tables (SPARK-27658). Both of these issues are in the resolved state.

  1. Is the Iceberg documentation still valid?
  2. If not, what release versions of Spark/Iceberg support writing to bucket-partitioned tables without the steps above?
  3. If so, how can I write to a bucket-partitioned table with PySpark given that (as far as I can tell) the bucket function is not available as a Python API so I cannot register it as a UDF?
Neuw84 commented 2 years ago

Hi,

Any hints on this? I'm stuck on the same problem!

Thanks!

robinsinghstudios commented 1 year ago

Hi,

My data is being appended to the partitioned table without registering any UDFs, but the data seems to be written as one row per file, which is creating a huge performance impact. Any suggestions or fixes would be really appreciated.

RussellSpitzer commented 1 year ago

@robinsinghstudios The data needs to be globally sorted on the bucketing function; at least that is my guess given those symptoms. Luckily this should all be less of an issue once the function catalog work in Spark 3.3 is sorted out.

robinsinghstudios commented 1 year ago

@RussellSpitzer I'm sorting the data frame globally before inserting the data but still facing the same issue.

RussellSpitzer commented 1 year ago

If you aren't sorting on the partition transform, the rows will be in random order with regard to that transform. So if you don't have the UDF registered and you are just sorting on the column itself, I would expect you to generate at least one file per bucket per Spark task.

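To make that concrete, a minimal sketch of the two sorts being contrasted (this assumes the iceberg_bucket_str_32 UDF from the Spark writes documentation has already been registered; the column name and sample data are placeholders):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["string_column"])

# Sorting on the raw column: bucket values stay interleaved, so every task
# can still hit every bucket -> roughly one file per bucket per Spark task.
by_column = df.sortWithinPartitions("string_column")

# Sorting on the bucket transform clusters rows by their target partition,
# so each task writes to far fewer buckets.
by_bucket = df.sortWithinPartitions(F.expr("iceberg_bucket_str_32(string_column)"))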

robinsinghstudios commented 1 year ago

@RussellSpitzer Is there any way to register the bucket UDF with PySpark? Will the built-in bucket method not work for this?

fireking77 commented 1 year ago

Hi Guys!

I would also be curious about this: is there any way to register the bucket UDF with PySpark? Will the built-in bucket method not work for this?

@robinsinghstudios have you figured out something? Are you able to write to a partitioned table with PySpark?

Thanks in advance, Darvi

robinsinghstudios commented 1 year ago

Hi @fireking77, I was able to create bucketed tables by using a Spark SQL query without any data. It just works for some reason.
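
For reference, a minimal sketch of that DDL path (assuming an existing SparkSession named spark with an Iceberg catalog configured as my_catalog; the table and column names are placeholders):

# Creating the bucket-partitioned table through Spark SQL needs no UDF at all;
# the sort/UDF question only arises when writing data into it.
spark.sql("""
    CREATE TABLE my_catalog.db.iceberg_table (
        id            BIGINT,
        string_column STRING
    )
    USING iceberg
    PARTITIONED BY (bucket(32, string_column))
""")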

fireking77 commented 1 year ago

@robinsinghstudios That is OK, but were you able to write data into that table with PySpark? I always get this: Caused by: java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.

I also tried to order by those specific UDFs as in the docs, but I cannot access them from PySpark...

robinsinghstudios commented 1 year ago

@fireking77 The MERGE statement works with Spark SQL. I had the same issue with the PySpark append method.
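
A hedged sketch of that MERGE path (it assumes the Iceberg Spark SQL extensions are enabled and reuses the placeholder names from above; the join key id is illustrative):

# Expose the incoming DataFrame as a view so it can be referenced from SQL.
df.createOrReplaceTempView("updates")

# MERGE INTO goes through Spark SQL, which handled the bucket-partitioned
# target here where the plain PySpark append did not.
spark.sql("""
    MERGE INTO my_catalog.db.iceberg_table t
    USING updates s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")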

fireking77 commented 1 year ago

@robinsinghstudios Thanks! :D You were right :D I hope someone fixes insert too... ;) Thanks again! Darvi

mjf-89 commented 1 year ago

Hi, I don't have a deep understanding of PySpark internals, but I think you can write to a partitioned Iceberg table with the following approach:

# Register the Iceberg UDF for the partition transform bucket(32, string_column)
from pyspark.sql import functions as F

jvm = spark.sparkContext._jvm
jvm.org.apache.iceberg.spark.IcebergSpark.registerBucketUDF(
    spark._jsparkSession, "iceberg_bucket_str_32",
    jvm.org.apache.spark.sql.types.DataTypes.StringType, 32)

# Sort using the registered UDF and write to the partitioned Iceberg table
df.sortWithinPartitions(F.expr("iceberg_bucket_str_32(string_column)")) \
  .writeTo("iceberg_table") \
  .using("iceberg") \
  .partitionedBy(F.bucket(32, "string_column")) \
  .createOrReplace()  # a terminal call such as .create() or .createOrReplace() runs the write

Another option, as far as I understand, is to avoid sorting and use the fanout writer instead:

df.writeTo("iceberg_table") \
  .using("iceberg") \
  .option("fanout-enabled", "true") \
  .partitionedBy(F.bucket(32, "esum_id")) \
  .createOrReplace()
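
If the option should apply to every write, a similar switch also exists as a table property; a sketch, assuming the table already exists and using the same placeholder names (write.spark.fanout.enabled is the Iceberg write property). Fanout writers keep one open file per partition seen by a task, so they trade memory for not requiring a pre-write sort.

# Enable the fanout writer at the table level so individual writes no longer
# need .option("fanout-enabled", "true").
spark.sql("""
    ALTER TABLE my_catalog.db.iceberg_table
    SET TBLPROPERTIES ('write.spark.fanout.enabled' = 'true')
""")
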
Gatsby-Lee commented 1 year ago

thank you

ignaski commented 1 year ago

Apparently there are SQL catalog functions, catalog.system.bucket() and catalog.system.truncate(), which allow you to pre-order the data before writing to a partitioned table.
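
A hedged sketch of using those functions from PySpark (this needs a Spark 3.3+ / Iceberg combination that exposes the function catalog; my_catalog and the other names are placeholders):

from pyspark.sql import functions as F

# Cluster rows by their target bucket with the catalog-provided function,
# then append; no manual UDF registration is required.
df.sortWithinPartitions(F.expr("my_catalog.system.bucket(32, string_column)")) \
  .writeTo("my_catalog.db.iceberg_table") \
  .append()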

pnain commented 10 months ago

df.writeTo("iceberg_table") \
  .using("iceberg") \
  .option("fanout-enabled", "true") \
  .partitionedBy(F.bucket(32, "esum_id")) \
  .createOrReplace()

This works well but creates small files. The hash/range distribution options are not working despite using the sorting.

This is because F.bucket can only be used with partitionedBy, which means you can't sort on this column and hence can't use the other write options (hash/range): https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.bucket.html
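
On newer Spark/Iceberg combinations (Spark 3.3+ with the function catalog support mentioned earlier in this thread), an alternative is to let Spark do the clustering by setting the table's write.distribution-mode property; a hedged sketch with the same placeholder names:

# Ask for a hash distribution on the partition transform; on Spark 3.3+ the
# write shuffles by bucket automatically, avoiding tiny files without a
# manual sort or fanout writers.
spark.sql("""
    ALTER TABLE my_catalog.db.iceberg_table
    SET TBLPROPERTIES ('write.distribution-mode' = 'hash')
""")

df.writeTo("my_catalog.db.iceberg_table").append()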

github-actions[bot] commented 3 months ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 2 months ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'