FWIW, both Spark and pyarrow give the wrong result, in different ways.
import pyarrow.parquet

path = "data-dimension-vehicle-20210609T222533Z-4cols-14rows.parquet"
table = pyarrow.parquet.read_table(path)
print(table["dimension_load_date"])

which prints:

pyarrow.Field<dimension_load_date: timestamp[ns]>
[
  [
    1816-03-29 05:56:08.066277376,
    1815-03-30 05:56:08.066277376,
    2021-06-09 00:02:37.000000000,
    ...
  ]
]
While Spark provides the correct result in your case, it only reads up to microseconds (i.e. it truncates nanoseconds). See the source code: the nanos / NANOS_PER_MICROS division truncates the nanoseconds. So it allows 9999-12-31 only because that date, expressed in microseconds, happens to still fit in an i64 (but larger ones do not).
I do not think there is a correct answer here: "9999-12-31" cannot be represented by an i64 in nanoseconds. Given that int96's original scope was to support nanoseconds, pyarrow seems to preserve that behavior. OTOH, to avoid crashing, it spits out something, even if that something is meaningless in this context.
Panicking is a bit too harsh, but at least it does not allow you to go back to the 19th century xD
Note that int96 has been deprecated.
There are multiple things to discuss here.
Even though INT96 is deprecated, it has not yet been removed from Parquet and is still used in Spark, Flink and many other frameworks. Spark 3 ships with the spark.sql.parquet.outputTimestampType option set to INT96 by default (see here). There are lots of Parquet files created with columns of type INT96, even though their values may fit into INT64, only because that is the default setting. I would say it would be useful to have a consistent behaviour: support INT96, mark it as deprecated, and remove it when and if Parquet removes it.
Regarding the Spark implementation, here is a function that returns the nanos: https://github.com/apache/spark/blob/HEAD/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L192. The nanosecond precision is not discarded in Spark.
Apache Flink maps the Timestamp type to INT96 too: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/parquet/. Also, Impala still uses it.
Can you provide the part of the code where the overflow issue happens? I would like to understand more.
Here are the values returned, regardless of whether the column type is INT96 or INT64:
+------------+------------------+------------------+-------------------+------------+
|licence_code|vehicle_make |fuel_type |dimension_load_date|dms_long |
+------------+------------------+------------------+-------------------+------------+
|odc-odbl |**Not Provided** |**Not Provided** |9999-12-31 02:00:00|253402214400|
|odc-odbl |**Not Applicable**|**Not Applicable**|9998-12-31 02:00:00|253370678400|
|odc-odbl |SAVIEM |Petrol |2021-06-09 03:02:37|1623196957 |
|odc-odbl |YAMAHA |Petrol |2021-06-09 03:43:47|1623199427 |
|odc-odbl |VAUXHALL |Petrol |2020-10-18 03:23:47|1602980627 |
|odc-odbl |VAUXHALL |Petrol |2021-06-09 03:02:37|1623196957 |
|odc-odbl |BMW |Petrol |2021-06-09 03:38:39|1623199119 |
|odc-odbl |MG |Petrol |2020-10-18 03:23:47|1602980627 |
|odc-odbl |PEUGEOT |Diesel |2020-10-18 03:35:16|1602981316 |
|odc-odbl |FORD |Diesel |2020-10-18 03:23:47|1602980627 |
|odc-odbl |FORD |Petrol |2020-10-18 03:12:55|1602979975 |
|odc-odbl |SKODA |Diesel |2021-06-09 03:02:37|1623196957 |
|odc-odbl |SHOGUN |Diesel |2020-10-18 03:12:55|1602979975 |
|odc-odbl |MITSUBISHI |Diesel |2021-06-10 01:15:47|1623276947 |
+------------+------------------+------------------+-------------------+------------+
Pretty consistent.
Point taken wrt the int96 deprecation.
The datetime "9999-12-31" is 253402214400 seconds as a unix timestamp:

$ python -c "import datetime; print(datetime.datetime(year=9999,month=12,day=31).timestamp())"
253402214400.0

In nanoseconds, this corresponds to 253402214400 * 10^9 = 253_402_214_400_000_000_000. The maximum i64 in Rust is 9_223_372_036_854_775_807. Comparing the two, we have:

253_402_214_400_000_000_000 > 9_223_372_036_854_775_807
This was the rationale I used to conclude that we can't fit "9999-12-31" in an i64 of nanoseconds since the epoch. Since Java's Long is also an i64 with the same maximum as Rust's, I concluded that Spark must be discarding something to fit such a date in a Long, since there is simply not enough precision to represent that date in i64 nanoseconds. So, I looked for what they did.
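A small back-of-the-envelope check of that arithmetic in Rust (just an illustrative sketch using checked_mul, not arrow-rs code):

fn main() {
    // "9999-12-31" expressed as unix seconds (from the python one-liner above)
    const SECONDS: i64 = 253_402_214_400;
    const NANOS_PER_SECOND: i64 = 1_000_000_000;
    const MICROS_PER_SECOND: i64 = 1_000_000;

    // converting to nanoseconds overflows i64 ...
    assert!(SECONDS.checked_mul(NANOS_PER_SECOND).is_none());
    // ... while microseconds still fit
    assert_eq!(
        SECONDS.checked_mul(MICROS_PER_SECOND),
        Some(253_402_214_400_000_000)
    );
}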
int96 represents [i64 nanos, i32 days]. When reading such bytes from parquet, the interface that Spark uses must be something that consumes such types, and fromJulianDay(days: Int, nanos: Long) is the only one that does such a thing. As I mentioned, that code truncates the nanoseconds, which is consistent with being able to read that date in microseconds (the two numbers above differ by a factor of less than 1000).
I may be wrong.
The parquet code in Rust is here. Note that it only goes to millis. The conversion to ns is done here.
I boiled down the conversion into a minimal example (playground):
// to nanos
pub fn int96_to_i64_ns(value: [u32; 3]) -> i64 {
    // the first two u32 words hold the nanoseconds within the day, the third the julian day
    let nanoseconds = value[0] as i64 + ((value[1] as i64) << 32);
    let days = value[2] as i64;

    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const SECONDS_PER_DAY: i64 = 86_400;
    const NANOS_PER_SECOND: i64 = 1_000_000_000;

    let seconds = (days - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
    // this multiplication overflows i64 for dates far in the future
    seconds * NANOS_PER_SECOND + nanoseconds
}

// to micros
pub fn int96_to_i64_us(value: [u32; 3]) -> i64 {
    let nanoseconds = value[0] as i64 + ((value[1] as i64) << 32);
    let days = value[2] as i64;

    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const SECONDS_PER_DAY: i64 = 86_400;
    const MICROS_PER_SECOND: i64 = 1_000_000;

    let seconds = (days - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
    // truncating to microseconds keeps "9999-12-31" within i64 range
    seconds * MICROS_PER_SECOND + nanoseconds / 1000
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_nanos() {
        // 2_440_588 + 2_932_896 is the julian day of "9999-12-31"
        let value = [1, 0, 2_440_588 + 2_932_896];
        let _ = int96_to_i64_ns(value);
    }

    #[test]
    fn test_micros() {
        let value = [1, 0, 2_440_588 + 2_932_896];
        let _ = int96_to_i64_us(value);
    }
}
where 2_440_588 + 2_932_896 is the number of days of "9999-12-31" since the julian epoch. test_nanos does not pass (the multiplication overflows), while test_micros passes.

EDIT: changed 2932896 to 2440588 + 2932896 since int96 stores days since the julian, not unix, epoch; it does not change the conclusion, though.
Thank you @jorgecarleitao!
I do understand that the nanos, expressed as INT96 in parquet and supported by other frameworks, do not fit into the Rust i64 type, hence the overflow panic.

From my calculations, the current implementation over i64 can hold up to 2262-04-11T23:47:16.854Z with nanosecond precision. And since 9999-12-31 is more than 7000 years past that maximum datetime, it fails with an overflow panic.
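A quick sanity check of that 2262 bound (an approximation sketch only, not arrow-rs code):

fn main() {
    // i64::MAX nanoseconds expressed as whole seconds since the unix epoch
    let max_seconds = i64::MAX / 1_000_000_000; // 9_223_372_036
    // roughly 292 years after 1970, i.e. some time in the year 2262
    let years = max_seconds as f64 / (365.25 * 86_400.0);
    println!("i64::MAX nanoseconds ≈ {:.1} years after 1970", years);
}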
Now, what can we do?

1. Use the i128 Rust type (there is no i96 in Rust) to totally remove the panic, but this may require extensive effort to have it implemented (I'm no expert on datafusion nor arrow-rs/parquet but I want to learn - my background is distributed processing, with Spark and Scala as programming language).
2. Keep the i64 limitation and document that the maximum supported timestamp is 2262-04-11, and that reading parquet files with greater timestamp values will fail.

I hope I ain't a nuisance for you. 😀
(Transferred to arrow-rs, since this is not related to datafusion per se but to the parquet reading.)
I was trying to communicate that what Spark does does not solve the problem in general, just for the particular year 9999; it essentially shifts the problem by a factor of 1000x.
I would argue that 200 years for the world to migrate away from int96 should be enough, but probably someone has said the same about the imperial system 200 years ago and here we are. ^_^
IMO panicking is not a valid behavior because it is susceptible to DOS (e.g. an application accepting parquet files from the internet will now panic and unwind on every request).
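For illustration, a non-panicking variant of the conversion above could use checked arithmetic and surface the out-of-range case to the caller (a sketch only; the function name is made up, it is not the arrow-rs API nor one of the options referred to below):

pub fn int96_to_i64_ns_checked(value: [u32; 3]) -> Option<i64> {
    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const SECONDS_PER_DAY: i64 = 86_400;
    const NANOS_PER_SECOND: i64 = 1_000_000_000;

    let nanoseconds = value[0] as i64 + ((value[1] as i64) << 32);
    let days = value[2] as i64;
    let seconds = (days - JULIAN_DAY_OF_EPOCH).checked_mul(SECONDS_PER_DAY)?;
    // returns None instead of panicking when the value does not fit in i64 nanoseconds
    seconds.checked_mul(NANOS_PER_SECOND)?.checked_add(nanoseconds)
}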
I think that there are 3 options within the current arrow specification:
I am tempted to argue for 3 because it preserves the two important semantic properties (order and nano precision), but I would be fine with any of them, tbh.
Not a nuisance at all; quite a fun problem!
I'm also leaning towards 3. It covers multiple aspects, and one of the most important ones is backward compatibility.
https://github.com/apache/arrow-rs/pull/2481 adopts option 2, as it is consistent with how we handle overflow elsewhere, and more importantly doesn't have a runtime cost. Signed overflow in Rust is defined as two's complement; it isn't UB like it technically is in C++, and arrow-rs follows this example.
I hope that is ok; I wasn't actually aware of this ticket before I implemented #2481 :sweat_smile: FWIW we do now have the schema inference plumbing where we could opt to truncate to milliseconds or similar... :thinking:
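To make the trade-off concrete, here is a rough sketch of what option 2 (wrapping, two's-complement arithmetic) looks like for the int96 conversion; this is an illustration of the idea, not the actual code in #2481:

pub fn int96_to_i64_ns_wrapping(value: [u32; 3]) -> i64 {
    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const SECONDS_PER_DAY: i64 = 86_400;
    const NANOS_PER_SECOND: i64 = 1_000_000_000;

    let nanoseconds = value[0] as i64 + ((value[1] as i64) << 32);
    let days = value[2] as i64;
    let seconds = (days - JULIAN_DAY_OF_EPOCH).wrapping_mul(SECONDS_PER_DAY);
    // never panics: out-of-range timestamps silently wrap around (two's complement)
    seconds.wrapping_mul(NANOS_PER_SECOND).wrapping_add(nanoseconds)
}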
Describe the bug
Reading a Parquet file with a timestamp column containing a future date like 9999-12-31 02:00:00 results in an overflow panic.

To Reproduce
Steps to reproduce the behavior:

1. Download the data-dimension-vehicle-20210609T222533Z-4cols-14rows.parquet file.
2. Run cargo new read-parquet, create a data folder in your project and put the parquet file in the data folder inside your project.
3. Modify the Cargo.toml file to contain the following:

   [dependencies]
   tokio = "1.14"
   arrow = "6.0"
   datafusion = "6.0"

4. Run cargo run.
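The main.rs used for the reproduction is not included above; a minimal sketch of what it could look like against datafusion 6 follows (the exact API calls are an assumption, not the original snippet):

use datafusion::arrow::util::pretty::print_batches;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // assumes the parquet file was placed in the `data` folder as described above
    let mut ctx = ExecutionContext::new();
    let df = ctx
        .read_parquet("data/data-dimension-vehicle-20210609T222533Z-4cols-14rows.parquet")
        .await?;
    // the overflow panic would surface while decoding the int96 timestamp column
    let batches = df.collect().await?;
    print_batches(&batches)?;
    Ok(())
}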
Expected behavior
To be able to read that parquet file. The parquet file can be read with the parquet-tools CLI and Apache Spark.

Additional context
The root cause is the fact that the parquet file contains some rows with 9999-12-31 02:00:00 in the dimension_load_date column. This future date is supported by Parquet and Spark. The content of the parquet file is shown in the table earlier in this thread.

To find out more about how the root cause was detected you can follow apache/arrow-datafusion#1359.