delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0
7.59k stars 1.7k forks source link

[Feature Request][Spark] reveal generation expression for generated columns #2580

Open keen85 opened 9 months ago

keen85 commented 9 months ago

Feature request

Which Delta project/connector is this regarding?

Overview

Delta allows specifying generated columns, for which values are calculated at runtime during insert. After table creation, there is no (easy) way to look up the generation expression using delta-Spark.

Feature request: I propose implementing one of the two options...

Motivation

We have the scenario where we'd like to check if a given Delta table complies to a given specification (schema, partition columns, generated columns). Currently I need to read and parse the delta_log manually to obtain the generation expression.

Further details

Reproduce the issue with

from delta.tables import DeltaTable
dt = (
    DeltaTable.createOrReplace(spark)
    .tableName("tmp_delta_table_generated_column")
    .addColumn("c1", dataType = "LONG", nullable = False)
    .addColumn("c2", dataType = "LONG", generatedAlwaysAs = "c1 + 1")
    .partitionedBy("c2")
    .execute()
)

dt.toDF().schema
>>>StructType([StructField('c1', LongType(), False), StructField('c2', LongType(), True)])

delta-Spark vs delta-rs

When using delta-rs DeltaTable().schema() already contains delta.generationExpression as in option b)

from deltalake import DeltaTable
DeltaTable(...).schema()
>>>Schema([Field(c1, PrimitiveType("long"), nullable=False), Field(c2, PrimitiveType("long"), nullable=True, metadata={'delta.generationExpression': 'c1 + 1'})])

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

tdas commented 9 months ago

According to the protocol, the general expressions is stored as column metadata in the delta log. So ideally it should be present the Spark schema. If its not, that sounds like a bug to me.

does spark.table("delta_table").schema() have the metadata in the column?

tdas commented 9 months ago

Probably this is where the metadata is being scrubbed - https://github.com/delta-io/delta/blob/13739bf7740a9a2bee11d4e46dcab6e52d4fa8f3/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala#L167

@bart-samwel @ryan-johnson-databricks not sure what the original reason of doing this was, but any reason to scrub the column metadata of generated / default columns in the StructType schema? Is there any risk to exposing it? I can't seem to think any.

bart-samwel commented 9 months ago

The scrubbing scrubs this list: https://github.com/delta-io/delta/blob/13739bf7740a9a2bee11d4e46dcab6e52d4fa8f3/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala#L523

Those are annotations that are used internally in Spark / Delta query plans and that shouldn't be stored.

However, it also calls this function: https://github.com/delta-io/delta/blob/13739bf7740a9a2bee11d4e46dcab6e52d4fa8f3/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala#L168

That's what scrubs it. It wouldn't scrub it if we called it with keepGeneratedColumns = true, keepIdentityColumns = true.

I'm not sure what the exact policy here is. I think at some point in the past we were probably writing schemas without scrubbing, so now we have to scrub on ingest. That's the first list. The second part removes metadata about generated columns and identity columns. That one is interesting -- if the metadata stays there when you fetch the schema, what happens when you use that schema for another table? Would we still strip it out before writing? I see a lot of calls to removeInternalMetadata in the code, so we might. In that case there might be no harm in just preserving it on the way out. @ryan-johnson-databricks WDYT?

tdas commented 9 months ago

Aah, that is the problem. If we expose these column metadata in the Dataframe's schema, then those metadata will get transparently forwarded through dataframe operations. And if we are writing the final dataframe to another table, those column metadata will be persisted to that table as well. For non-Delta tables, it should not matter though, it should not recognize the delta-specific column metadata and therefore should ignore them. However, if we write it out to a delta table, then a column of the table will get the column metadata, even though its not a generated column in the target table.

Then a safe alternative might be to expose the full schema via different mechanism than dataframe's schema. Something like DeltaTable(...).schemaWithFullMetadata()?

keen85 commented 9 months ago

@tdas what about proposed option a): returning the schemaString (containing generationExpression) from Delta log when calling DeltaTable.detail()

ryan-johnson-databricks commented 9 months ago

@tdas -- I'm afraid I don't know enough about the intricacies of schema metadata handling in spark to have much of an opinion here... but I do know enough to be afraid of changing it, and so would generally favor some additive change so that callers have to opt in to get an unfiltered schema.

bart-samwel commented 9 months ago

Then a safe alternative might be to expose the full schema via different mechanism than dataframe's schema. Something like DeltaTable(...).schemaWithFullMetadata()?

@tdas - I like this alternative. It's guaranteed to be a safe change when users have to opt in. We'd have to document in that function's API that it may contain metadata keys that will wreak havoc in other systems, when exposed to Spark, or when used on other Delta tables. I.e., it's for informational purposes only.

keen85 commented 9 months ago

I just made another discovery: DeltaTable.forName(spark, ...).toDF().schema → 🛑 does not include delta.generationExpression-metadata DeltaTable.forPath(spark, ...).toDF().schema → ✅ does include delta.generationExpression-metadata So I guess, its a problem with Hive metastore?