Open wahani opened 2 months ago
For some reason the protocol requires invariants to be enabled even if it's V7. I honestly don't see why this is required, since the idea behind table features is that you can opt in.
@ion-elgreco I agree. Meanwhile I also created the same table using pyspark and see no difference in the protocol it creates, so maybe it is something else.
Can you share the first json log of both tables, can just paste them as code or share as gists
@wahani once this lands: https://github.com/delta-io/delta-rs/pull/2712, you can manually add invariants to a table ;)
python-0.19.1
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
{"metaData":{"id":"6950fa52-78c5-4e69-8c62-14d6da440d58","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"short\",\"nullable\":false,\"metadata\":{}},{\"name\":\"values\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1726471364355,"configuration":{}}}
{"add":{"path":"part-00001-57ecdc29-2b6c-4c64-805d-537216102e83-c000.snappy.parquet","partitionValues":{},"size":1179,"modificationTime":1726471364356,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"values\":1.0,\"id\":1},\"maxValues\":{\"values\":2.0,\"id\":2},\"nullCount\":{\"id\":0,\"date\":2,\"values\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1726471364356,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.19.1"}}
python-0.15.3
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"ee5190ed-3786-4db9-b518-6b97d343a975","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"short\",\"nullable\":false,\"metadata\":{}},{\"name\":\"values\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1726471484655,"configuration":{}}}
{"add":{"path":"0-f28acb3f-a592-43f7-b163-1e6cc6740429-0.parquet","partitionValues":{},"size":1172,"modificationTime":1726471484654,"dataChange":true,"stats":"{\"numRecords\": 2, \"minValues\": {\"id\": 1, \"values\": 1.0}, \"maxValues\": {\"id\": 2, \"values\": 2.0}, \"nullCount\": {\"id\": 0, \"values\": 0, \"date\": 2}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1726471484655,"operation":"CREATE TABLE","operationParameters":{"location":"file:///.../zz-delta-table","protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","mode":"ErrorIfExists","metadata":"{\"configuration\":{},\"createdTime\":1726471484655,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"ee5190ed-3786-4db9-b518-6b97d343a975\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"short\\\",\\\"nullable\\\":false,\\\"metadata\\\":{}},{\\\"name\\\":\\\"values\\\",\\\"type\\\":\\\"double\\\",\\\"nullable\\\":false,\\\"metadata\\\":{}},{\\\"name\\\":\\\"date\\\",\\\"type\\\":\\\"timestamp\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}"},"clientVersion":"delta-rs.0.17.0"}}
pyspark-3.5.0
{"commitInfo":{"timestamp":1726471612932,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"2","numOutputBytes":"2247"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"c2be61a6-4831-4b3b-8599-937b00ce781c"}}
{"metaData":{"id":"49fd7cb1-1ec1-4cd0-9415-5686380e8f66","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"values\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1726471610014}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
{"add":{"path":"part-00004-2e0632a4-9306-4fb7-9eb8-80caa199d2dc-c000.snappy.parquet","partitionValues":{},"size":887,"modificationTime":1726471612873,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"values\":2.0},\"maxValues\":{\"id\":1,\"values\":2.0},\"nullCount\":{\"id\":0,\"values\":0,\"date\":1}}"}}
{"add":{"path":"part-00009-9b8d1a42-dfa9-43b2-9c77-7550d83e0754-c000.snappy.parquet","partitionValues":{},"size":887,"modificationTime":1726471612873,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"values\":1.0},\"maxValues\":{\"id\":2,\"values\":1.0},\"nullCount\":{\"id\":0,\"values\":0,\"date\":1}}"}}
@wahani there is actually no difference between spark-delta and delta-rs here (python-0.15.3 you have to ignore though since that was never valid).
Are you sure your table does not have delta.invariants somewhere?
@ion-elgreco well it is just the code I provide that reproduces the error. Strictly speaking I would say the nullable property of a column is an invariant, but I don't know how those are handled really, or how those are represented other than in the metadata.
As I wrote, I think the problem must be something other than the protocol; it was just the error message from pyspark pointing in that direction.
@wahani if you create a table with only nullable columns, does it work then?
@ion-elgreco Seems to be the right direction. I can confirm with
schema = pa.schema(
    [
        pa.field("id", pa.int16(), True),
        pa.field("values", pa.float64(), True),
        pa.field("date", pa.timestamp("us"), True),
    ]
)
it does work. Tested now with python-0.20.0.
Ok, so non-nullable columns imply invariants I guess. I will take a look at this in a couple of weeks.
To further complicate things, adding a timezone to the timestamp data while keeping the non-nullable columns also works. I.e. the schema
schema = pa.schema(
    [
        pa.field("id", pa.int16(), False),
        pa.field("values", pa.float64(), False),
        pa.field("date", pa.timestamp("us", "UTC"), True),
    ]
)
will not result in an error with pyspark. This is also how I resolved the issue in our codebase.
@wahani that creates a table without table features, which is why it works: with a protocol lower than (3, 7), invariants are always enabled.
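The rule in the comment above can be written as a small check. This is a sketch based on my reading of the Delta protocol (column invariants are implied for writer versions 2 through 6 and must be listed in `writerFeatures` at writer version 7); `invariants_enabled` is a hypothetical helper, not a delta-rs or Spark API.

```python
def invariants_enabled(protocol):
    """Return True if the 'invariants' writer feature is active for this protocol.

    Assumption: below writer version 7 the feature is implied from version 2 on;
    at version 7 it only counts if it is listed explicitly in writerFeatures.
    """
    writer = protocol.get("minWriterVersion", 0)
    if writer >= 7:
        return "invariants" in (protocol.get("writerFeatures") or [])
    return writer >= 2


# The two protocols seen in this thread.
legacy = {"minReaderVersion": 1, "minWriterVersion": 2}
table_features = {
    "minReaderVersion": 3,
    "minWriterVersion": 7,
    "readerFeatures": ["timestampNtz"],
    "writerFeatures": ["timestampNtz"],
}

print(invariants_enabled(legacy))          # implied by the writer version
print(invariants_enabled(table_features))  # not listed, so not enabled
```

This would explain why the timezone-aware schema avoids the error: it produces a plain `timestamp` column, so the table stays on the (1, 2) protocol.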
Thanks for clarifying.
@wahani can you please recreate that spark table with nullable False? The previous one you shared had nullable True on all columns, so it's not entirely clear to me what the logs will look like then.
Sure. With the schema:
schema = pa.schema(
    [
        pa.field("id", pa.int16(), False),
        pa.field("values", pa.float64(), False),
        pa.field("date", pa.timestamp("us", "UTC"), False),
    ]
)
we get the following metadata:
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"20dc260d-15ba-4271-bb18-152d971bf71e","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"short\",\"nullable\":false,\"metadata\":{}},{\"name\":\"values\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1727786433560,"configuration":{}}}
{"add":{"path":"part-00001-2d622120-57a6-4e1f-b433-ad0947430aff-c000.snappy.parquet","partitionValues":{},"size":1310,"modificationTime":1727786433600,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"date\":\"2024-10-01T14:40:33.545349Z\",\"values\":1.0,\"id\":1},\"maxValues\":{\"id\":2,\"values\":2.0,\"date\":\"2024-10-01T14:40:33.545355Z\"},\"nullCount\":{\"id\":0,\"date\":0,\"values\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1727786433601,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists"},"operationMetrics":{"execution_time_ms":46,"num_added_files":1,"num_added_rows":2,"num_partitions":0,"num_removed_files":0},"clientVersion":"delta-rs.0.20.0"}}
You need to re-create it with spark ;)
Yes Delta-Spark automatically creates (incorrect) "not-null invariants" for non-nullable columns, enabling the "invariant" feature: https://github.com/delta-io/delta/blob/8e0b133f46f641941ad15ed8cbe7c2d1cc777a5b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Invariants.scala#L73. I've been trying to get the arguably incorrect handling of nested not-null columns fixed for a while: https://github.com/delta-io/delta/issues/860
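To see which columns would trigger this behaviour, the `schemaString` from a `metaData` action can be scanned for `"nullable":false` fields. A minimal sketch, assuming the schema JSON layout shown in the logs in this thread; `non_nullable_columns` is a hypothetical helper and only descends into nested structs, not into arrays or maps.

```python
import json


def non_nullable_columns(schema_string):
    """List dotted paths of all fields declared with nullable=false."""
    found = []

    def walk(struct, prefix):
        for field in struct.get("fields", []):
            path = prefix + [field["name"]]
            if not field.get("nullable", True):
                found.append(".".join(path))
            ftype = field.get("type")
            # Nested structs carry their own "fields" list.
            if isinstance(ftype, dict) and ftype.get("type") == "struct":
                walk(ftype, path)

    walk(json.loads(schema_string), [])
    return found


# Schema of the python-0.19.1 table from this thread (unescaped).
schema_string = (
    '{"type":"struct","fields":['
    '{"name":"id","type":"short","nullable":false,"metadata":{}},'
    '{"name":"values","type":"double","nullable":false,"metadata":{}},'
    '{"name":"date","type":"timestamp_ntz","nullable":true,"metadata":{}}]}'
)

print(non_nullable_columns(schema_string))
```

An empty result would correspond to the all-nullable schema that was confirmed to read fine from Spark.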
Environment
Delta-rs version: python-0.16.0+ (example works until 0.15.3, also tested with 0.19.2)
Binding: Python
Environment:
Bug
What happened:
Beginning with python-0.16.0, I receive a DeltaTableFeatureException when reading a delta table with a local spark session. This is related to the newly introduced handling of timezones. In 0.15.3 the write operation will create the following protocol
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
with 0.16.0 and onward we will see
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
However, when reading the delta table using a local spark session, I will get the following error message:
Adding 'invariants' to the writerFeatures manually will resolve the issue. So far I haven't been able to find an option to add the feature explicitly using the python bindings. E.g. by providing table configuration like "delta.minReaderVersion" or "delta.writerFeatures" - they seem to be ignored or not correct.
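For reference, the manual workaround mentioned above amounts to rewriting the protocol line of the first commit. The sketch below only transforms a JSON string and does not touch any files; `add_writer_feature` is a hypothetical helper, not part of the deltalake python bindings, and hand-editing a transaction log is an assumption-laden workaround rather than a supported operation.

```python
import json


def add_writer_feature(protocol_line, feature="invariants"):
    """Return the protocol line with `feature` appended to writerFeatures."""
    action = json.loads(protocol_line)
    protocol = action["protocol"]
    # writerFeatures is only meaningful for table-features protocols.
    if protocol.get("minWriterVersion", 0) < 7:
        raise ValueError("writerFeatures requires minWriterVersion 7")
    features = list(protocol.get("writerFeatures") or [])
    if feature not in features:
        features.append(feature)
    protocol["writerFeatures"] = features
    return json.dumps(action, separators=(",", ":"))


# Protocol line written by 0.16.0 and onward, as quoted in this report.
line = (
    '{"protocol":{"minReaderVersion":3,"minWriterVersion":7,'
    '"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}'
)
patched = add_writer_feature(line)
print(patched)
```

The function is idempotent, so applying it to an already patched line leaves it unchanged.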
What you expected to happen:
I can write timestamp data with deltalake and read it back in using spark without any additional configuration (0.15.3 behavior).
How to reproduce it:
Thanks for any help or guidance with this.