delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for several languages
https://delta.io
Apache License 2.0

With Delta lake, how to do an "overwrite" output mode using spark structured streaming without deleting all the data and the checkpoint #1346

Closed jbin121867740 closed 1 year ago

jbin121867740 commented 1 year ago

I have a Delta lake in ADLS that we sink data into through Spark Structured Streaming. We usually append new data from our data source to our Delta lake, but in some cases we find errors in the data and need to reprocess everything. So what we do is delete all the data and the checkpoints and re-run the pipeline, which leaves the correct data in our ADLS.

But the problem with this approach is that the end users go a whole day without data to analyze (because we need to delete it to re-run). So, to fix that, I would like to know if there's a way to do an "overwrite" output with Structured Streaming, so we can overwrite the data as a new Delta version while end users can still query the data at the current version.

I don't know if this is possible with streaming, but I would like to know if anyone has had a similar problem and how you went about solving it :)

Thanks!

nkarpov commented 1 year ago

Hi @jbin121867740 - I think based on your description you might be able to use RESTORE to wind back the table to a non-corrupted state (instead of deleting all of the data) while you attend to the upstream.
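For reference, a minimal sketch of RESTORE in PySpark (available in recent Delta releases), assuming an active SparkSession named spark; the table path and version number below are placeholders, not taken from this thread:

from delta.tables import DeltaTable

# Point at the existing Delta table (hypothetical path).
table = DeltaTable.forPath(spark, "/path/to/my-table")

# Roll the table back to a known-good version; the restore is committed as a
# new version on top of the history, so readers can keep querying the table
# the whole time.
table.restoreToVersion(12)

# Equivalent SQL:
# spark.sql("RESTORE TABLE delta.`/path/to/my-table` TO VERSION AS OF 12")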

Would love to hear more if that's insufficient!

Kimahriman commented 1 year ago

I don't think this is related to corrupted state; we face similar problems. We stream data from raw files (JSON, CSV, etc.) to Delta tables, but occasionally we need to completely reprocess the data, either to fix bugs or for some other reason that can't easily be handled with an update. Right now we version our Delta tables by the path name, so if we need to make an update to our v1 table, we simply start a new stream to v2 and, once that has caught up, switch our Hive table to point to the v2 path instead of the v1 path. It's kinda manual, and it makes things difficult for anything streaming from these Delta tables, since those downstream streams also need to be switched and started over.
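A rough sketch of that path-versioning switch in PySpark, assuming a metastore table named events, a reprocessing stream called reprocessed_stream, and placeholder paths; none of these names come from the thread:

# Reprocess into a new versioned path with a fresh checkpoint (hypothetical names).
(reprocessed_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/events_v2")
    .start("/data/events_v2"))

# Once the v2 stream has caught up, repoint the metastore table at the new path
# so readers switch over without downtime.
spark.sql("ALTER TABLE events SET LOCATION '/data/events_v2'")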

zsxwing commented 1 year ago

Will running a batch query to overwrite the table work? For example,

val df = // a dataframe of new data
df.createOrReplaceTempView("temp_view")
sql("INSERT OVERWRITE delta.`my-table-path` SELECT * FROM temp_view")

If you stop any concurrent writers and just run the above batch query, it will have no downtime for downstream batch queries.
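For anyone following along in Python, a sketch of the same batch overwrite with the DataFrame API (the path is a placeholder):

df = ...  # a DataFrame of the reprocessed data

# Overwrite replaces the table contents in a single commit; concurrent readers
# keep seeing the previous version until that commit lands.
df.write.format("delta").mode("overwrite").save("/path/to/my-table")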

Kimahriman commented 1 year ago

The problem is that if you do a batch overwrite, you don't get the streaming state for the data you just wrote.

nkarpov commented 1 year ago

As for the OP @jbin121867740, I've understood the two requirements as (1) reprocess & overwrite all the data and (2) simultaneously allow users to query the existing data. I think @zsxwing's suggestion achieves that (you can also run RESTORE first so the table goes back to the current - 1 version if you want your users to avoid querying the version with errors in the data).

Having downstream stream readers, @Kimahriman, is definitely more complicated, and I think it wades away from core Delta table functionality and further into orchestration/infra. For example, I think a subset of Live Tables functionality addresses this multi-stream rewind/error handling.

Kimahriman commented 1 year ago

Yeah, the stream readers make it more complicated, but that's somewhat of a separate issue. Not sure about the OP, but for us it's usually a bug in our transformations, not bad data, so there's nothing to RESTORE back to because it's all wrong. Also, even if you do know it's specifically a bad batch, I don't think RESTORE'ing resets any checkpoint state, so you'd have to go in and manually delete the batches in the checkpoint that you think had the bad data.

zsxwing commented 1 year ago

For the downstream streaming queries, the issue cannot be solved within Delta Lake functionality. A possible solution would be making Structured Streaming support rollback, but it would be a large feature for Apache Spark.

Kimahriman commented 1 year ago

The "streaming overwrite" would fix the downstream streams issue if that stream can handle duplicates/reprocessing the same data and the table keeps the same ID during an overwrite. I might think about how this could work, maybe some flag that if it's the first batch of the stream it should overwrite the table during the first commit

zsxwing commented 1 year ago

Could you clarify what's the difference between the streaming overwrite you propose and a simple batch overwrite query?

Kimahriman commented 1 year ago

Mostly the risk of getting duplicate data. Not sure if this is exactly the same scenario the OP is talking about, but it sounds very similar to me.

We receive a bunch of raw data in all kinds of file formats; these files can come in as often as every few minutes. We save those files to HDFS, then run a file stream depending on the file type, apply a bunch of transformations that we have a whole library built around, and save the result to a Delta table sink. Occasionally we discover some kind of bug in our transformations, or we learn something new about one of the fields and realize we need to process it differently. Because a lot of the logic is baked into our somewhat complicated library, running an update to fix things isn't easy, so we need to just rewrite the table from scratch.

If we tried to do the "batch" update, in my mind that would work like this: we stop our streaming job and kick off a single batch overwrite job. The main issue is that new files may have come in by the time we kick off our batch job, so the batch query could process those as well. Then, when we restart our streaming job with the fixes, it won't think it has processed those files, so it will process them again. You could try to be really clever/quick with the timing, but it can't really be guaranteed.

To complicate things more for us, some of our sources we can't process in a single batch, because we need a shuffle and the shuffle would be too large. But that's really another whole conversation even more complicated than this 😅.

jbin121867740 commented 1 year ago

We have the same scenarios as @Kimahriman mentioned. Data needs to be reprocessed due to an internal bug or other issue, and after reprocessing we want to keep the streaming going. We have discussed the same solution you mentioned (including a version in the path) @Kimahriman, but we would like to find the recommended way to do this for a streaming query. With a batch query overwriting a streaming sink, I am not sure what will happen to the stream. Will it break the streaming query? (I assume the batch query reprocesses the source data and overwrites the sink.) Even if it works, there would still be some difficulties in practice: 1) we need to maintain the same transformation for both the batch query and the streaming query; 2) some duplication will probably be generated. For example, say we have rows r1, r2, r3 in the source, while only r1 and r2 have been processed by the streaming query. When we run the batch query, r1, r2, and r3 are all processed, since the batch query does not know where the streaming query left off. Then, after the batch overwrite, the sink will also contain the transformation of r3. Finally, when we restart the streaming query, r3 will be processed again. Just let me know if I misunderstood your recommendation @zsxwing.

jbin121867740 commented 1 year ago

Sorry, I closed this issue by mistake; it has been reopened immediately.

Kimahriman commented 1 year ago

Actually, I just realized this is technically possible since Delta 2.0.0 using foreachBatch. You could do something like

df_stream = ...

# A stable, unique ID for this writer, reused across restarts.
app_id = ...

def foreach_batch(batch_df, batch_id):
    # With a fresh checkpoint the first micro-batch has id 0, so overwrite the
    # table once and append afterwards.
    if batch_id == 0:
        mode = 'overwrite'
    else:
        mode = 'append'

    # txnVersion/txnAppId make the write idempotent if a micro-batch is retried.
    batch_df.write.format('delta').mode(mode).option('txnVersion', batch_id).option('txnAppId', app_id).save(...)

df_stream.writeStream.foreachBatch(foreach_batch).start()

Just stop your current job, delete the checkpoint, and then restart with something like that. This is assuming there's no weird conflict between using overwrite mode with the new txn... options. Haven't actually tested it out. Might explore adding this as a built-in option in the Delta sink.

jbin121867740 commented 1 year ago

@Kimahriman, it looks very promising. Thanks very much. We will try it to see whether it works for us.

zsxwing commented 1 year ago

Yep. The txnVersion and txnAppId options were added to let you get an exactly-once guarantee in foreachBatch. It should work as long as you provide the right txnVersion and txnAppId.

Kimahriman commented 1 year ago

Created a proof of concept for baking this option into the Delta sink

scottsand-db commented 1 year ago

Closing since it was decided in #1375 not to embed this solution natively into Delta. Users should use the solution provided here: https://github.com/delta-io/delta/issues/1346#issuecomment-1235454065