apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Partition Pruning not happening as expected #5909

Closed arunb2w closed 1 year ago

arunb2w commented 1 year ago

Apache Iceberg version

0.14.0

Query engine

EMR

Please describe the bug 🐞

Spark version: 3.2.1. Iceberg version: 0.14.0. I am running the code below on EMR with the Glue catalog.

Target: I have an Iceberg table (EPAYMENT) that contains 457M records and is about 89.4 GB in size. It is partitioned by the column _CONTEXT_ID_ and has 18K partitions. The table resides in S3 and is registered in the Glue catalog.

Input: My incoming batch contains 335K records, and updating them requires access to 5K partitions of this table. The batch arrives as a Spark DataFrame, so I create a temporary view from it and use that view in the merge to upsert the records.

The view creation and the merge query I use to upsert the incoming batch:

deflateDf.createOrReplaceTempView("deflate_table")

merge into glue_dev.epaymentnightly.epayment as target
        using (
            select * from deflate_table as deflate
        )
        on (cast(target._context_id_ as int) = cast(deflate._context_id_ as int) and cast(target.id as int) = cast(deflate.id as int) )
        when matched 
        then update set 
        target.AMOUNT = cast(if(array_contains(deflate.changed_cols, 'AMOUNT'), deflate.AMOUNT, target.AMOUNT) as decimal(12,2)),target.AUTHORIZATIONCODE = cast(if(array_contains(deflate.changed_cols, 'AUTHORIZATIONCODE'), deflate.AUTHORIZATIONCODE, target.AUTHORIZATIONCODE) as string),target.AUTOMATEDYN = cast(if(array_contains(deflate.changed_cols, 'AUTOMATEDYN'), deflate.AUTOMATEDYN, target.AUTOMATEDYN) as string),target.CONTRACTID = cast(if(array_contains(deflate.changed_cols, 'CONTRACTID'), deflate.CONTRACTID, target.CONTRACTID) as decimal(12,0)),target.CREATED = cast(if(array_contains(deflate.changed_cols, 'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY = cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, target.CREATEDBY) as string),target.DELETED = cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, target.DELETED) as timestamp),target.DELETEDBY = cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY, target.DELETEDBY) as string),target.DEVICESOFTWAREVERSION = cast(if(array_contains(deflate.changed_cols, 'DEVICESOFTWAREVERSION'), deflate.DEVICESOFTWAREVERSION, target.DEVICESOFTWAREVERSION) as string),target.EPAYMENTCARDID = cast(if(array_contains(deflate.changed_cols, 'EPAYMENTCARDID'), deflate.EPAYMENTCARDID, target.EPAYMENTCARDID) as decimal(12,0)),target.GATEWAYRESPONSECODE = cast(if(array_contains(deflate.changed_cols, 'GATEWAYRESPONSECODE'), deflate.GATEWAYRESPONSECODE, target.GATEWAYRESPONSECODE) as string),target.GATEWAYRESPONSEMESSAGE = cast(if(array_contains(deflate.changed_cols, 'GATEWAYRESPONSEMESSAGE'), deflate.GATEWAYRESPONSEMESSAGE, target.GATEWAYRESPONSEMESSAGE) as string),target.HOSTRESPONSECODE = cast(if(array_contains(deflate.changed_cols, 'HOSTRESPONSECODE'), deflate.HOSTRESPONSECODE, target.HOSTRESPONSECODE) as string),target.HOSTRESPONSEMESSAGE = cast(if(array_contains(deflate.changed_cols, 'HOSTRESPONSEMESSAGE'), 
deflate.HOSTRESPONSEMESSAGE, target.HOSTRESPONSEMESSAGE) as string),target.ID = cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as decimal(12,0)),target.LASTMODIFIED = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), deflate.LASTMODIFIED, target.LASTMODIFIED) as timestamp),target.LASTMODIFIEDBY = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),target.LOCKDATE = cast(if(array_contains(deflate.changed_cols, 'LOCKDATE'), deflate.LOCKDATE, target.LOCKDATE) as timestamp),target.LOCKID = cast(if(array_contains(deflate.changed_cols, 'LOCKID'), deflate.LOCKID, target.LOCKID) as string),target.MANUALLYRESOLVEDYN = cast(if(array_contains(deflate.changed_cols, 'MANUALLYRESOLVEDYN'), deflate.MANUALLYRESOLVEDYN, target.MANUALLYRESOLVEDYN) as string),target.ORIGINALDEVICEID = cast(if(array_contains(deflate.changed_cols, 'ORIGINALDEVICEID'), deflate.ORIGINALDEVICEID, target.ORIGINALDEVICEID) as string),target.ORIGINALEPAYMENTID = cast(if(array_contains(deflate.changed_cols, 'ORIGINALEPAYMENTID'), deflate.ORIGINALEPAYMENTID, target.ORIGINALEPAYMENTID) as decimal(12,0)),target.QUICKPAYCODE = cast(if(array_contains(deflate.changed_cols, 'QUICKPAYCODE'), deflate.QUICKPAYCODE, target.QUICKPAYCODE) as string),target.RAWREQUEST = cast(if(array_contains(deflate.changed_cols, 'RAWREQUEST'), deflate.RAWREQUEST, target.RAWREQUEST) as string),target.RAWRESPONSE = cast(if(array_contains(deflate.changed_cols, 'RAWRESPONSE'), deflate.RAWRESPONSE, target.RAWRESPONSE) as string),target.RECEIPTSENTDATE = cast(if(array_contains(deflate.changed_cols, 'RECEIPTSENTDATE'), deflate.RECEIPTSENTDATE, target.RECEIPTSENTDATE) as timestamp),target.REQUESTDATE = cast(if(array_contains(deflate.changed_cols, 'REQUESTDATE'), deflate.REQUESTDATE, target.REQUESTDATE) as timestamp),target.REQUESTTYPE = cast(if(array_contains(deflate.changed_cols, 'REQUESTTYPE'), deflate.REQUESTTYPE, target.REQUESTTYPE) 
as string),target.RESPONSEDATE = cast(if(array_contains(deflate.changed_cols, 'RESPONSEDATE'), deflate.RESPONSEDATE, target.RESPONSEDATE) as timestamp),target.ROUTE = cast(if(array_contains(deflate.changed_cols, 'ROUTE'), deflate.ROUTE, target.ROUTE) as string),target.STATUS = cast(if(array_contains(deflate.changed_cols, 'STATUS'), deflate.STATUS, target.STATUS) as string),target.TERMINALID = cast(if(array_contains(deflate.changed_cols, 'TERMINALID'), deflate.TERMINALID, target.TERMINALID) as string),target.TYPE = cast(if(array_contains(deflate.changed_cols, 'TYPE'), deflate.TYPE, target.TYPE) as string),target._ETL_RUN_ID_ = cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as decimal(38,0)),target._ETL_MODIFIED_ = cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),target._EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'), deflate._EXTRACTED_, target._EXTRACTED_) as timestamp),target._SOURCE_EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as timestamp),target._LAST_MODIFIED_SEQ_ = cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as decimal(38,0)),target._SCHEMA_CLASS_ = cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_ = cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), deflate._CONTEXT_ID_, target._CONTEXT_ID_) as decimal(12,0)),target._IS_DELETED_ = cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
        when not matched
        then insert 
        (AMOUNT,AUTHORIZATIONCODE,AUTOMATEDYN,CONTRACTID,CREATED,CREATEDBY,DELETED,DELETEDBY,DEVICESOFTWAREVERSION,EPAYMENTCARDID,GATEWAYRESPONSECODE,GATEWAYRESPONSEMESSAGE,HOSTRESPONSECODE,HOSTRESPONSEMESSAGE,ID,LASTMODIFIED,LASTMODIFIEDBY,LOCKDATE,LOCKID,MANUALLYRESOLVEDYN,ORIGINALDEVICEID,ORIGINALEPAYMENTID,QUICKPAYCODE,RAWREQUEST,RAWRESPONSE,RECEIPTSENTDATE,REQUESTDATE,REQUESTTYPE,RESPONSEDATE,ROUTE,STATUS,TERMINALID,TYPE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_) values (cast(deflate.AMOUNT as decimal(12,2)),cast(deflate.AUTHORIZATIONCODE as string),cast(deflate.AUTOMATEDYN as string),cast(deflate.CONTRACTID as decimal(12,0)),cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as string),cast(deflate.DEVICESOFTWAREVERSION as string),cast(deflate.EPAYMENTCARDID as decimal(12,0)),cast(deflate.GATEWAYRESPONSECODE as string),cast(deflate.GATEWAYRESPONSEMESSAGE as string),cast(deflate.HOSTRESPONSECODE as string),cast(deflate.HOSTRESPONSEMESSAGE as string),cast(deflate.ID as decimal(12,0)),cast(deflate.LASTMODIFIED as timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.LOCKDATE as timestamp),cast(deflate.LOCKID as string),cast(deflate.MANUALLYRESOLVEDYN as string),cast(deflate.ORIGINALDEVICEID as string),cast(deflate.ORIGINALEPAYMENTID as decimal(12,0)),cast(deflate.QUICKPAYCODE as string),cast(deflate.RAWREQUEST as string),cast(deflate.RAWRESPONSE as string),cast(deflate.RECEIPTSENTDATE as timestamp),cast(deflate.REQUESTDATE as timestamp),cast(deflate.REQUESTTYPE as string),cast(deflate.RESPONSEDATE as timestamp),cast(deflate.ROUTE as string),cast(deflate.STATUS as string),cast(deflate.TERMINALID as string),cast(deflate.TYPE as string),cast(deflate._ETL_RUN_ID_ as decimal(38,0)),cast(deflate._ETL_MODIFIED_ as timestamp),cast(deflate._EXTRACTED_ as timestamp),cast(deflate._SOURCE_EXTRACTED_ as 
timestamp),cast(deflate._LAST_MODIFIED_SEQ_ as decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as string),cast(deflate._CONTEXT_ID_ as decimal(12,0)),cast(deflate._IS_DELETED_ as boolean))
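
The ON clause in the query above references the target's partition column only inside a cast and only in comparison with source rows, so the target scan may not receive a simple literal predicate it can prune partitions with. A workaround often suggested for Iceberg MERGE statements, though not confirmed anywhere in this thread, is to add an explicit literal filter on the target's partition column. The sketch below builds such a predicate in Python. `build_partition_filter` and `merge_on_clause` are hypothetical helper names, and whether the extra filter actually reduces the scan on Iceberg 0.14.0 is an assumption that should be checked against the query plan.

```python
def build_partition_filter(partition_values, column="target._context_id_"):
    """Return a literal IN predicate over the distinct partition values."""
    distinct = sorted({int(v) for v in partition_values})
    if not distinct:
        raise ValueError("the incoming batch contains no partition values")
    return f"{column} in ({', '.join(str(v) for v in distinct)})"


def merge_on_clause(partition_values):
    """Prepend the literal partition filter to the join condition from the issue."""
    join_condition = (
        "cast(target._context_id_ as int) = cast(deflate._context_id_ as int) "
        "and cast(target.id as int) = cast(deflate.id as int)"
    )
    return f"{build_partition_filter(partition_values)} and {join_condition}"


# Hypothetical usage with the DataFrame from the issue (needs a Spark session):
#   rows = deflateDf.select("_context_id_").distinct().collect()
#   on_clause = merge_on_clause(row[0] for row in rows)
print(build_partition_filter([42, 7, 42]))
```

With 5K distinct partition values the IN list becomes long, so this is a trade-off between a larger statement and a predicate the planner can evaluate without looking at the source rows.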

Spark command used to run:

spark-submit --deploy-mode cluster \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.14.0,software.amazon.awssdk:bundle:2.17.257,software.amazon.awssdk:url-connection-client:2.17.257 \
  --conf spark.yarn.submit.waitAppCompletion=true \
  --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\"/opt/spark\"" \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.executor.maxMemory=32g \
  --conf spark.dynamicAllocation.executorIdleTimeout=300 \
  --conf spark.shuffle.service.enabled=true \
  --driver-memory 8g \
  --num-executors 1 \
  --executor-memory 8g \
  --executor-cores 5 \
  iceberg_main.py

The problem is that when I view the job in the Spark UI, the shuffle write size and the number of records being upserted are far higher than expected; based on the incoming batch, only about 335K records should be involved. So it looks like partition pruning is not happening as expected. Because of this huge shuffle write, my EMR cluster runs out of memory and cannot complete the job. Please see the attached image.

[Attached image: Screenshot 2022-10-03 at 4 27 05 PM]
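
As a rough sanity check on the expectation above, the figures quoted in the issue can be turned into an estimate of how much data the 5K touched partitions hold. The sketch assumes that rows and bytes are spread evenly across partitions, which the issue does not state, and the variable names are illustrative only.

```python
# Figures quoted in the issue.
total_records = 457_000_000
total_size_gb = 89.4
total_partitions = 18_000
touched_partitions = 5_000
batch_records = 335_000

# Assumption: every partition holds the same number of rows and bytes.
touched_fraction = touched_partitions / total_partitions
records_in_touched = total_records * touched_fraction
size_in_touched_gb = total_size_gb * touched_fraction

print(f"fraction of partitions touched: {touched_fraction:.1%}")
print(f"records in touched partitions:  {records_in_touched / 1e6:.0f}M")
print(f"data in touched partitions:     {size_in_touched_gb:.1f} GB")
print(f"target rows per incoming row:   {records_in_touched / batch_records:.0f}")
```

Under that uniformity assumption, a merge that prunes perfectly at the partition level but still reads whole partitions would scan on the order of a hundred million target rows, so a row count well above 335K in the Spark UI does not by itself prove that pruning failed. The query plan is the more reliable evidence.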

Please provide some insight into what went wrong here.

singhpk234 commented 1 year ago

@arunb2w can you please attach the Spark plans (logical and physical) for this query?
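
For readers who want to supply what is being asked for here, Spark SQL's EXPLAIN EXTENDED statement returns the parsed, analyzed, and optimized logical plans together with the physical plan as a single text value. The sketch below wraps it in two small helpers. `explain_statement` and `fetch_plans` are hypothetical names, `spark` is assumed to be an active SparkSession with the Iceberg extensions configured, and it is an assumption that EXPLAIN accepts the MERGE INTO statement unchanged in this setup.

```python
def explain_statement(query, mode="EXTENDED"):
    """Prefix a SQL statement with EXPLAIN in the given mode."""
    allowed = {"EXTENDED", "FORMATTED", "COST", "CODEGEN"}
    if mode.upper() not in allowed:
        raise ValueError(f"unsupported EXPLAIN mode: {mode}")
    # Drop surrounding whitespace and a trailing semicolon before prefixing.
    return f"EXPLAIN {mode.upper()} {query.strip().rstrip(';')}"


def fetch_plans(spark, query, mode="EXTENDED"):
    """Run EXPLAIN through spark.sql and return the plan text.

    EXPLAIN yields a one-row, one-column DataFrame whose value is the plan.
    """
    return spark.sql(explain_statement(query, mode)).collect()[0][0]


# Hypothetical usage, where merge_sql holds the MERGE INTO statement:
#   print(fetch_plans(spark, merge_sql))
print(explain_statement("select * from deflate_table;"))
```

In the physical plan, the scan node for the target table is the place to check whether any filter on the partition column was pushed down.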

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 1 year ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'