delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, as well as APIs for multiple languages
https://delta.io
Apache License 2.0

Add support for delta atomic commit read #1079

Open hzzlzz opened 2 years ago

hzzlzz commented 2 years ago

Description

This PR adds support for atomic commit reads when reading data from a Delta table as a streaming source.

PIP

Resolves #1026

How was this patch tested?

Unit tests

Does this PR introduce any user-facing changes?

Add a new config when reading delta as a streaming source

val q = spark.readStream
  .format("delta")
  .option("readAtomicCommits", "true")
  .load(path)
hzzlzz commented 2 years ago

I wonder if there is an API to manually create multi-action commits with custom data. We found some samples in DeltaLogSuite, but they seem to be only for creating logs. Any ideas on how this option should be implemented and tested would be appreciated!

allisonport-db commented 2 years ago

Hi @hzzlzz, thanks for making this PR and writing up the PIP. We'll review this as soon as possible.

> I wonder if there is an API to manually create multi-action commits with custom data. We found some samples in DeltaLogSuite, but they seem to be only for creating logs. Any ideas on how this option should be implemented and tested would be appreciated!

I think testing it the same way we test maxFilesPerTrigger and maxBytesPerTrigger should be good. Is there a reason you want to do something else?

hzzlzz commented 2 years ago

> Hi @hzzlzz, thanks for making this PR and writing up the PIP. We'll review this as soon as possible.
>
> > I wonder if there is an API to manually create multi-action commits with custom data. We found some samples in DeltaLogSuite, but they seem to be only for creating logs. Any ideas on how this option should be implemented and tested would be appreciated!
>
> I think testing it the same way we test maxFilesPerTrigger and maxBytesPerTrigger should be good. Is there a reason you want to do something else?

That sounds good. Please go ahead and review. The reason for asking is that I need multi-action commits to test this option, and I'm relying on repartition to create them right now. However, in my offline tests it is not guaranteed to produce the exact number of files (for instance, 2) when the data volume is small, so I write 50 items each time in the unit test.
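For illustration, the test currently forces a multi-file commit roughly like this (a sketch only; the table path is a placeholder and the delta-spark package is assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("multi-file-commit-sketch").getOrCreate()
import spark.implicits._

// Append 50 rows split across 2 partitions so that the single commit in
// _delta_log carries 2 AddFile actions. As noted above, very small inputs do
// not always yield the exact file count, hence the 50 rows.
val tablePath = "/tmp/delta/atomic-commit-test" // placeholder
(1 to 50).toDF("id")
  .repartition(2)
  .write
  .format("delta")
  .mode("append")
  .save(tablePath)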

zsxwing commented 2 years ago

@hzzlzz If I understand correctly, the behavior of readAtomicCommits would be: if maxFilesPerTrigger or maxBytesPerTrigger causes us to stop in the middle of a commit log file, readAtomicCommits will force us to continue reading that log file until we reach its end. Right?

hzzlzz commented 2 years ago

> @hzzlzz If I understand correctly, the behavior of readAtomicCommits would be: if maxFilesPerTrigger or maxBytesPerTrigger causes us to stop in the middle of a commit log file, readAtomicCommits will force us to continue reading that log file until we reach its end. Right?

Exactly! And in the case of isStartingVersion=true, it will read the whole snapshot in one micro-batch.
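To make the intended semantics concrete, a minimal sketch (maxFilesPerTrigger is an existing Delta source option; readAtomicCommits is the option proposed in this PR; spark and path are as in the PR description):

// Request at most 10 files per micro-batch, but never split a commit: if the
// 10-file limit would fall in the middle of a commit's actions, the proposed
// readAtomicCommits option keeps reading until the commit boundary.
val q = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "10")
  .option("readAtomicCommits", "true")
  .load(path)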

zsxwing commented 2 years ago

> And in the case of isStartingVersion=true, it will read the whole snapshot in one micro-batch.

I'm a bit concerned about this. Loading the entire snapshot will likely cause stability issues. You mentioned that you built a service for Delta Lake (https://github.com/delta-io/delta/issues/1026#issuecomment-1105975136). How do you plan to handle the case where a big snapshot may take down your service?

guoliqiang commented 2 years ago

> > And in the case of isStartingVersion=true, it will read the whole snapshot in one micro-batch.
>
> I'm a bit concerned about this. Loading the entire snapshot will likely cause stability issues. You mentioned that you built a service for Delta Lake (#1026 (comment)). How do you plan to handle the case where a big snapshot may take down your service?

This is a totally reasonable concern.
Because the snapshot has lost the previous transaction info, we settled on two possible solutions in our internal discussion.

  1. Do NOT support this new option (e.g. throw an exception) if the application uses isStartingVersion=true (see the sketch after this list).
  2. Read the whole snapshot in one micro-batch (as @hzzlzz mentioned) when the application uses isStartingVersion=true, and explicitly call out this behavior in the documentation so the application can make an informed decision.

We adopted option 2 in our system because our tables are small/medium-sized, so reading the whole snapshot is acceptable, and this is also our cold-start use case.
Not sure which one Delta Lake prefers (or is there any other idea)?
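For illustration, option 1 could be a fail-fast guard along these lines (a sketch only; the method and parameter names are placeholders, not the actual DeltaSource fields):

// Hypothetical guard for option 1: reject the combination up front instead of
// silently loading the whole snapshot into a single micro-batch.
def validateReadAtomicCommits(readAtomicCommits: Boolean, isStartingVersion: Boolean): Unit = {
  if (readAtomicCommits && isStartingVersion) {
    throw new IllegalArgumentException(
      "readAtomicCommits is not supported when the stream starts from a full snapshot " +
        "(isStartingVersion=true) because a snapshot has no per-commit boundaries.")
  }
}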

zsxwing commented 2 years ago

I feel neither is ideal. We don't want to build a feature that lets users easily shoot themselves in the foot. On second thought, would your problem be solved by using Merge Into? We have a few examples in our docs showing how to use Merge Into to track updates and make idempotent changes to a Delta table: https://docs.delta.io/latest/delta-update.html#merge-examples
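For context, the Merge Into pattern from those examples looks roughly like this with the Scala API (the table path and the join key id are placeholders):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

// Upsert a batch of changes keyed by `id`; replaying the same batch produces
// the same table state, which is what makes the write idempotent.
def upsertChanges(updates: DataFrame, tablePath: String): Unit = {
  DeltaTable.forPath(updates.sparkSession, tablePath).as("t")
    .merge(updates.as("u"), "t.id = u.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}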

guoliqiang commented 2 years ago

Thanks @zsxwing. Although our state store/sink is not a Delta table for various reasons (e.g. performance), as we mentioned on #1026, adding extra information (e.g. a timestamp for each key) and not performing hard deletes could also partly resolve our problem. We thought changing the streaming reader's behavior was the cheapest and most generic solution.

Thinking further, I believe only a Delta table stream source could provide such an ability; Kafka/Event Hubs has no straightforward way to do this.

zsxwing commented 2 years ago

> Thinking further, I believe only a Delta table stream source could provide such an ability; Kafka/Event Hubs has no straightforward way to do this.

Yeah. If you would like to provide a general solution for data sources other than Delta, changing just Delta would not be sufficient. It looks like you would have to add something to the data in order to achieve this. Do you still want to add this to Delta Lake? Otherwise, I'm inclined to close this one, as I would prefer not to add an unscalable solution if possible.

hzzlzz commented 2 years ago

Thanks @zsxwing. To clarify, with this change we don't have to add anything to the data; a Delta-side change is sufficient. In our use case, there is within-transaction data disordering caused by multiple worker nodes generating multiple Parquet files in parallel. If the entire transaction is consumed in one micro-batch, there is no need to perform data dedup/reordering across micro-batches, which would require a state store or a stateful sink.
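For illustration, the kind of cross-micro-batch dedup we want to avoid would look something like this (the column names eventTime and key are placeholders; the watermark is what bounds the streaming state):

// Without atomic-commit reads, rows from a single transaction can land in
// different micro-batches, so deduplication needs streaming state like this.
val deduped = spark.readStream
  .format("delta")
  .load(path)
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("key", "eventTime")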

guoliqiang commented 2 years ago

> Yeah. If you would like to provide a general solution for data sources other than Delta, changing just Delta would not be sufficient. It looks like you would have to add something to the data in order to achieve this. Do you still want to add this to Delta Lake? Otherwise, I'm inclined to close this one, as I would prefer not to add an unscalable solution if possible.

Thanks @zsxwing. When I said "only a Delta table stream source could provide such an ability", I meant that this option would be a unique feature of Delta Lake that could attract customers :). In fact, in our internal system we are moving away from Kafka/Event Hubs to Delta Lake because of its ACID guarantees and unified batch and streaming read features. Because our only source is Delta tables, we will not need to change the data with this option.
This is our thought. :)

scottsand-db commented 2 years ago

> I feel neither is ideal. We don't want to build a feature that lets users easily shoot themselves in the foot. On second thought, would your problem be solved by using Merge Into? We have a few examples in our docs showing how to use Merge Into to track updates and make idempotent changes to a Delta table: https://docs.delta.io/latest/delta-update.html#merge-examples

@zsxwing, what's the next step for deciding what to do with these two options?

zsxwing commented 2 years ago

@scottsand-db, since you are working on CDF, which actually needs to read a commit entirely, could you think about how to unify CDF and the normal streaming Delta source?

scottsand-db commented 2 years ago

Hi @hzzlzz - as @zsxwing mentioned, I am working on CDF (Change Data Feed) #1105. The next PR I'm working on is CDF + Streaming.

One thing that CDF generates for UPDATE operations is pre-image and post-image row-level changes. For CDC reads of AddCDCFiles, we want to admit the entire commit or nothing at all (to avoid having the pre-image and post-image row-level changes land in separate micro-batches). For AddFile or RemoveFile actions, we do not have this requirement.

So, I think it makes sense to wait for this CDF PR to be merged first (it's a WIP), and then you can add your read-atomic-commits behavior to the latter part of CDF (for AddFile and RemoveFile actions).

Does this sound good?
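For reference, once CDF + Streaming is available, a change-feed streaming read would presumably look something like this (the readChangeFeed option name follows the CDF work and should be treated as tentative until that PR lands):

// Tentative sketch: stream row-level changes, including the pre-/post-image
// rows for UPDATEs, from a table that has Change Data Feed enabled.
val changes = spark.readStream
  .format("delta")
  .option("readChangeFeed", "true")
  .load(path)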

scottsand-db commented 2 years ago

Update: that CDF PR is here: https://github.com/delta-io/delta/pull/1154