apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Caching Tables in SparkCatalog via CachingCatalog by default leads to stale data #2319

Closed edgarRd closed 6 months ago

edgarRd commented 3 years ago

I've been experiencing a few issues with refreshing table metadata in Iceberg. I think caching in Iceberg is a bit flawed: if we use Spark 3 via SparkCatalog with cache-enabled=true, which is the default, the Iceberg Catalog is wrapped in a CachingCatalog and cached tables stay stale until:

  1. They are evicted, or
  2. There's a commit, which will trigger a refresh.

With this, it's a bit dangerous to have long-lived TableOperations objects, e.g. multiple long-lived Spark sessions reading the same table while it gets modified.

I don't think the TableOperations are cache friendly unless we expect stale results in different sessions, e.g. with cache-enabled=true we have the following behavior:

  1. First session reads table1.
  2. Second session reads table1. <-- up to this point, both sessions see the same data
  3. First session commits to table1
  4. Second session reads table1 <-- this read is stale, due to caching - changes in 3) are not reflected
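The four steps above can be reproduced with a toy model. This is only a sketch: `ToyMetastore` and `ToyCachingSession` are hypothetical stand-ins invented for illustration, not Iceberg classes, and a table is reduced to a single version number.

```python
# Toy model only: these classes are hypothetical stand-ins, not Iceberg's API.

class ToyMetastore:
    """Holds the current metadata version for each table name."""

    def __init__(self):
        self.versions = {}

    def commit(self, name):
        self.versions[name] = self.versions.get(name, 0) + 1


class ToyCachingSession:
    """Caches the first version it loads and never re-checks the metastore."""

    def __init__(self, metastore):
        self.metastore = metastore
        self.cache = {}

    def read(self, name):
        if name not in self.cache:
            self.cache[name] = self.metastore.versions[name]
        return self.cache[name]

    def commit(self, name):
        self.metastore.commit(name)
        # A commit made through this session refreshes only this session's entry.
        self.cache[name] = self.metastore.versions[name]


metastore = ToyMetastore()
metastore.commit("table1")            # table1 exists at version 1
first = ToyCachingSession(metastore)
second = ToyCachingSession(metastore)

v_first = first.read("table1")        # step 1
v_second = second.read("table1")      # step 2: both sessions see version 1
first.commit("table1")                # step 3: table1 moves to version 2
stale = second.read("table1")         # step 4: still the cached version 1
fresh = first.read("table1")          # the committing session sees version 2
```

The second session never observes the commit because nothing in its read path consults the metastore once an entry is cached.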

In order for this flow to work as it does with Hive tables, with both sessions seeing up-to-date data, we can't use caching right now. While not checking for the up-to-date metadata location saves client calls, I think TableOperations should check the metadata location and refresh when it changes; with this we could cache the objects and still have correct, fresh data.
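A minimal sketch of that idea, under stated assumptions: `ToyPointerStore`, `LocationCheckingCache` and the `s3://bucket/...` paths are hypothetical names made up for this example. Every load pays for one cheap pointer lookup, and the expensive fetch-and-parse step only runs when the metadata location has changed.

```python
# Hypothetical sketch; ToyPointerStore and LocationCheckingCache are
# illustrative names, not Iceberg classes.

class ToyPointerStore:
    """Maps table name -> current metadata location (a cheap lookup)."""

    def __init__(self):
        self.location = {}
        self.lookups = 0

    def current_location(self, name):
        self.lookups += 1
        return self.location[name]


class LocationCheckingCache:
    """Reuses a cached table only while its metadata location is unchanged."""

    def __init__(self, store, load_metadata):
        self.store = store
        self.load_metadata = load_metadata   # expensive: fetch + parse
        self.entries = {}                    # name -> (location, metadata)
        self.loads = 0

    def load(self, name):
        location = self.store.current_location(name)
        cached = self.entries.get(name)
        if cached is not None and cached[0] == location:
            return cached[1]
        self.loads += 1
        metadata = self.load_metadata(location)
        self.entries[name] = (location, metadata)
        return metadata


store = ToyPointerStore()
store.location["table1"] = "s3://bucket/table1/metadata/v1.json"
cache = LocationCheckingCache(store, load_metadata=lambda loc: {"location": loc})

a = cache.load("table1")              # first load: fetch + parse
b = cache.load("table1")              # same location: served from cache
store.location["table1"] = "s3://bucket/table1/metadata/v2.json"
c = cache.load("table1")              # location changed: reloaded
```

The trade-off is the one named above: correctness costs one client call per load, which is exactly the call the current caching avoids.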

Caching is enabled by default in SparkCatalog (Spark 3). For now, I think the default should be false, especially since it can currently lead to data inconsistency.

What do you think @rdblue @aokolnychyi ?

zhangdove commented 3 years ago

I also agree that 'cache-enabled' should default to false, leaving it up to the user whether to turn this parameter on.

In particular, some long lived Spark Sessions may have exceptions due to default caching. There are two kinds of situations that we encounter:

  1. Asynchronous job clears old snapshots and data files (the higher the frequency of triggering, the more likely it is to occur)
  2. Asynchronous job cleaning orphan files will clear older metadata.json (write.metadata.previous-versions-max default value is 100)

aokolnychyi commented 3 years ago

None of the solutions is ideal. I wish Spark had a bit more control over that. My only worry about setting it as false is breaking self joins. Maybe, we can delegate that concern to Spark rather than to the caching catalog in Iceberg? I would not mind changing the default if there is enough consensus. I did ask this question earlier myself.

I'd like to hear from @rdblue @Parth-Brahmbhatt @danielcweeks as folks seem to rely on the current behavior.

pvary commented 3 years ago

I found the same issue and tried to start a discussion about it on the dev list. The main points are:

I also chatted a little about it with @rdblue, and he mentioned that in Spark the CachingCatalog is also used for making sure that the same version of the table is retrieved every time during the same session. So getting back stale data is a feature, not a bug.

Based on this discussion my feeling is that the best solution would be to create a metadata cache around TableMetadataParser.read(FileIO io, InputFile file) where the cache key is the file.location().

The snapshots are immutable and I guess (no hard numbers on it yet) that the most resource-intensive part of table creation is fetching the metadata from S3 and parsing the file, so this would help us the most and allow us to have the least complicated solution.
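A rough sketch of such a cache, assuming metadata files are never rewritten once created. `MetadataFileCache` and the file names are hypothetical; the real proposal would wrap TableMetadataParser.read, which this toy replaces with `json.loads`.

```python
# Hypothetical sketch of a parse cache keyed by metadata file location.
import json


class MetadataFileCache:
    """Parses each metadata file at most once, keyed by its location."""

    def __init__(self, read_file):
        self.read_file = read_file   # callable: location -> JSON text
        self.parsed = {}             # location -> parsed metadata
        self.reads = 0

    def read(self, location):
        if location not in self.parsed:
            self.reads += 1
            self.parsed[location] = json.loads(self.read_file(location))
        return self.parsed[location]


# In-memory stand-in for object storage; the paths are made up.
files = {
    "metadata/v1.metadata.json": '{"current-snapshot-id": 1}',
    "metadata/v2.metadata.json": '{"current-snapshot-id": 2}',
}
cache = MetadataFileCache(lambda location: files[location])

first = cache.read("metadata/v1.metadata.json")    # fetched and parsed
again = cache.read("metadata/v1.metadata.json")    # served from the cache
second = cache.read("metadata/v2.metadata.json")   # new location, new parse
```

Because the key is the file location, an entry can never become stale: a new table version means a new location and therefore a cache miss. The caller still has to ask the catalog which location is current.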

edgarRd commented 3 years ago

I think caching tables indefinitely in sessions (until garbage collected or a change happens) and returning stale results is unintuitive (non-obvious) and inconsistent, so it should not be enabled by default. Multiple people have brought up this behavior in the past, at least @aokolnychyi and @pvary, which signals how unintuitive returning stale results by default is. It's also inconsistent with how Hive and Presto handle Iceberg tables, and with how Spark handles queries to non-Iceberg tables.

I'm not arguing for removing caching altogether - as @pvary mentioned, it can be a feature in some cases - but instead that the default should be the most intuitive and consistent behavior. If there are use cases that need to fix the state of the table at some point, then cache can be used explicitly rather than using it implicitly by default. In fact, Spark provides such constructs with https://spark.apache.org/docs/3.0.1/sql-ref-syntax-aux-cache-cache-table.html - shouldn't that be more in line with Spark's expected handling of cached tables?

> My only worry about setting it as false is breaking self joins

If an environment depends on caching being enabled, cache-enabled=true can be set in spark-defaults.conf regardless of the default value; it's good practice anyway to avoid depending on defaults. I think this is better than the other way around, where users are expected to know they have to refresh their table to see whether it has changed. If the default were changed, we'd definitely need to mention it in the docs and release notes. On the other hand, we could recommend that users always set cache-enabled=false to avoid depending on defaults and get fresh state, but as mentioned before, this seems less intuitive if you don't rely on or know of this config.
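As an illustration of pinning the setting explicitly, the snippet below renders the two lines one would add to spark-defaults.conf. The catalog name `my_catalog` and the helper `iceberg_catalog_conf` are placeholders chosen for this example; the property keys follow the `spark.sql.catalog.<name>` pattern used by SparkCatalog.

```python
# "my_catalog" is a placeholder catalog name chosen for this example.

def iceberg_catalog_conf(catalog_name, cache_enabled):
    """Build the Spark properties that register an Iceberg catalog."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.cache-enabled": "true" if cache_enabled else "false",
    }


conf = iceberg_catalog_conf("my_catalog", cache_enabled=False)

# spark-defaults.conf uses whitespace-separated "key value" lines.
lines = [f"{key} {value}" for key, value in conf.items()]
print("\n".join(lines))
```

Setting the value explicitly in either direction keeps a deployment's behavior stable if the default ever changes.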

> Based on this discussion my feeling is that the best solution would be to create a metadata cache around TableMetadataParser.read(FileIO io, InputFile file) where the cache key is the file.location().

I agree, this would solve caching for saving resources. However, this does not address the self-join concerns mentioned before, since they rely on looking at the same snapshot.

I think @Parth-Brahmbhatt mentioned that there's a refresh stored procedure; however, I think this goes in the wrong direction by supporting caching by default, i.e. users would need to know that tables are cached by default, which is problematic if the behavior is inconsistent with other compute engines or table formats. Instead, I think it's preferable to cache explicitly (either with https://spark.apache.org/docs/3.0.1/sql-ref-syntax-aux-cache-cache-table.html or a stored procedure); this makes the default behavior intuitive and consistent with other compute engines and other non-Iceberg tables in Spark.

pvary commented 3 years ago

> Also, it's inconsistent with how Hive and Presto handle Iceberg tables; but also how Spark handles queries to non-Iceberg tables.

Hive also should use the same snapshot of the table on query level, but the refresh is expected between sessions and transactions (currently queries). Since Hive query execution spans multiple JVMs, we have to find our own way for snapshotting tables. We have already started working on this (See BaseTable serialization)

> I agree, this would solve caching for saving resources. However, this does not address the self-join concerns mentioned before, since they rely on looking at the same snapshot.

I think the current CachingCatalog is too specific for general use but still has its own use cases. Also, as this is a released feature, some users might depend on its specific behavior. I would suggest creating a new one alongside it, and when it is ready we might decide to deprecate the old one.

What do you think?

rdblue commented 3 years ago

Originally, we only cached tables for a short period of time, since the expectation is that we want to have fresh data if it is available, but have a reasonable window where results don't just change (i.e., while planning a self-join). The problem with this was that tables that were referenced by Spark plans would not get refreshed properly because the table would become an orphaned object and no other operations would call refresh after an update.

There are a few recent changes that make this less of a problem. First, Spark should now correctly invalidate cached plans, as should the Spark actions in Iceberg. I think that solving the problem the right way changes the trade-off and that we should re-introduce the table expiration.
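To make the expiration idea concrete, here is a small sketch of a cache whose entries expire a fixed time after they were loaded. `ExpiringTableCache`, the 30-second window and the injected clock are assumptions made for this example and do not describe how CachingCatalog is implemented.

```python
# Hypothetical sketch of time-based expiration; not Iceberg's CachingCatalog.

class ExpiringTableCache:
    """Serves a cached table until ttl_seconds have passed since it was loaded."""

    def __init__(self, loader, ttl_seconds, clock):
        self.loader = loader
        self.ttl_seconds = ttl_seconds
        self.clock = clock           # callable returning the current time in seconds
        self.entries = {}            # name -> (loaded_at, table)

    def load(self, name):
        now = self.clock()
        entry = self.entries.get(name)
        if entry is not None and now - entry[0] < self.ttl_seconds:
            return entry[1]
        table = self.loader(name)
        self.entries[name] = (now, table)
        return table


now = [0.0]                          # fake clock so the example is deterministic
versions = {"table1": 1}
cache = ExpiringTableCache(
    loader=lambda name: versions[name],
    ttl_seconds=30,
    clock=lambda: now[0],
)

first = cache.load("table1")         # loads version 1 at t=0
versions["table1"] = 2               # another session commits
now[0] = 10.0
within = cache.load("table1")        # inside the window: still version 1
now[0] = 31.0
after = cache.load("table1")         # window elapsed: reloads version 2
```

Results stay stable inside the window, which covers planning a self-join, while staleness is bounded by the window length instead of lasting until eviction.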

Does that sound reasonable to everyone? @aokolnychyi, @pvary, @edgarRd, @zhangdove?

aokolnychyi commented 3 years ago

@rdblue, I am afraid the changes done in Spark and whatever we have in our actions invalidate only cached data. I don't think we invalidate plans. For example, if someone creates a view that references a V2 table, Spark won't invalidate it.

In our SparkTable, we have a refreshEagerly flag, which is set to true whenever the caching catalog is not used. Using the caching catalog without expiry ensures the view will always be invalidated whenever there is an operation in the current session. Operations happening in other query engines won't invalidate the Spark view, though.

It looks like we should be able to handle this case by moving the caching/invalidation logic to SparkTable. Thoughts?

nastra commented 3 years ago

I have to agree that the default caching behavior is unintuitive and rather surprising to users (as can be seen in https://github.com/projectnessie/nessie/issues/2165). Using the NessieCatalog with Spark, one can have a table X on branch dev and the CachingCatalog will eventually cache this table. Users can refer to this table also as X@dev and the NessieCatalog properly splits this name into the table name X and the branch name dev via TableReference. However, before this happens, we reach CachingCatalog#loadTable(..), where X@dev is treated as a separate table, meaning that an update to table X will leave an outdated X@dev table and users referring to that will always see stale data.
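The X versus X@dev problem can be shown with a toy cache keyed by the raw identifier string. This is an illustrative model only: `RawKeyCache` and `load_from_catalog` are hypothetical, and the `@` split merely imitates the name/branch splitting described above.

```python
# Toy model; the "@" split imitates the behaviour described above and is
# not Nessie's or Iceberg's code.

class RawKeyCache:
    """Caches by the identifier exactly as written, before any name parsing."""

    def __init__(self, loader):
        self.loader = loader
        self.entries = {}

    def load(self, identifier):
        if identifier not in self.entries:
            self.entries[identifier] = self.loader(identifier)
        return self.entries[identifier]

    def invalidate(self, identifier):
        self.entries.pop(identifier, None)


versions = {("X", "dev"): 1}


def load_from_catalog(identifier, default_branch="dev"):
    # The underlying catalog resolves "X" and "X@dev" to the same table.
    name, _, branch = identifier.partition("@")
    return versions[(name, branch or default_branch)]


cache = RawKeyCache(load_from_catalog)
plain = cache.load("X")                   # cached under key "X"
qualified = cache.load("X@dev")           # cached again under key "X@dev"

versions[("X", "dev")] = 2                # an update to table X ...
cache.invalidate("X")                     # ... only evicts the "X" entry

plain_after = cache.load("X")             # reloaded: version 2
qualified_after = cache.load("X@dev")     # stale: still version 1
```

Two keys point at one table, so invalidating one key leaves the other entry behind.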

Unfortunately there's not much control that we have over the cache invalidation procedure to handle the described use case, so I was rather thinking that Catalogs should have a way to control their default caching behavior. For the NessieCatalog I think it makes more sense to disable caching by default.

@aokolnychyi @rdblue could you take a look at my proposal in https://github.com/apache/iceberg/pull/3230?

namrathamyske commented 1 year ago

@aokolnychyi I agree with you on this:

> In our SparkTable, we have refreshEagerly flag, which is set as true whenever the caching catalog is not used. Using the caching catalog without expiry, ensures the view will always be invalidated whenever there is an operation in the current session. Operations happening in other query engines won't invalidate the Spark view, though.

@rdblue Do we have plans to invalidate Spark plans?

github-actions[bot] commented 7 months ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 6 months ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'