apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

View is no longer in sync with table after catalog cache entry expires #8977

Open sethwhite-sf opened 11 months ago

sethwhite-sf commented 11 months ago

Apache Iceberg version

1.4.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

We have found that temporary views that reference an Iceberg table become stale when catalog caching is enabled (spark.sql.catalog.catalog-name.cache-enabled=true).

Initially, when a view is created

Dataset<Row> rdd = spark.read().format("iceberg").load("table1");
rdd.createOrReplaceTempView("view1");

The view and catalog cache reference the same org.apache.iceberg.Table object and the view reflects any changes that the application makes when it is queried:

spark.sql("SELECT * from view1").show(); // query returns latest state of the table

However, once cache expiry occurs (after 30 seconds by default when caching is enabled), subsequent updates to the table, such as

spark.sql("DELETE FROM table1 AS t WHERE t.id IS NULL");

cause a new entry for the table to be created in the cache. The view no longer sees any of the changes that are made and becomes stale, because it still uses the original org.apache.iceberg.Table object, which references an Iceberg table snapshot that is no longer current. The view and cache are no longer in sync.

spark.sql("SELECT * from view1").show(); // No longer returns latest state of the table
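The mechanism can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: TableHandle and CachingCatalog are hypothetical stand-ins written for this example, not Iceberg or Spark classes, and expiry is triggered by hand rather than by a timer.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: TableHandle and CachingCatalog are hypothetical stand-ins,
// not Iceberg or Spark classes.
public class StaleViewModel {

  /** Stand-in for a loaded table object that remembers the snapshot it last saw. */
  static final class TableHandle {
    long snapshotId;

    TableHandle(long snapshotId) {
      this.snapshotId = snapshotId;
    }
  }

  /** Stand-in for a caching catalog whose entries can expire. */
  static final class CachingCatalog {
    private final Map<String, TableHandle> cache = new HashMap<>();
    private long committedSnapshotId = 1; // latest snapshot in "storage"

    TableHandle load(String name) {
      TableHandle handle = cache.get(name);
      if (handle == null) {
        // Cache miss: build a fresh handle from the latest committed state.
        handle = new TableHandle(committedSnapshotId);
        cache.put(name, handle);
      }
      return handle;
    }

    void expire(String name) {
      cache.remove(name);
    }

    void commit(String name) {
      // A write goes through whichever handle the cache returns
      // and advances only that handle.
      TableHandle handle = load(name);
      committedSnapshotId++;
      handle.snapshotId = committedSnapshotId;
    }
  }

  /** Returns {viewBefore, latestBefore, viewAfter, latestAfter} snapshot ids. */
  public static long[] simulate() {
    CachingCatalog catalog = new CachingCatalog();
    TableHandle view = catalog.load("table1"); // the view keeps this reference

    catalog.commit("table1"); // entry still cached: the shared object is updated
    long viewBefore = view.snapshotId;
    long latestBefore = catalog.load("table1").snapshotId;

    catalog.expire("table1"); // simulated cache expiry
    catalog.commit("table1"); // the write lands on a brand-new handle
    long viewAfter = view.snapshotId;
    long latestAfter = catalog.load("table1").snapshotId;

    return new long[] {viewBefore, latestBefore, viewAfter, latestAfter};
  }

  public static void main(String[] args) {
    long[] r = simulate();
    System.out.println("before expiry: view=" + r[0] + " latest=" + r[1]);
    System.out.println("after expiry:  view=" + r[2] + " latest=" + r[3]);
  }
}
```

In this model the view and the catalog agree (snapshot 2) while the cache entry is live, and diverge (view at 2, catalog at 3) once the entry has expired and a write creates a new handle.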

The unit test below illustrates the problem. The test fails when the default catalog caching is enabled.

@Test
public void testViewConsistencyAfterCacheExpiration() throws Exception {
  Assume.assumeFalse("Avro does not support metadata delete", fileFormat.equals("avro"));
  createAndInitUnpartitionedTable();

  sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);

  Dataset<Row> rdd = spark.read().format("iceberg").load(tableName);
  rdd.createOrReplaceTempView("view1");
  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(null, "hr"), row(1, "hr"), row(2, "hardware")),
      sql("SELECT * FROM %s ORDER BY id", "view1"));

  // Default cache expiration is 30 seconds.
  Thread.sleep(40000);

  sql("DELETE FROM %s AS t WHERE t.id IS NULL", tableName);
  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(1, "hr"), row(2, "hardware")),
      sql("SELECT * FROM %s ORDER BY id", "view1"));
}

singhpk234 commented 11 months ago

This is interesting. It sounds more like an issue with Spark's view handling than with the Iceberg Spark integration. To the best of my understanding, Spark temp views (the resolved plan) are cached as well, in the session catalog:

https://github.com/apache/spark/blame/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala#L127

I'm wondering where exactly the fix should be.

namrathamyske commented 10 months ago

@singhpk234, rdd (which view1 is created on) holds a reference to the logical plan, which in turn references an older version of the Iceberg table. After the cache entry expires, nothing refreshes that older table object held by rdd. Ideally, Iceberg should refresh tables regardless of whether caching is enabled or disabled.
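The difference between pinning a table version and refreshing on read can be sketched with a toy model. Everything here is an assumption made for illustration: PinnedReader and RefreshingReader are hypothetical names, the AtomicLong stands in for the table's latest committed snapshot, and nothing in it is Iceberg or Spark code or a proposed patch.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Toy model only: both reader classes are hypothetical, not Iceberg or Spark APIs.
public class RefreshOnReadModel {

  /** Captures the snapshot id once, at creation time, like the stale view. */
  static final class PinnedReader {
    private final long snapshotId;

    PinnedReader(LongSupplier currentSnapshot) {
      this.snapshotId = currentSnapshot.getAsLong();
    }

    long read() {
      return snapshotId;
    }
  }

  /** Re-resolves the current snapshot id on every read. */
  static final class RefreshingReader {
    private final LongSupplier currentSnapshot;

    RefreshingReader(LongSupplier currentSnapshot) {
      this.currentSnapshot = currentSnapshot;
    }

    long read() {
      return currentSnapshot.getAsLong();
    }
  }

  /** Returns {snapshot seen by pinned reader, snapshot seen by refreshing reader}. */
  public static long[] simulate() {
    AtomicLong committed = new AtomicLong(1); // latest committed snapshot
    PinnedReader pinned = new PinnedReader(committed::get);
    RefreshingReader refreshing = new RefreshingReader(committed::get);

    committed.incrementAndGet(); // a later commit, such as the DELETE above

    return new long[] {pinned.read(), refreshing.read()};
  }

  public static void main(String[] args) {
    long[] r = simulate();
    System.out.println("pinned=" + r[0] + " refreshing=" + r[1]);
  }
}
```

The pinned reader keeps returning snapshot 1, while the refreshing reader returns snapshot 2. The trade-off is that refreshing on every read costs a lookup per query, which is presumably part of what the catalog cache is meant to avoid.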

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.