apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.48k stars 2.24k forks source link

Table maintenace procedure(expire_snapshots) not work as expceted #10907

Open toien opened 3 months ago

toien commented 3 months ago

Query engine

Spark SQL on AWS EMR(7.1.0)

Versions:

Question

First i create an iceberg table like:

spark-sql (test_db)> show create table my_catalog.test_db.dws_table;
CREATE TABLE my_catalog.test_db.dws_table (
  dt STRING NOT NULL,
  brand_code STRING NOT NULL,
  event_type STRING NOT NULL,
  sub_event_type STRING NOT NULL,
  success_count INT,
  failed_count INT)
USING iceberg
LOCATION 's3://xxx/test/test_db.db/dws_table'
TBLPROPERTIES (
  'current-snapshot-id' = '3745013875610091505',
  'format' = 'iceberg/parquet',
  'format-format' = '2',
  'format-version' = '2',
  'identifier-fields' = '[dt,brand_code,sub_event_type,event_type]',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '5',
  'write.parquet.compression-codec' = 'zstd',
  'write.upsert.enabled' = 'true')

Flink streaming jobs will calc results and upsert into this table. so that would create many snapshots by Flink checkpoints:

spark-sql (test_db)> select COUNT(*) from my_catalog.test_db.dws_table.snapshots;
2130

Here is the problem: When I use Spark SQL do expire_snapshots, It DO cost time to execute this job

spark-sql (test_db)> CALL my_catalog.system.expire_snapshots(
                   >   table => 'test_db.dws_table',
                   >   retain_last => 5
                   > );
deleted_data_files_count        deleted_position_delete_files_count     deleted_equality_delete_files_count     deleted_manifest_files_count    deleted_manifest_lists_count    deleted_statistics_files_count
0       0       0       0       0       0
Time taken: 45.336 seconds, Fetched 1 row(s)

But nothing been deleted!

spark-sql (test_db)> select COUNT(*) from my_catalog.test_db.dws_table.snapshots;
2164

And data files on S3 still there.

Spark Job finished successfully: iceberg-maintenance-failed

The same problem occurs when call rewrite_data_files TOO, small data files are NOT been compacted(merged).

RussellSpitzer commented 3 months ago

Expire snapshots only removes data files which are no longer needed by any remaining snapshots. The output of your command shows no files were needed to be removed. Based on the fact that snapshot number seems to have increased in your second request I would be suspicious that you have snapshots older than default time for that command. Retain last is a "minimum" not a maximum, if your expire snapshots doesn't specifically state what the age limit is, it will only expire snapshots older than 5 days (I think, check the docs to be sure).

The bin-pack command you shows it only found 2 files to compact.

Most of the time when folks have this issue it is because they don't have enough small files in a given partition to trigger compaction. By default, the command will only compact files within a partition if there are 5 or more files in that partition that need compaction. See the docs for more info

toien commented 3 months ago

Snapshots number increased because Flink job still writing data to table.

In my opinion, it's better to clerify retain_last parameter's "minimum" function in doc:

Number of ancestor snapshots to preserve regardless of older_than.

Summary

After doing some tests, I finally start understanding iceberg's mainteinance procedures. Hope this help people are new to iceberg like me.

rewrite_data_files

Rewrite data files is a procedure reading source small files, compacting, and writing a new one. It won't delete old small files.

Data files, as leaf level of iceberg table layer, they belong manifest files. Deleting source small files will break its manifest file.

This procedure will optimize data files(usually merging) and create a new version(snapshot) of table.

rewrite_manifests

Unlike data files, rewrite_manifests will replace old ones.

This procedure will optimize manifest files(usually merging) and create a new version(snapshot) of table.

expire_snapshots

Always use older_than paramter.

If data files expected to be deleted still remains in S3 or HDFS, recheck metadata tables after executing procedure. They may be linked in manifests or entries.

Maintenance tips

Say we have a table upserting by flink jobs, which will create a lot data files and metadata. Hourly executing these would optimize iceberg table:

When it comes to partitioned table, say partition by day:

