delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, as well as APIs for multiple languages
https://delta.io
Apache License 2.0

[BUG] Column invariant not enforced on write #1239

Open · wjones127 opened this issue 2 years ago

wjones127 commented 2 years ago

Bug

Describe the problem

I'm trying to understand column invariant enforcement in Delta Lake so I can implement it in delta-rs. However, I am unable to get PySpark to throw an error when writing values that violate the invariant. Am I misunderstanding the spec, or is this a bug?

Steps to reproduce

import pyarrow as pa
import pyspark
import pyspark.sql.types
import pyspark.sql.functions as F
import delta
from delta.tables import DeltaTable

def get_spark():
    builder = (
        pyspark.sql.SparkSession.builder.appName("MyApp")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )
    return delta.configure_spark_with_delta_pip(builder).getOrCreate()

spark = get_spark()

schema = pyspark.sql.types.StructType([
    pyspark.sql.types.StructField(
        "c1", 
        dataType = pyspark.sql.types.IntegerType(), 
        nullable = False, 
        metadata = { "delta.invariants": "c1 > 3" } 
    )
])

table = DeltaTable.create(spark) \
    .tableName("testTable") \
    .addColumns(schema) \
    .execute()

# This should fail, but doesn't
spark.createDataFrame([(2,)], schema=schema).write.saveAsTable(
    "testTable",
    mode="append",
    format="delta",
)

Observed results

The write succeeds, even though the delta.invariants key is clearly present in the schema, the writer protocol is set to 2, and the minimum value of the written data clearly violates the invariant.

First log file:

{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"e8204eae-cd90-41c2-b685-92f22126b54a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{\"delta.invariants\":\"c1 > 3\"}}]}","partitionColumns":[],"configuration":{},"createdTime":1656459957813}}
{"commitInfo":{"timestamp":1656459957820,"operation":"CREATE TABLE","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.2.1 Delta-Lake/1.2.1","txnId":"6d370f8e-211f-4624-8a40-6fbd67e905c8"}}

Second log:

{"add":{"path":"part-00000-0d61b29d-60ee-47d1-a121-2641fbc3ae1d-c000.snappy.parquet","partitionValues":{},"size":326,"modificationTime":1656459958951,"dataChange":true,"stats":"{\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}"}}
{"add":{"path":"part-00003-b30e416e-c616-4d80-87b6-182baf8f0830-c000.snappy.parquet","partitionValues":{},"size":479,"modificationTime":1656459958981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c1\":2},\"maxValues\":{\"c1\":2},\"nullCount\":{\"c1\":0}}"}}
{"commitInfo":{"timestamp":1656459958996,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputRows":"1","numOutputBytes":"805"},"engineInfo":"Apache-Spark/3.2.1 Delta-Lake/1.2.1","txnId":"00a036ec-243d-4543-b7d2-186f031ca2f1"}}

Expected results

I expected it to throw an exception. This should be identical to this unit test, right? https://github.com/delta-io/delta/blob/5d3d73fe714f47bbe30e0414a8f9132000d8932c/core/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala#L218-L232

Further details

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

I have no experience with Scala, so if this is a bug, I may not be able to fix it. But I'd be happy to add further clarification to the Protocol spec to clear up the expectations around delta.invariants.

vkorukanti commented 2 years ago

Looking at the invariant enforcement code here, it looks like the invariant defined by delta.invariants is not considered at all if the column is not nullable. The existing test has nullable=true, while the test you wrote has nullable=false.
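
In rough Python terms, the per-column selection described above looks something like the sketch below. This is only an approximation of the reported behavior, not the actual Scala implementation; the function name is made up for illustration.

```python
from pyspark.sql.types import StructType

def collect_invariants(schema: StructType):
    """Approximation only: per column, the first matching rule wins."""
    rules = []
    for field in schema.fields:
        if not field.nullable:
            # A non-nullable column only yields a NOT NULL check; an expression
            # stored under delta.invariants on such a column is never read.
            rules.append((field.name, "NOT NULL"))
        elif "delta.invariants" in field.metadata:
            # Only nullable columns pick up the delta.invariants expression.
            rules.append((field.name, field.metadata["delta.invariants"]))
    return rules
```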

I am not sure if it is a bug. I will spend some time understanding the protocol and get back to you.

wjones127 commented 2 years ago

Oh, very interesting. If I switch it to nullable=True, it then rejects the SQL string provided. So I must also not be formatting the condition correctly, though I don't see the expected format documented anywhere. Any hints?

Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/home/wjones/delta-rs/python/venv/lib/python3.10/site-packages/delta/tables.py", line 1146, in execute
    jdt = self._jbuilder.execute()
  File "/home/wjones/delta-rs/python/venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/home/wjones/delta-rs/python/venv/lib/python3.10/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/wjones/delta-rs/python/venv/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o50.execute.
: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'c1': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"c1 > 3"; line: 1, column: 3]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2903)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1949)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)
        at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3548)
        at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue(ScalaObjectMapper.scala:206)
        at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue$(ScalaObjectMapper.scala:205)
        at org.apache.spark.sql.delta.util.JsonUtils$$anon$1.readValue(JsonUtils.scala:27)
        at org.apache.spark.sql.delta.constraints.Invariants$.$anonfun$getFromSchema$2(Invariants.scala:81)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.delta.constraints.Invariants$.getFromSchema(Invariants.scala:76)
        at org.apache.spark.sql.delta.actions.Protocol$.requiredMinimumProtocol(actions.scala:132)
        at org.apache.spark.sql.delta.actions.Protocol$.$anonfun$apply$1(actions.scala:100)
        at scala.Option.map(Option.scala:230)
        at org.apache.spark.sql.delta.actions.Protocol$.apply(actions.scala:100)
        at org.apache.spark.sql.delta.actions.Protocol$.forNewTable(actions.scala:119)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataInternal(OptimisticTransaction.scala:326)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataInternal$(OptimisticTransaction.scala:290)
        at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadataInternal(OptimisticTransaction.scala:98)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata(OptimisticTransaction.scala:283)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata$(OptimisticTransaction.scala:278)
        at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadata(OptimisticTransaction.scala:98)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataForNewTable(OptimisticTransaction.scala:391)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataForNewTable$(OptimisticTransaction.scala:389)
        at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadataForNewTable(OptimisticTransaction.scala:98)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.createTransactionLogOrVerify$1(CreateDeltaTableCommand.scala:181)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.$anonfun$run$2(CreateDeltaTableCommand.scala:193)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordFrameProfile(CreateDeltaTableCommand.scala:49)
        at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperation$5(DeltaLogging.scala:114)
        at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
        at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:49)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:113)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:98)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:49)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:110)
        at org.apache.spark.sql.delta.catalog.DeltaCatalog.org$apache$spark$sql$delta$catalog$DeltaCatalog$$createDeltaTable(DeltaCatalog.scala:164)
        at org.apache.spark.sql.delta.catalog.DeltaCatalog.createTable(DeltaCatalog.scala:213)
        at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:42)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:133)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
        at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at io.delta.tables.DeltaTableBuilder.execute(DeltaTableBuilder.scala:357)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
vkorukanti commented 2 years ago

The expression needs to be in JSON format. In the example test, the input is `{"delta.invariants":"{\"expression\":{\"expression\":\"value < 3\"}}"}`.

wjones127 commented 2 years ago

Ah yes, I can confirm that works.

And I think the fact that these aren't enforced on non-nullable columns is a bug; I can't imagine why you would want to enforce invariants only selectively.

Updated example:

```python
import pyarrow as pa
import pyspark
import pyspark.sql.types
import pyspark.sql.functions as F
import delta
from delta.tables import DeltaTable

def get_spark():
    builder = (
        pyspark.sql.SparkSession.builder.appName("MyApp")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )
    return delta.configure_spark_with_delta_pip(builder).getOrCreate()

spark = get_spark()

schema = pyspark.sql.types.StructType([
    pyspark.sql.types.StructField(
        "c1",
        dataType=pyspark.sql.types.IntegerType(),
        nullable=True,
        metadata={"delta.invariants": "{\"expression\": { \"expression\": \"c1 > 3\"} }"}
    )
])

table = DeltaTable.create(spark) \
    .tableName("testTable") \
    .addColumns(schema) \
    .execute()

# This now fails
spark.createDataFrame([(2,)], schema=schema).write.saveAsTable(
    "testTable",
    mode="append",
    format="delta",
)
```
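
As an aside, since the metadata value is itself a JSON document, it can be built with json.dumps instead of hand-escaping the quotes. A minimal sketch of the same metadata as above (variable names are illustrative):

```python
import json

# json.dumps handles the nested quoting, producing
# '{"expression": {"expression": "c1 > 3"}}'
invariant_value = json.dumps({"expression": {"expression": "c1 > 3"}})
metadata = {"delta.invariants": invariant_value}
```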
vkorukanti commented 2 years ago

@wjones127 I heard from @zsxwing that this feature has bugs and is being deprecated in favor of the constraints feature.

vkorukanti commented 2 years ago

Closing this issue. @wjones127.

wjones127 commented 2 years ago

@vkorukanti I don't consider this closed. It seems to me that to close it, we need to do one of the following:

  1. Fix the bug in the Spark implementation of Delta Lake
  2. Remove the invariants feature from the Delta Lake Protocol
  3. Make clear in the Delta Lake protocol that enforcing invariants is optional and deprecated for writer protocol V2, citing Spark as the precedent.

I'm fine with any of these options, but I don't think we should leave this as a lingering issue. As it stands, an implementer of a Delta Lake writer could be left with the impression that all other writers will respect invariants, and might go on to create an API for configuring them.

vkorukanti commented 2 years ago

@wjones127 Makes sense. Reopening. I'm not sure what the procedure is for deprecating features. @tdas?

wjones127 commented 1 year ago

@tdas gentle ping.

It sounds like Spark does indeed still use invariants for NOT NULL columns, based on this conversation: https://delta-users.slack.com/archives/CJ70UCSHM/p1693348327628349

So maybe, instead of deprecating it, the use case should be narrowed to that one? I don't know the Spark implementation well enough to say what the proper way forward is.

li-boxuan commented 3 months ago

It seems to me that INVARIANTS now only represents the "NOT NULL" constraint, and nothing else?