Closed. johanl-db closed this 2 months ago.
FWIW, we noticed something similar recently with inserts in MERGE: the implicit cast failed if struct fields were missing.
I believe that's expected in that case: unless you enable schema evolution, MERGE/UPDATE rejects writes with missing struct fields. Batch (non-streaming) INSERT also rejects that, although I've recently looked closer at batch insert behavior and the truth is that it's a bit all over the place...
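For reference, a minimal sketch of the point above, assuming the usual Delta session option spark.databricks.delta.schema.autoMerge.enabled is what "enable schema evolution" refers to here; with it set, a MERGE whose source struct is missing a field should be tolerated and null-filled rather than rejected. The path is illustrative.
import pyspark.sql.functions as F
from delta import DeltaTable
# Assumed config name for MERGE schema evolution; path is illustrative.
spark.conf.set('spark.databricks.delta.schema.autoMerge.enabled', 'true')
# Target table with a nested struct<id: long, value: long> column.
spark.range(5).select(F.struct('id', (F.col('id') * 10).alias('value')).alias('nested')).write.format('delta').save('/tmp/evolution-test')
target = DeltaTable.forPath(spark, '/tmp/evolution-test')
source = spark.range(5, 10).select(F.struct('id').alias('nested'))  # no 'value' field
# With schema evolution on, the missing 'value' should be filled with null.
(target.alias('t')
    .merge(source.alias('s'), 't.nested.id = s.nested.id')
    .whenNotMatchedInsertAll()
    .execute())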
Rejecting missing fields is a reasonable behavior when schema evolution is disabled, as it means we instead fall back to schema enforcement. Streaming writes didn't behave that way though, which I missed
This is missing fields in the source, not the target, so not schema evolution related.
Easy example:
import pyspark.sql.functions as F
from delta import DeltaTable
# Create table with a nested struct<id: long, value: long> column
spark.range(10).select(F.struct('id', (F.col('id') * 10).alias('value')).alias('nested')).write.format('delta').save('/tmp/merge-test')
# Works without "nested.value"
spark.range(10, 20).select(F.struct('id').alias('nested')).write.format('delta').mode('append').save('/tmp/merge-test')
table = DeltaTable.forPath(spark, "/tmp/merge-test")
# Fails with "Cannot cast struct<id:bigint> to struct<id:bigint,value:bigint>. All nested columns must match."
table.alias('target').merge(
    spark.range(20, 30).select(F.struct('id').alias('nested')).alias('source'),
    'target.nested.id = source.nested.id'
).whenNotMatchedInsertAll().execute()
Right, save()/saveAsTable() will accept missing fields, but if you try SQL INSERT INTO t (nested) VALUES (..), then I expect that will fail. I think insert by position - df.insertInto(), INSERT INTO t VALUES (..) - will also fail.
We do handle SQL insert by name and SQL/DF insert by position in DeltaAnalysis and apply schema enforcement, but we let save()/saveAsTable() get through unhandled and pretty much don't do any schema validation beyond basic compatibility when writing the data.
That's what I meant by insert behavior being a bit all over the place. I looked at it a couple of weeks ago to see if it could be fixed - and allow implicit casting in save()/saveAsTable() - but this would bring a lot of breaking behavior changes.
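For reference, a hedged sketch of the paths contrasted above; the delta path reuses the earlier example, the registered table name merge_test is hypothetical, and the pass/fail comments simply restate the expectations from this comment rather than verified behavior.
import pyspark.sql.functions as F
# A DataFrame whose 'nested' struct is missing the 'value' field.
df = spark.range(30, 40).select(F.struct('id').alias('nested'))
# DataFrameWriter save(): goes through unhandled, the missing field is tolerated.
df.write.format('delta').mode('append').save('/tmp/merge-test')
# SQL insert by name: handled in DeltaAnalysis with schema enforcement,
# expected to be rejected because nested.value is missing.
spark.sql("INSERT INTO delta.`/tmp/merge-test` (nested) VALUES (named_struct('id', 40L))")
# Insert by position (hypothetical registered table 'merge_test'): also expected to be rejected.
df.write.insertInto('merge_test')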
Would it make more sense to do a Dataset.to type operation instead of a cast? That seems more like what is trying to be achieved.
That seems too limited. Dataset.to also casts types and eventually calls Cast.canANSIStoreAssign(), which for example doesn't allow many casts that we do support today - we allow most or all valid implicit casts, e.g. string -> int would be valid - and it doesn't respect the value of spark.sql.storeAssignmentPolicy that write operations should respect.
The column reordering part does make sense though, although the issue isn't so much how to apply it as making sure we're not breaking existing workloads if we start doing it in more cases.
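For reference, a sketch of the Dataset.to shape being discussed, using the PySpark counterpart DataFrame.to (assuming a Spark version that provides it); the comments restate the limitations described above rather than verified behavior.
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, LongType
# Target schema: nested struct<id: long, value: long>.
target_schema = StructType([
    StructField('nested', StructType([
        StructField('id', LongType()),
        StructField('value', LongType()),
    ]))
])
# Source with the nested fields in a different order.
src = spark.range(5).select(F.struct((F.col('id') * 10).alias('value'), 'id').alias('nested'))
# DataFrame.to reorders columns/fields and casts to the target schema, but per the
# discussion above it only allows ANSI store-assignment casts and does not consult
# spark.sql.storeAssignmentPolicy, which write paths are expected to honor.
reshaped = src.to(target_schema)
spark.conf.get('spark.sql.storeAssignmentPolicy')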
So it seems like the whole issue stems from the fact that castIfNeeded treats a missing source field as "schema evolution". But that is very different from what most would consider schema evolution: a missing field in the target, and the schema of the target table changing as a result of some sort of write. It's really "null filling". If you remove allowStructEvolution from castIfNeeded, and don't make it throw an exception if the source is missing a target field, that would likely resolve a lot of the issues/differences with the write paths, and make this sink cast a non-issue. If you really think null-filling should be configurable (whether defaulting to false or true), that should be a different setting than "schema merging", as the use case is very different.
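As a concrete picture of what "null filling" means here (a hand-written illustration, not Delta's internal castIfNeeded logic), padding a missing target struct field with null looks like this:
import pyspark.sql.functions as F
# Source struct is missing the target's 'value' field.
src = spark.range(20, 30).select(F.struct('id').alias('nested'))
# "Null filling": pad the missing field with null instead of rejecting the write.
null_filled = src.select(
    F.struct(
        F.col('nested.id').alias('id'),
        F.lit(None).cast('long').alias('value'),
    ).alias('nested')
)
null_filled.write.format('delta').mode('append').save('/tmp/merge-test')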
+1, I have it on my todo list to look into this. UpdateExpressionsSupport is historically the place that implements (part of) schema evolution/enforcement for MERGE and UPDATE, but the checks could be moved somewhere else and applied during analysis.
@Kimahriman I finally got around to implementing some of the changes we discussed previously. PR: https://github.com/delta-io/delta/pull/3822. I've asked @tomvanbussel to review it, but you can also have a look if you'd like.
In particular, castIfNeeded now uses allowMissingStructField instead of allowStructEvolution, which is a more accurate description of the associated behavior and reflects the fact that streaming writes actually allow missing struct fields even when schema evolution is disabled.
I'm still making that behavior configurable because we need to preserve existing behavior, and UPDATE/MERGE behave differently from streaming writes today (UPDATE/MERGE only allow missing struct fields when schema evolution is enabled).
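A hedged sketch of the streaming behavior referenced here, reusing the /tmp/merge-test table from the earlier example with an illustrative checkpoint location: the streaming sink accepts a source struct that is missing 'value' even though schema evolution is disabled.
import pyspark.sql.functions as F
# Streaming source producing a struct with only 'id' (no 'value').
stream_src = (
    spark.readStream.format('rate').load()
    .select(F.struct(F.col('value').alias('id')).alias('nested'))
)
# Streaming write to the Delta table: the missing struct field is null-filled
# rather than rejected, unlike UPDATE/MERGE without schema evolution.
query = (
    stream_src.writeStream.format('delta')
    .option('checkpointLocation', '/tmp/merge-test/_checkpoint')
    .start('/tmp/merge-test')
)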
Description
https://github.com/delta-io/delta/pull/3443 introduced implicit casting when writing to a Delta table using a streaming query.
We are disabling this change for now as it regresses behavior when a struct field is missing in the input data. This previously succeeded, filling the missing fields with null, but would now fail with:
Note: batch INSERT fails in this scenario with:
but since streaming writes allowed this, we have to preserve that behavior.
How was this patch tested?
Tests added as part of https://github.com/delta-io/delta/pull/3443, in particular with the flag disabled.
Does this PR introduce any user-facing changes?
Disabled behavior change that was to be introduced with https://github.com/delta-io/delta/pull/3443.