delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

I can write DataFrames with NullType column but I can't read when using Delta. #450

Closed. renatoferreira656 closed this issue 3 years ago.

renatoferreira656 commented 4 years ago

I have a group of DataFrames where the schema is inferred, so some columns may be inferred as NullType.

I understand that Parquet doesn't support NullType columns and that Delta drops them before writing, but if Delta lets me write NullType columns, I believe I should be able to read the table back, even if the NullType columns were dropped.

What do you think about this behavior?

Code explaining the behavior:

scala> val df = Seq(("valueA", null)).toDF("columna", "columnb")
df: org.apache.spark.sql.DataFrame = [columna: string, columnb: null]

scala> df.write.format("delta").save("/tmp/table-a")

scala> spark.read.format("delta").load("/tmp/table-a")
org.apache.spark.sql.AnalysisException: Parquet data source does not support null data type.;
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:69)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:67)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:67)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyReadSchema(DataSourceUtils.scala:41)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:400)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  ... 53 elided

scala> spark.read.parquet("/tmp/table-a").show()
+-------+
|columna|
+-------+
| valueA|
+-------+
brkyvz commented 4 years ago

Thank you for reporting this bug. This is indeed very annoying. Would anyone from the community help in fixing this bug?

koertkuipers commented 3 years ago

We are running into this as well. Given that we rely on schema inference, not supporting NullType would be inconvenient. I think the ideal solution would be for Delta to support NullType but not store it in the Parquet files; it seems it already does this when writing. Could we read from Parquet without the NullType columns and then re-insert them (roughly as in the sketch below)?

Or should we go for the simpler alternative of just refusing to write NullType?
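
For illustration, a rough sketch of what that re-insertion could look like from the caller's side, assuming the caller keeps track of which columns were NullType. The helper names below are hypothetical and not part of Delta or Spark:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.NullType

// Drop NullType columns before writing, since Parquet cannot store them.
def writeDroppingNullTypes(df: DataFrame, path: String): Unit = {
  val keep = df.schema.fields.filterNot(_.dataType == NullType).map(_.name)
  df.select(keep.map(df(_)): _*).write.format("delta").save(path)
}

// Re-insert the dropped columns as null literals after reading.
def readReinsertingNullTypes(spark: SparkSession, path: String, nullCols: Seq[String]): DataFrame = {
  val base = spark.read.format("delta").load(path)
  nullCols.foldLeft(base)((acc, name) => acc.withColumn(name, lit(null)))
}

Ideally Delta would do this bookkeeping internally; the sketch only shows the shape of the workaround.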

brkyvz commented 3 years ago

since no one volunteered, I'll fix this.

brkyvz commented 3 years ago

@renatoferreira656 @koertkuipers Would you be fine with the behavior if Delta didn't return the null columns back when you read the table? Seems like being able to read back null values is REALLY hard.

renatoferreira656 commented 3 years ago

Hi @brkyvz, thank you for solving this for us, and sorry for not helping you.

The way you are proposing to solve it is fine for us.

Thank you again.

aksmiyazaki commented 3 years ago

Hello!

Any updates on this issue?

Thanks!

brkyvz commented 3 years ago

We are trying to fix it for 0.8.0. One complication is that a PR was merged in Apache Spark which blocks all table creations with null types, so we need to do work in Apache Spark as well.

manub22 commented 3 years ago

Before storing to Parquet, I'm thinking of converting the NullType to String for all such columns present in the DataFrame. I'm trying to iterate over the columns and change the data type using the withColumn() function, but I'm not able to apply this to all such columns dynamically and create a new DataFrame (see the sketch after this snippet).

myDF.dtypes.foreach { f =>
  val fColName = f._1
  val fColType = f._2
  if (fColType == "NullType") { 
    println("Col-Name: %s, Col-Type: %s".format(fColName , fColType))
    myDF.withColumn(fColName, myDF(fColName).cast("string"))
  }
}
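
The loop above discards the DataFrame that withColumn returns (DataFrames are immutable), so the casts never accumulate. A minimal sketch of one way to collect them, assuming myDF is the DataFrame in question, is to fold over the NullType columns:

import org.apache.spark.sql.types.NullType

val fixedDF = myDF.schema.fields
  .filter(_.dataType == NullType)
  .map(_.name)
  .foldLeft(myDF) { (df, colName) =>
    // Cast each NullType column to string so Parquet can store it.
    df.withColumn(colName, df(colName).cast("string"))
  }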
tnixon commented 3 years ago

> @renatoferreira656 @koertkuipers Would you be fine with the behavior if Delta didn't return the null columns back when you read the table? Seems like being able to read back null values is REALLY hard.

I've run into this myself, and I think a better fix would be to report an error when saving the table, possibly with a hint to the user that they can cast the NULL to some other type, which should be storable. It seems like silently making columns disappear without an error or warning is not a good pattern.

gbarreiro commented 3 years ago

It's 10th December and I'm facing the same issue; it seems it hasn't been fixed yet, five months after it was first reported. I found a workaround: read the table as Parquet, i.e. spark.read.format("parquet").load("mytable") instead of spark.read.format("delta").load("mytable"). Apparently it works fine, since Delta relies on Parquet, but if anyone thinks this could lead to any kind of issue, I would love to know.

aksmiyazaki commented 3 years ago


Hey @gbarreiro, just be careful with your approach!
Delta keeps past data as Parquet files in the same folder.
The Parquet files that make up the current Delta table are tracked in the Delta transaction log (and listed in a _symlink_format_manifest only if you have generated one).
So be careful, or you may end up reading historical data as part of your current Delta table.

Best Regards,

gbarreiro commented 3 years ago


That's good to know. In the project where I faced this issue, that wouldn't be a problem, since the data is static, but I have to admit that I didn't know that, so thank you very much for letting me know.

brkyvz commented 3 years ago

A better fix would be to allow the Parquet reader in Apache Spark to just read NullType as nulls, which it already should do for columns that don't exist in the schema. That's a better solution, but I'm not sure we can get that behavior into Spark 3.1. Therefore I propose moving forward with dropping the columns during the read to unblock people. For the next Spark release we can improve on the behavior.
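
As a small illustration of the existing behavior this refers to (a sketch assuming the /tmp/table-a from the repro above was written exactly once): when the requested schema contains a column the Parquet files don't have, Spark fills it with nulls. Here the missing column is requested as string rather than NullType:

import org.apache.spark.sql.types.{StringType, StructType}

val requested = new StructType()
  .add("columna", StringType)
  .add("columnb", StringType)  // not present in the Parquet files

spark.read.schema(requested).parquet("/tmp/table-a").show()
// expected: columna = "valueA", columnb = null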

brkyvz commented 3 years ago

The fix for this is in master. For now, we will not return the NullType columns when reading.

OzzyArmas commented 2 years ago

Hi, we are running into a similar issue that feels related to this thread. We have some columns where we set nullable = False. However, when we save the DataFrame and read it back from file storage, all columns are set to nullable = True.

RussellJungwirth commented 2 years ago

Here's a workaround to fix a bad column in a non-managed Delta Lake table. Note that this will truncate your table history.

pyspark code:

bad_column_name = 'bad_col'
bad_column_type = 'string'
dbfs_delta_path = 'dbfs:/path/to/bad_table'

# Delete the Delta transaction log, leaving only the Parquet data files
# (this is what truncates the table history).
dbutils.fs.rm(f'{dbfs_delta_path}/_delta_log', True)

# Re-create the Delta table from the remaining Parquet files.
spark.sql(f"CONVERT TO DELTA parquet.`{dbfs_delta_path}`")

# Add the problem column back with an explicit, storable type.
spark.sql(f"alter table delta.`{dbfs_delta_path}` add column {bad_column_name} {bad_column_type}")