pvary commented 3 months ago

rewrite_manifests

Unlike data files, rewrite_manifests will replace old ones.

Actually, this procedure also just creates a new snapshot and keeps the old metadata files for the original snapshot. If you want to remove the old metadata files, you have to run the ExpireSnapshots procedure.

As you are using Flink to write data to an Iceberg table, you might want to follow https://github.com/orgs/apache/projects/358. This ongoing project aims to provide a Flink-specific solution for the problems mentioned above.

SanjayKhoros commented 2 months ago

I'm facing the exact issue myself !! The metadata/ folder is piling up and the contents from data/ & metadata/ are not getting deleted.

Sharing my table properties just to make sure I didn't make any errors there: Table Properties {history.expire.max-snapshot-age-ms=300000, write.metadata.previous-versions-max=10, write.parquet.compression-codec=zstd, write.manifest-target-size-bytes=33554432, read.split.metadata-target-size=67108864, write.metadata.delete-after-commit.enabled=true, write.target-file-size-bytes=134217728, read.split.target-size=134217728, history.expire.min-snapshots-to-keep=3}

newDelete() code is soft deleting the data & it doesn't appear in the Athena query:

Expression olderThanCutoff = Expressions.lessThan("created_epoch", cutoffDateMillis);
icebergTable.newDelete()
      .deleteFromRowFilter(olderThanCutoff)
       .commit()
icebergTable.refresh();

I'm also doing the rewriteManifests & expireSnapshots, The process is succeeding without errors but none of the contents are getting deleted. metadata/ folder only grows higher & higher !

            FileIO fileIO = icebergTable.io();
            icebergTable.rewriteManifests()
                    .clusterBy(file -> file.partition().get(0, String.class))
                    .rewriteIf(file -> file.length() < 10 * 1024 * 1024)
                    .deleteWith(fileIO::deleteFile)
                    .commit();

            icebergTable.expireSnapshots()
                    .expireOlderThan(cutoffDateMillis)
                    .commit();
            icebergTable.refresh();

Can someone please point out the issue in my code and why the metadata/ keeps rising ? I'm trying every options but I'm stuck without ideas now !

RussellSpitzer commented 2 months ago

My gut feeling there is that your cutoffDateMillis is not what you think it is. That looks correct to me though.

SanjayKhoros commented 2 months ago

Thanks for the quick reply @RussellSpitzer

Sharing little more details, Flink version - 1.20.0 Iceberg version - 1.6.1

long cutoffDateMillis = LocalDateTime.now() .minusDays(Long.parseLong(flinkConfig.dataCleanup.retentionPeriod)) .toInstant(ZoneOffset.UTC) .toEpochMilli();

I printed my cutOffDateMillis -> 1726313530137 Currently testing the issue in my Dev environment so changed the retain day to 2 days.

Like I mentioned earlier, Soft delete is working without any issues. When I query the records based on day, I only see 2 days of data, older records are not appearing ! Major issue is the data not getting cleaned up from S3.

My data/ folder is hardly around 600MB while metadata/ is around 1TB ! I get no errors executing the above rewriteManifests() & expireSnapshots() as well !

Based on your comments above, I thought maybe I should run deleteOrphanFiles so added the below support as well:

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-spark-runtime-3.4_2.12</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

And added it below expireSnapshots which is inside a try catch block

            icebergTable.expireSnapshots()
                    .expireOlderThan(cutoffDateMillis)
                    .commit();
            icebergTable.refresh();

            logger.info("executing deleteOrphanFiles " + System.currentTimeMillis());
            SparkActions.get().deleteOrphanFiles(icebergTable)
                    .olderThan(cutoffDateMillis)
                    .execute();
            logger.info("deleteOrphanFiles completed successfully");

Currently the service is on hold after "executing deleteOrphanFiles" log for the past 4 hours ! I'm hoping it does something or throws any error atleast.

If you have any suggestions please do share, I'm out of options and references at this point, Thank you !

UPDATE: SparkActions's deleteOrphanFiles wasn't helpful too !! The service just froze without erros.