delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, and Python
https://delta.io
Apache License 2.0

[Spark] Skip collecting commit stats to prevent computing Snapshot State #2718

Closed istreeter closed 2 months ago

istreeter commented 3 months ago

Which Delta project/connector is this regarding?

Spark

Description

Before this PR, Delta computes a SnapshotState during every commit. Computing a SnapshotState is fairly slow and expensive, because it involves reading the entirety of a checkpoint, sidecars, and log segment.

For many types of commit, it should be unnecessary to compute the SnapshotState.

After this PR, a transaction can avoid computing the SnapshotState of a newly created snapshot. Skipping the computation is enabled via the Spark configuration option spark.databricks.delta.commitStats.collect=false
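For illustration, one way to opt in (the key is the config added by this PR; it could equally be set via spark.conf.set or --conf at submit time):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("delta-writer")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // Skip collecting commit stats, so the post-commit SnapshotState is not computed
  .config("spark.databricks.delta.commitStats.collect", "false")
  .getOrCreate()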

This change can have a big performance impact when writing into a Delta table, especially when the table comprises a large number of underlying data files.

How was this patch tested?

Simple demo job that triggers computing SnapshotState, before this PR:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("myapp")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "./warehouse")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

spark.sql("""CREATE TABLE test_delta(id string) USING DELTA """)

spark.sql("""
  INSERT INTO test_delta (id) VALUES (42)
  """)

spark.close()

Does this PR introduce any user-facing changes?

Yes, after this PR the user can set the Spark config option spark.databricks.delta.commitStats.collect=false to avoid computing SnapshotState after a commit.

talgo10 commented 3 months ago

Can you provide more details on the impact/performance degradation when reading from the table? What would happen in batch reads and stream reads?

istreeter commented 3 months ago

Hi @talgo, you asked about reading from the table. This change has no impact whatsoever on reading, because it does not touch that part of the code. This is entirely about writing to the table.

I can provide some more details about the impact on writing to the table. Here is some log output from a spark job which wrote 1278 new rows into a Delta Table.

2024-02-29T08:30:58.129Z INFO org.apache.spark.sql.delta.OptimisticTransaction - [tableId=d19f5423,txnId=a3ba89ad] Attempting to commit version 251406 with 12 actions with Serializable isolation level
2024-02-29T08:31:13.146Z INFO org.apache.spark.sql.delta.DeltaLog - Creating a new snapshot v251407 for commit version 251406
2024-02-29T08:31:13.147Z INFO org.apache.spark.sql.delta.DeltaLog - Loading version 251407 starting from checkpoint version 251400.
2024-02-29T08:31:13.166Z INFO org.apache.spark.sql.delta.DeltaLogFileIndex - Created DeltaLogFileIndex(Parquet, numFilesInSegment: 1, totalFileSize: 457025090)
2024-02-29T08:31:13.166Z INFO org.apache.spark.sql.delta.DeltaLogFileIndex - Created DeltaLogFileIndex(JSON, numFilesInSegment: 7, totalFileSize: 77086)
2024-02-29T08:31:14.178Z INFO org.apache.spark.sql.delta.Snapshot - [tableId=d19f5423-16f3-4107-9454-f4c4816f9aa3] Created snapshot Snapshot(path=gs://<REDACTED>/prod/events/_delta_log, version=251407, metadata=REDACTED
2024-02-29T08:31:14.182Z INFO org.apache.spark.sql.delta.DeltaLog - Updated snapshot to Snapshot(path=gs://<REDACTED>/prod/events/_delta_log, version=251407, metadata=REDACTED
2024-02-29T08:31:14.182Z INFO org.apache.spark.sql.delta.Snapshot - [tableId=d19f5423-16f3-4107-9454-f4c4816f9aa3] DELTA: Compute snapshot for version: 251407
2024-02-29T08:32:19.519Z INFO org.apache.spark.sql.delta.Snapshot - [tableId=d19f5423-16f3-4107-9454-f4c4816f9aa3] DELTA: Done
2024-02-29T08:32:19.519Z INFO org.apache.spark.sql.delta.OptimisticTransaction - [tableId=d19f5423,txnId=a3ba89ad] Committed delta #251406 to gs://<REDACTED>/prod/events/_delta_log

From the timestamps on the left, you can see there is a 65 second gap between the lines DELTA: Compute snapshot for version: 251407 and DELTA: Done. This is where the Delta library is computing the snapshot state.

After this PR, we completely eliminate that 65 second gap. The overall time to write rows is reduced by 65 seconds.

This change will have most impact on streaming jobs that write into Delta, because those types of jobs need to do frequent commits.

felipepessoto commented 2 months ago

@vkorukanti if you have a chance, could you review this? It looks like a great improvement: 137s vs 63s, a reduction of 54%: https://github.com/delta-io/delta/pull/2718#issuecomment-1978290956

And it only impacts the stats logs.

scovich commented 2 months ago

Can you provide more details on the impact/performance degradation when reading from the table? What would happen in batch reads and stream reads?

you asked about reading from the table. This change has no impact whatsoever on reading, because it does not touch that part of the code. This is entirely about writing to the table.

@istreeter If I understand correctly, this change would shift the cost of state reconstruction from the writer who produces the post-commit snapshot, to a subsequent reader who consumes that snapshot on the same cluster. If no such reader exists (e.g. the table is write-mostly or the reader is on a different cluster), then the change is a net win. That's what the microbenchmark above was measuring: just writes, no read afterward.

For an iterated workload that does write -> read -> ... -> write -> read, I would expect no change in the overall latency of a write -> read pair. Does that sound correct?

istreeter commented 2 months ago

Hi @scovich yes that sounds correct, as you described it. If every write is always followed by a read on the same cluster, then there is no net change in the number of state reconstructions. This PR would just shift the state reconstruction to a different place.

However, for any other workload pattern (in which not every write is followed by a read on the same cluster), there is a benefit from not reconstructing the state as part of the write. For example, this workload pattern would also benefit: write -> write -> read -> write -> write -> read -> write -> write -> read.

At my company, we use a dedicated cluster just for writing. So the pattern is simply write -> write -> write -> write -> write etc. There should be no reason to ever do state reconstruction on that usage pattern.

ryan-johnson-databricks commented 2 months ago

At my company, we use a dedicated cluster just for writing. So the pattern is simply write -> write -> write -> write -> write etc. There should be no reason to ever do state reconstruction on that usage pattern.

Is the dedicated cluster writing repeatedly? Or is it ephemeral, with each cluster instance performing one write and then shutting down?

Asking because even "blind" writes actually do involve reads. At a minimum, they must access the snapshot's protocol and metadata. If the workload uses app ids to track stream progress, then that triggers a full state reconstruction as well. If it's a merge or update instead of a blind insert, then there's also a full-blown query in the write path.
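For example, a writer that uses app ids to track its progress might look like the following sketch (using Delta's idempotent-write options; the id and path are placeholders):

// Sketch: tracking write progress by app id via the txnAppId/txnVersion options.
// Looking up the app's last committed version is part of the snapshot's computed
// state, which is why this pattern can trigger a full state reconstruction.
val df = spark.range(100).toDF("id")
df.write
  .format("delta")
  .option("txnAppId", "my-writer-app")  // placeholder application id
  .option("txnVersion", 42L)            // monotonically increasing per app
  .mode("append")
  .save("/tmp/test_delta")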

So for write -> write on the same cluster, we have:

In such a case, this PR wouldn't change overall timing. It just changes whether the "post-commit snapshot" step or the "write" step pays the snapshot cost.

Note: I 100% agree the optimization is helpful when it applies, and that we should probably incorporate it into Delta. I just worry it might not apply as often as we wish it did, in practice. Is the microbenchmark fully representative of the actual workload you hope to optimize?

felipepessoto commented 2 months ago

If we have multiple writers, does computing the snapshot on read also improve perf, by skipping the load of a snapshot (after my write) that will be stale anyway because of the other writers?

istreeter commented 2 months ago

Hi @ryan-johnson-databricks the cluster we use for writing is long-lived and not ephemeral. Once the cluster is up, and once the spark context is created, then we re-use that spark context and write a batch of rows in Delta every ~1 minute. Each write is an append; never a merge, never an update.

Is the microbenchmark fully representative of the actual workload you hope to optimize?

The logs I shared in this comment are real logs from a production workload. The logs were taken after the cluster had already been up and running for several hours. Imagine a repeating loop with those log lines repeated again and again.

I am certain that eliminating the state reconstruction would make this workload faster. However, I believe this PR in its current state only goes halfway toward solving that problem! After this PR, Delta will avoid computing state for the post-commit snapshot. But currently, Delta also computes state for the pre-commit snapshot on this line:

val readRowIdHighWatermark =
  RowId.extractHighWatermark(snapshot).getOrElse(RowId.MISSING_HIGH_WATER_MARK)

So far I have not worked out how to avoid computing state on that line, although I believe it should be possible. For our workload, we are not interested in row tracking or watermarks. (And even if we were -- I still reckon it could be achieved without computing snapshot state).

One final thought... it would be awesome if we could have a unit test that proves we don't compute snapshot state during writes. Adding such a unit test was beyond my abilities!

ryan-johnson-databricks commented 2 months ago

the cluster we use for writing is long-lived and not ephemeral. Once the cluster is up, and once the spark context is created, then we re-use that spark context and write a batch of rows in Delta every ~1 minute. Each write is an append; never a merge, never an update.

Is the microbenchmark fully representative of the actual workload you hope to optimize?

The logs I shared in https://github.com/delta-io/delta/pull/2718#issuecomment-1978290956 are real logs from a production workload. The logs were taken after the cluster had already been up and running for several hours. Imagine a repeating loop with those log lines repeated again and again.

My questions actually came from looking at those logs -- they ONLY show the commit path itself, and creation of the post-commit snapshot (251407). They do not include creation of the snapshot the transaction read from (251406). If the workload runs in a loop, then one minute later we'd get a second transaction that starts from 251407 (and triggers P&M at a minimum), and which tries to create 251408. Thus, the first transaction avoids the cost but the second transaction ends up paying it one minute later.

Can you check what happens across 2-3 inserts instead of just the commit step of one insert in isolation? I expect that the 65 seconds which disappeared from that log snippet really just moved somewhere else.

it would be awesome if we could have a unit test that proves that we don't compute snapshot state during writes

Should be easy enough to check Snapshot.stateReconstructionTriggered after the operation you care about, at least for the really expensive computed state. It wouldn't cover protocol, metadata, or checkpoint size though, because those don't rely on state reconstruction in the first place.
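Something like this rough sketch of a test (assuming the usual Delta test helpers and that the flag is accessible from test code; not a drop-in test):

// Rough sketch only: append to a table, then assert the post-commit snapshot
// never had its (expensive) state reconstruction triggered.
test("blind append does not trigger state reconstruction") {
  withTempDir { dir =>
    val path = dir.getAbsolutePath
    spark.range(10).write.format("delta").save(path)               // create the table
    spark.range(10).write.format("delta").mode("append").save(path)
    val deltaLog = DeltaLog.forTable(spark, path)
    assert(!deltaLog.unsafeVolatileSnapshot.stateReconstructionTriggered)
  }
}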

NOTE: In general, we can't avoid computing at least protocol and metadata, because they're required to interpret the snapshot correctly (table schema, table features, etc). But they're also much cheaper to compute than the full state reconstruction.

ryan-johnson-databricks commented 2 months ago

Delta also computes state for the pre-commit snapshot on this line:

val readRowIdHighWatermark =
  RowId.extractHighWatermark(snapshot).getOrElse(RowId.MISSING_HIGH_WATER_MARK)

So far I have not worked out how to avoid computing state on that line, although I believe it should be possible.

That actually looks like a simple (performance) bug: AFAIK, the rowid high watermark only matters if the table supports rowids, but the extractHighWatermark method body lacks any isSupported(snapshot.protocol) check that could early-return None?
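Roughly this shape (a self-contained sketch with stand-in types for illustration, not Delta's actual classes):

// The point: a protocol check is cheap, so doing it first avoids the expensive
// state reconstruction whenever row tracking isn't supported by the table.
final case class Protocol(writerFeatures: Set[String])

def isRowTrackingSupported(protocol: Protocol): Boolean =
  protocol.writerFeatures.contains("rowTracking")

def extractHighWatermark(
    protocol: Protocol,
    expensiveScan: () => Option[Long]   // stands in for the state reconstruction
): Option[Long] =
  if (!isRowTrackingSupported(protocol)) None   // early return, no scan
  else expensiveScan()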

istreeter commented 2 months ago

Can you check what happens across 2-3 inserts instead of just the commit step of one insert in isolation? I expect that the 65 seconds which disappeared from that log snippet really just moved somewhere else.

Sure, I ran a simple script like this:

(1 to 100).foreach { i =>
  spark.sql(s"INSERT INTO delta.`file:///tmp/test_delta` (v) values ($i)")
}

I attach here two versions of the stdout logs, one from before and one from after these changes:

From the logs, you will see that before this PR we get log lines like DELTA: Compute snapshot for version: x for every single commit, whereas after this PR we only get that log line once every 10 commits, corresponding to the checkpoint interval. For all other non-checkpointing commits, it does not compute the snapshot state.
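(For reference, that interval comes from the delta.checkpointInterval table property, which defaults to 10 commits; it could be changed with something like the following.)

spark.sql("ALTER TABLE delta.`file:///tmp/test_delta` SET TBLPROPERTIES ('delta.checkpointInterval' = '20')")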


That actually looks like a simple (performance) bug: AFAIK, the rowid high watermark only matters if the table supports rowids, but the extractHighWatermark method body lacks any isSupported(snapshot.protocol) check that could early-return None?

The delta spec here makes a distinction between when row tracking is supported vs when row tracking is enabled. I think ideally we want to avoid computing snapshot state whenever row tracking is not enabled. Whereas I think your suggested change would only avoid the computation if row tracking is not supported.

johanl-db commented 2 months ago

The delta spec here makes a distinction between when row tracking is supported vs when row tracking is enabled. I think ideally we want to avoid computing snapshot state whenever row tracking is not enabled. Whereas I think your suggested change would only avoid the computation if row tracking is not supported.

The spec does require keeping track of the high-water mark and setting base row IDs / default row commit versions whenever the feature is supported, not only enabled.

The readRowIdHighWatermark value isn't currently used when the feature isn't supported: https://github.com/delta-io/delta/blob/aa0af003068a9491b7b6830712082af3f6fc87de/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L521

So, as Ryan suggested, the simplest approach would be to skip the expensive snapshot recomputation if the feature isn't supported.

ryan-johnson-databricks commented 2 months ago

running the same script including my PR and also changing this line to simply val readRowIdHighWatermark = RowId.MISSING_HIGH_WATER_MARK i.e. the other problem I mentioned which I've not fixed yet.

From the logs, you will see that before this PR we get log lines like DELTA: Compute snapshot for version: x for every single commit, whereas after this PR we only get that log line once every 10 commits, corresponding to the checkpoint interval. For all other non-checkpointing commits, it does not compute the snapshot state.

Ah, that makes sense now. I didn't know you had made additional changes to avoid triggering snapshot state on the read path, in addition to the write path changes in this PR.

That actually looks like a simple (performance) bug: AFAIK, the rowid high watermark only matters if the table supports rowids, but the extractHighWatermark method body lacks any isSupported(snapshot.protocol) check that could early-return None?

The delta spec here makes a distinction between when row tracking is supported vs when row tracking is enabled. I think ideally we want to avoid computing snapshot state whenever row tracking is not enabled. Whereas I think your suggested change would only avoid the computation if row tracking is not supported.

I had to word my suggested change the way I did in order to comply with the spec:

When the feature rowTracking exists in the table protocol's writerFeatures, then we say that Row Tracking is supported. In this situation, writers must assign Row IDs and Commit Versions

Writers can't assign row ids without knowing the current high watermark, so if the feature is "supported" in the table protocol then we have to pull the high watermark in order to commit.

In practice, I don't think the distinction between "enabled" and "supported" should matter, because "supported" is anyway a transient state, used e.g. while a table is trying to enable rowids for the first time:

A backfill operation may be required to commit add and remove actions with the baseRowId field set for all data files before the table property delta.enableRowTracking can be set to true.

Update: ninja'd by @johanl-db

istreeter commented 2 months ago

OK, I read a bit more of the protocol, and now I understand the point you're making: the writer MUST calculate the high watermark if rowTracking exists in the table protocol's writerFeatures.

That is OK for me, because the tables I write to do not have rowTracking in the table's features. So I will take your suggestion and amend extractHighWatermark to return None if row tracking is not supported.

(I do wonder if it is possible to implement a cheap way of checking the high watermark, similar to how there is a cheap way to check protocol and metadata. Then tables with row tracking enabled could also get the benefit of the change I'm making. But I won't pursue that idea in this PR.)


I have a question for the people following this thread....

I do believe this PR will make a big performance difference for many uses of Delta. But currently, users would only get that speed-up if they change commitStats.collect to false. Do you think I should abandon this configuration option? Maybe it is better to say that all writers should avoid computing snapshot state during transactions. Users should get the benefit of this change without needing to know about the config option.

I only added the configuration option because I didn't want to upset someone who is depending on that logging.

scovich commented 2 months ago

I do believe this PR will make a big performance difference for many uses of Delta. But currently, users would only get that speed-up if they change commitStats.collect to false. Do you think I should abandon this configuration option? Maybe it is better to say that all writers should avoid computing snapshot state during transactions. Users should get the benefit of this change without needing to know about the config option.

I only added the configuration option because I didn't want to upset someone who is depending on that logging.

That's a great question and I don't have a good answer. More configs are always annoying, but people also complain (loudly) if their workload breaks.

felipepessoto commented 2 months ago

I think numFilesTotal and sizeInBytesTotal are important stats and we should keep the config. They could be useful for tracking the growth of a table.

istreeter commented 2 months ago

@felipepessoto I do see your point that it's useful to track the growth of a table over time. But it seems that information just isn't naturally available inside OptimisticTransaction, and so I don't think it's the natural place to put that logging.

How would you feel if numFilesTotal and sizeInBytesTotal were tracked every time the snapshot state is computed? I'm thinking I could add a line somewhere around here like:

val stats = SnapshotStateStats(
  numFilesTotal = _computedState.numOfFiles,
  sizeInBytesTotal = _computedState.sizeInBytes  
)
recordDeltaEvent(deltaLog, "delta.snapshot.stats", data = stats)

Admittedly you would not get the stats for every transaction. But you would get it every time Delta does a post-commit checkpoint. And you would get it every time Spark reads a version of the table.

felipepessoto commented 2 months ago

I also don't have a good answer; it depends on how people use it. That's why I think we need the config flag.

@sezruby any thoughts about it?

sezruby commented 2 months ago

Some users might use it to monitor their workload. There's no good way to monitor a Delta table workload, so it would be better to keep the config. It's existing code, so there's no regression risk in keeping it. That said, since there's a way to recalculate the metrics when they're needed, I think we could remove it. We can't rely on tracking them on every commit anyway, as concurrent updates could come from a different cluster/app.

istreeter commented 2 months ago

If we keep the config option and it is set to true by default (i.e. the backwards-compatible behaviour), then realistically most users will never change it.

I just think that is a real lost opportunity! I'll stress it again: I strongly believe this PR can make Delta workloads much faster for many users under many circumstances. I suspect, without proof, that Delta users will appreciate that performance gain more so than they would appreciate the logging.

I think the options available to us are:

  1. Keep the config option, with the default set to commitStats.collect = true. Users do not get the performance benefit of this PR by default, but the logging stays the same as it was before.
  2. Keep the config option, with the default set to commitStats.collect = false. Users do get the performance benefit by default, and anyone who cares about monitoring those stats can re-enable the option.
  3. Simply do not have a config option. All users get the performance gain. To partially compensate for removing those stats from the logs, we can instead add some extra logging in the place I mentioned before.

I really think 3 is the best option. But you folks here have better knowledge than me of what Delta users might want.

scovich commented 2 months ago

I tend to agree that we should ~~enable~~ disable the config by default if we use a config, so users get the performance gain. I wish there were a way to know whether people actually rely on the logging in practice, to make a decision between config vs. no config. The safe choice might be to have the config (enabled by default) and remove it in a future release if nobody squawks?

Update: Fix inverted logic...

istreeter commented 2 months ago

Thank you everyone for pointing me in the right direction :)

I have made the following changes to the PR:

felipepessoto commented 2 months ago

@istreeter, now that we are only conditionally collecting numFiles and size, I wonder if it would be possible to cheaply and reliably calculate them?

For example:
  size = previousSize + bytesNew
  numFiles = previousNumFiles + addedFiles - removedFiles
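Something like this rough sketch (hypothetical names, not code from Delta or this PR):

// Carry the previous totals forward and adjust them using only the commit's own
// actions, so no snapshot state reconstruction would be needed.
final case class TableTotals(numFiles: Long, sizeInBytes: Long)

def updatedTotals(
    previous: TableTotals,
    addedFiles: Long,
    addedBytes: Long,
    removedFiles: Long,
    removedBytes: Long): TableTotals =   // removedBytes only known when RemoveFile.size is present
  TableTotals(
    numFiles = previous.numFiles + addedFiles - removedFiles,
    sizeInBytes = previous.sizeInBytes + addedBytes - removedBytes)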

felipepessoto commented 2 months ago

UPDATE: bytesNew only includes added files and doesn't take removed files into consideration. Also, RemoveFile.size is an optional field.

istreeter commented 2 months ago

@istreeter, now that we are only conditionally collecting numFiles and size, I wonder if it would be possible to cheaply and reliably calculate them?

@felipepessoto I think the only way to get previousSize and previousNumFiles is to compute the Snapshot State of the pre-commit snapshot. The great thing about this PR in its current state is that we don't need to compute the Snapshot State of either the pre- or post-commit snapshot.

If you think it's possible to cheaply do size = previousSize + bytesNew then I'm willing to have a go, but you'll have to point me in the right direction on how to do it.

prakharjain09 commented 2 months ago

@istreeter Thanks for working on this change. Please resolve the conflicts on the PR so that it can be merged.

istreeter commented 2 months ago

@prakharjain09 Fixed!! It was an easy rebase. Nothing significant changed since it was approved.

prakharjain09 commented 2 months ago

Seems like some unit tests are failing with this change and need to be fixed.

istreeter commented 2 months ago

Hi @prakharjain09 the unit tests now all pass 🎉

I pushed one extra commit which amends a couple of pre-existing unit tests. I think the fix is reasonable -- it just sets things up so the starting point is the same as it was before this PR.

I pushed it as an extra commit so it is obvious what I have changed.