delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, and Python
https://delta.io
Apache License 2.0

[Feature Request] get the last table version consumed by a client #1996

Open ahuret opened 1 year ago

ahuret commented 1 year ago

Feature request

I'm looking for a way to get the last table version consumed by a client, the one that is stored in the streaming checkpoint. Is there a proper API to do it?

Which Delta project/connector is this regarding?

Overview

Currently, the way I found to get the last table version consumed by a consumer is to read a file stored in the checkpoints directory directly, unmarshal the JSON, and read the reservoirVersion field, which is the needed version. I would like a proper API to get this version, passing the checkpointLocation as a parameter.
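For context, a minimal sketch of that workaround, assuming a local checkpoint path and the standard Structured Streaming offsets layout; the helper name is made up, and a real implementation would go through the Hadoop FileSystem API (for remote storage) and a proper JSON parser:

```scala
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

// Sketch of the current workaround: read the newest offset file in a
// Structured Streaming checkpoint and pull out the Delta source's
// reservoirVersion. Assumes a local checkpoint path.
def lastConsumedVersion(checkpointLocation: String): Long = {
  // Offset files are named by batch id: <checkpoint>/offsets/<batchId>
  val offsetsDir = Paths.get(checkpointLocation, "offsets")
  val latest = Files.list(offsetsDir).iterator().asScala
    .filter(_.getFileName.toString.forall(_.isDigit))
    .maxBy(_.getFileName.toString.toLong)
  // The last line of the offset file is the per-source offset JSON,
  // which for a Delta source contains a "reservoirVersion" field.
  val offsetJson = Files.readAllLines(latest).asScala.last
  val reservoir = """"reservoirVersion"\s*:\s*(\d+)""".r
  reservoir.findFirstMatchIn(offsetJson)
    .map(_.group(1).toLong)
    .getOrElse(sys.error(s"no reservoirVersion found in $latest"))
}
```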

Motivation

We're working on automating table schema mutations on Delta Lake, which is fairly involved. There's a step where we need to set the version via the startingVersion parameter to get past an exception caused by a schema change. This feature would allow us to get the last version consumed by a consumer.
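Concretely, the recovered version would feed a restart along these lines; a sketch assuming the hypothetical lastConsumedVersion helper above and placeholder paths (note that startingVersion only takes effect when the query starts from a fresh checkpoint):

```scala
// Sketch: restart the stream just past the schema-change commit.
// lastConsumedVersion is the hypothetical helper above; paths are
// placeholders.
val resumed = spark.readStream
  .format("delta")
  .option("startingVersion", (lastConsumedVersion("/tmp/ckpt") + 1).toString)
  .load("/data/events")
```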

Further details

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

vkorukanti commented 1 year ago

@ahuret If I understand correctly, this looks like a question for Spark Structured Streaming.

Could you give me an example of the schema change and the steps you are taking to bypass the exception? It may help us check whether this can be done within Delta, without reading the Spark Structured Streaming offset checkpoints.

ahuret commented 1 year ago

Basically, on a Delta table, if you ALTER TABLE once the stream works as expected, but if you ALTER TABLE twice in a row we get this error: org.apache.spark.sql.streaming.StreamingQueryException: Detected schema change in version 53. So we're trying to bypass the exception by setting the starting version to 54.

vkorukanti commented 1 year ago

What specifically do the ALTER TABLE statements do? Are they adding a new column or renaming/dropping a column? I remember some work was done to fix the schema handling when a column is renamed or dropped. If possible, could you please provide a repro on a small table? It would help debug the problem.

piggito commented 1 year ago

Hi @vkorukanti, we do a rename followed by an add column. E.g. we have a column "myColumn" with type DOUBLE; we rename "myColumn" to "BACKUP_myColumn" and then add a column "myColumn" with type STRING.

vkorukanti commented 1 year ago

@jackierwzhang fixed this issue in 3.0.0 (not yet released, but an RC is available as 3.0.0rc1). Here are the details on how it works.
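For readers finding this later: the 3.0.0 fix lets the streaming source persist the schema it has read across non-additive changes. A minimal sketch, assuming the schemaTrackingLocation option from Delta 3.0 and placeholder paths (some non-additive changes may additionally require SQL confs to unblock):

```scala
// Sketch: Delta 3.0 schema tracking on a streaming read, so renames and
// drops no longer fail the stream with "Detected schema change".
// Paths are placeholders.
val stream = spark.readStream
  .format("delta")
  .option("schemaTrackingLocation", "/tmp/ckpt/_schema_log")
  .load("/data/events")
```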

piggito commented 1 year ago

@vkorukanti sorry, my previous example was not correct; it is actually like this (a SQL sketch of the sequence follows the list):

  1. The Spark streaming read uses a schema with "myColumn1" of type DOUBLE and "myColumn2" of type DOUBLE
  2. Rename "myColumn1" to "BACKUP_myColumn1"
  3. Add "myColumn1" with type STRING
  4. Rename "myColumn2" to "BACKUP_myColumn2"
  5. Add "myColumn2" with type STRING
  6. Update the Spark streaming read schema to "myColumn1" type STRING and "myColumn2" type STRING
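In case it helps with a repro, steps 2-5 in SQL would look roughly like the sketch below; my_table is a placeholder, and RENAME COLUMN requires column mapping mode on the table, which the first statement enables:

```scala
// Hypothetical repro of steps 2-5 above; the table name is a placeholder.
// RENAME COLUMN requires column mapping mode, enabled here first.
spark.sql("""ALTER TABLE my_table SET TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5')""")
spark.sql("ALTER TABLE my_table RENAME COLUMN myColumn1 TO BACKUP_myColumn1")
spark.sql("ALTER TABLE my_table ADD COLUMNS (myColumn1 STRING)")
spark.sql("ALTER TABLE my_table RENAME COLUMN myColumn2 TO BACKUP_myColumn2")
spark.sql("ALTER TABLE my_table ADD COLUMNS (myColumn2 STRING)")
```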