apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
Implementing Storage Partition join and reducing the time for MERGE command #7832

Closed rajasinghr closed 2 days ago

rajasinghr commented 1 year ago

Query engine

AWS Glue Spark cluster info:
Environment: AWS Glue
Worker type: G 1X (4 vCPU and 32 GB RAM)
No. of workers: 10

Spark config:

        ("spark.eventLog.compress", "true"),
        ("spark.eventLog.rolling.enabled", "true"),
        (
            f"spark.sql.catalog.{iceberg_catalog}",
            "org.apache.iceberg.spark.SparkCatalog",
        ),
        (
            f"spark.sql.catalog.{iceberg_catalog}.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog",
        ),
        (f"spark.sql.catalog.{iceberg_catalog}.glue.skip-archive", "true"),
        (
            f"spark.sql.catalog.{iceberg_catalog}.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO",
        ),
        (f"spark.sql.catalog.{iceberg_catalog}.warehouse", iceberg_warehouse),
        (
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        ),
        ("spark.sql.iceberg.handle-timestamp-without-timezone", "true"),
        ("spark.sql.iceberg.planning.preserve-data-grouping", "true"),
        ("spark.sql.join.preferSortMergeJoin", "false"),
        ("spark.sql.sources.partitionOverwriteMode", "dynamic"),
        ("spark.sql.sources.v2.bucketing.enabled", "true"),
        ("spark.sql.parquet.enableVectorizedReader", "true"),
        ("spark.sql.autoBroadcastJoinThreshold", 134217728),
        ("spark.sql.shuffle.partitions", 255),

Table properties:
'write.metadata.delete-after-commit.enabled'='true',
'history.expire.max-snapshot-age-ms'='604800000',
'history.expire.min-snapshots-to-keep'='10',
'commit.manifest.min-count-to-merge'='50',
'format'='parquet',
'format-version'='2',
'write.distribution-mode'='hash'

Question

I am running a MERGE between an Iceberg table with ~50M rows and a temp view with ~30k rows. The merge takes around 40 minutes to complete. I am trying to improve MERGE performance by enabling storage-partitioned join (SPJ), but with my current settings the job does a full batch scan of the 50M rows during the join.

How can we avoid the full batch scan and read only the relevant partitions during the join? Also, how can we improve the performance of the MERGE command in general?

With the settings above, the job does a ShuffledHashJoin instead of an SPJ and scans all 50M rows every time.

Any help would be appreciated. Thanks

rajasinghr commented 1 year ago

Attaching the most recent Spark plan: localhost_18080_history_spark-application-1686605110730_SQL_execution__id=5

rajasinghr commented 1 year ago

@RussellSpitzer Any help would be appreciated. Thanks

RussellSpitzer commented 1 year ago

Your plan does not show an SPJ (storage-partitioned join). It shows both sides being shuffled into spark.sql.shuffle.partitions partitions before being hash joined. Fixing this would reduce the number of partitions scanned.
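A quick way to check a plan for this is to look for hash-partitioning Exchange nodes feeding the join. The following is a minimal sketch, not part of Spark or Iceberg: the helper names `shuffle_exchanges` and `looks_like_spj` are hypothetical, and the check is a plain text heuristic over EXPLAIN output, so treat a positive result as a hint rather than proof.

```python
import re

# Matches the shuffle that Spark inserts when a join side is not already
# partitioned the way the join needs, e.g. "Exchange hashpartitioning(id#37L, 4)".
_HASH_EXCHANGE = re.compile(r"Exchange hashpartitioning\(")


def shuffle_exchanges(plan_text):
    """Return the plan lines that contain a hash-partitioning Exchange."""
    return [
        line.strip()
        for line in plan_text.splitlines()
        if _HASH_EXCHANGE.search(line)
    ]


def looks_like_spj(plan_text):
    """Heuristic (assumption): a join with no hash-partitioning Exchange
    anywhere in the plan was probably planned as a storage-partitioned join."""
    has_join = "Join" in plan_text
    return has_join and not shuffle_exchanges(plan_text)
```

Assuming an active session named `spark`, the plan text can be obtained with `spark.sql("EXPLAIN ...").collect()[0][0]` and passed to `looks_like_spj`. A range-partitioning Exchange added for a final ORDER BY is deliberately not counted, since it is unrelated to the join.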

rajasinghr commented 1 year ago

I set spark.sql.shuffle.partitions because I read that, to achieve SPJ, both sides need to have the same number of partitions. What am I missing here?

The documentation on this is hard to follow, so it is difficult to read up on it.

RussellSpitzer commented 1 year ago

If SPJ were being used, it wouldn't matter what you set spark.sql.shuffle.partitions to. I'm saying the plan shows that not all of the criteria for SPJ were met.

rajasinghr commented 1 year ago

Got it. What criteria must be met for SPJ? Is there a Spark config to set?

RussellSpitzer commented 1 year ago

Look here https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java#L59 and follow the tests

rajasinghr commented 1 year ago

Thank you so much. Let me try that.

rajasinghr commented 1 year ago

Hi @RussellSpitzer, I tested SPJ with a setup similar to the test you shared above. I still couldn't get SPJ, and I see three Exchange nodes in the plan instead of one. Let me know if I missed anything.


# creating session
%session_id_prefix native-iceberg-sql-
%glue_version 3.0
%idle_timeout 600
%extra_jars s3://com.rippling.controlplane.usw2.datainfrabuilds/iceberg/iceberg-spark-runtime-3.3_2.12-1.3.0.jar
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
}
catalog_name = "default"
bucket_name = "dev_bucket"
bucket_prefix = ""
database_name = "core"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"

#spark configuration
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.sql.warehouse.dir", warehouse_path) \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path) \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.sources.v2.bucketing.enabled", "true") \
    .config("spark.sql.sources.v2.bucketing.push.part.values.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.sql.adaptive.enabled", "false") \
    .config("spark.sql.requireAllClusterKeysForCoPartition", "false") \
    .config("spark.sql.iceberg.planning.preserve-data-grouping", "true") \
    .config("spark.sql.shuffle.partitions", 4) \
    .getOrCreate()

%%sql
CREATE OR REPLACE TABLE  default.core.tbl1 (id BIGINT, dep STRING)
USING iceberg
PARTITIONED BY (bucket(8, id), dep)
TBLPROPERTIES ('read.split.target-size'='16777216','read.split.open-file-cost'='16777216')

%%sql
INSERT INTO default.core.tbl1 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (6, 'f');

%%sql
CREATE OR REPLACE TABLE  default.core.tbl2 (id BIGINT, dep STRING)
USING iceberg
PARTITIONED BY (bucket(8, id), dep)
TBLPROPERTIES ('read.split.target-size'='16777216','read.split.open-file-cost'='16777216')

%%sql
INSERT INTO default.core.tbl2 VALUES (1, 'a'), (2, 'b'), (3, 'c');

query = """EXPLAIN SELECT t1.id FROM default.core.tbl1 t1 
INNER JOIN default.core.tbl2 t2 
ON t1.id = t2.id AND t1.dep = t2.dep
ORDER BY t1.id"""
tex = spark.sql(query)
tex.show(truncate=False)

Here is the plan

|== Physical Plan ==
*(6) Sort [id#37L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#37L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#112]
   +- *(5) Project [id#37L]
      +- *(5) SortMergeJoin [id#37L, dep#38], [id#39L, dep#40], Inner
         :- *(2) Sort [id#37L ASC NULLS FIRST, dep#38 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#37L, dep#38, 4), ENSURE_REQUIREMENTS, [id=#96]
         :     +- *(1) Filter (isnotnull(id#37L) AND isnotnull(dep#38))
         :        +- BatchScan[id#37L, dep#38] default.core.tbl1 [filters=id IS NOT NULL, dep IS NOT NULL]
         +- *(4) Sort [id#39L ASC NULLS FIRST, dep#40 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#39L, dep#40, 4), ENSURE_REQUIREMENTS, [id=#104]
               +- *(3) Filter (isnotnull(id#39L) AND isnotnull(dep#40))
                  +- BatchScan[id#39L, dep#40] default.core.tbl2 [filters=id IS NOT NULL, dep IS NOT NULL]

|

rami-lv commented 1 year ago

I think the setting is spark.sql.sources.v2.bucketing.pushPartValues.enabled, not spark.sql.sources.v2.bucketing.push.part.values.enabled. Also, can you try setting spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled to true?
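Spark does not reject an unknown configuration key, so a misspelled name like this one is easy to miss. Below is a minimal sketch of a pre-flight check, assuming the `spark.sql.sources.v2.bucketing.*` keys mentioned in this thread are the ones of interest. The helper `misspelled_bucketing_keys` and the `KNOWN_BUCKETING_KEYS` tuple are hypothetical, and the list is collected from this thread only, not from an authoritative reference.

```python
import difflib

# Bucketing keys that appear in this thread (assumption: not exhaustive).
KNOWN_BUCKETING_KEYS = (
    "spark.sql.sources.v2.bucketing.enabled",
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled",
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled",
)

_PREFIX = "spark.sql.sources.v2.bucketing."


def misspelled_bucketing_keys(conf):
    """Map each unrecognized bucketing key in `conf` to the closest known
    key, or to None when nothing similar is found."""
    report = {}
    for key in conf:
        if key.startswith(_PREFIX) and key not in KNOWN_BUCKETING_KEYS:
            close = difflib.get_close_matches(key, KNOWN_BUCKETING_KEYS, n=1)
            report[key] = close[0] if close else None
    return report
```

Running this over the dictionary of settings before building the session would flag `push.part.values.enabled` and suggest `pushPartValues.enabled`. Keys outside the bucketing prefix are ignored on purpose, so unrelated settings never produce false alarms.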

dev-goyal commented 1 year ago

Facing the same issue here: MERGE INTO on an Iceberg table does not appear to take advantage of the bucketing.

Bucketing SQL:

spark.sql(
            f"MERGE INTO ml_recommendations.ratings target "
            f"USING (SELECT * FROM {temp_view}) source "
            f"    ON target.{bucket_col} = source.{bucket_col} "
            f"WHEN MATCHED AND target.{v.RATING_STATE} IS NOT NULL THEN UPDATE SET * "
            f"WHEN NOT MATCHED THEN INSERT *",
        ).show()

Config:

        "spark.sql.session.timeZone=UTC",
        # Enable bucketing on the cluster
        "spark.sql.sources.bucketing.enabled=true",
        "spark.sql.sources.v2.bucketing.enabled=true",
        "spark.sql.iceberg.planning.preserve-data-grouping=true",
        "spark.sql.sources.v2.bucketing.pushPartValues.enabled=true",
        "spark.sql.requireAllClusterKeysForCoPartition=false",
        "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true",

        # Enable Iceberg format on the cluster
        "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.defaultCatalog=iceberg",

Notice the two exchanges below:

[Screenshot 2023-08-09 at 2 35 04 PM] [Screenshot 2023-08-09 at 2 37 18 PM]

RussellSpitzer commented 1 year ago

The target and the source must both be Iceberg tables with the same partitioning. The plan shown has one side as Parquet and the other as Iceberg.
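One way to satisfy this is to stage the source rows in an Iceberg table that uses the same partition spec as the target, and then merge from that table instead of the temp view. The sketch below only builds the two SQL statements. The function name `staged_merge_statements` and all table, view, and column names passed to it are hypothetical, and the MERGE clauses are simplified compared with the query above.

```python
def staged_merge_statements(target, staging, temp_view, key, partition_by):
    """Build SQL that (1) materializes `temp_view` as an Iceberg table
    partitioned like the target and (2) merges that table into the target.

    `partition_by` must repeat the target's partition spec verbatim,
    e.g. "bucket(8, id)"; this function does not verify that it matches.
    """
    create = (
        f"CREATE OR REPLACE TABLE {staging} USING iceberg "
        f"PARTITIONED BY ({partition_by}) "
        f"AS SELECT * FROM {temp_view}"
    )
    merge = (
        f"MERGE INTO {target} target "
        f"USING {staging} source "
        f"ON target.{key} = source.{key} "
        f"WHEN MATCHED THEN UPDATE SET * "
        f"WHEN NOT MATCHED THEN INSERT *"
    )
    return [create, merge]
```

With an active session, each statement would be run in order via `spark.sql(stmt)`. Writing the staging table costs an extra write of the source rows, which is usually small next to shuffling the large target side.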

dev-goyal commented 1 year ago

Ah interesting - going to try and write a temp iceberg table, and retry!

Btw - do I need to set something akin to

("spark.sql.join.preferSortMergeJoin", "false")

to prefer SPJs?

dev-goyal commented 1 year ago

@RussellSpitzer okay that did it - many thanks once again!

dev-goyal commented 1 year ago

Is there a reason the inner join (WHEN MATCHED) is SPJ, but the left outer join (WHEN NOT MATCHED) is ShuffledHashJoin? Can post an example, but wondering if this is expected behaviour for some reason?

RussellSpitzer commented 1 year ago

Could you raise a new issue? The reason is technical.

dev-goyal commented 1 year ago

@RussellSpitzer ty! https://github.com/apache/iceberg/issues/8387

github-actions[bot] commented 2 weeks ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 2 days ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'