trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Iceberg connector gives different results for nested null checks with projection pushdown enabled vs disabled #20511

Open zpuller opened 9 months ago

zpuller commented 9 months ago

Iceberg connector gives different results for nested null checks with projection pushdown enabled vs disabled, specifically in the case of a schema with an optional struct containing a required inner field, e.g.:

message table {
  required int32 id = 1;
  required binary name (STRING) = 2;
  required int32 age = 3;
  optional group address = 4 {
    required binary street (STRING) = 5;
    optional group address_info = 6 {
      required binary city (STRING) = 7;
      required binary county (STRING) = 8;
      required binary state (STRING) = 9;
    }
  }
}

To reproduce, you need to write the table with something like Spark; we observed this with the Parquet format specifically. The table cannot be created with Trino (to my knowledge) because Trino's SQL syntax does not permit specifying NOT NULL for nested fields.

I created an Iceberg table using the following Spark SQL query:

SELECT
  *
FROM VALUES
  (1, 'John Doe', 35, CAST(NULL AS STRUCT<street: STRING, address_info: STRUCT<city: STRING, county: STRING, state: STRING>>)),
  (2, 'Jane Doe', 27, named_struct(
    'street',
    CAST('456 Lane' AS STRING),
    'address_info',
    CAST(struct('San Francisco', 'San Francisco', 'CA') AS STRUCT<city: STRING, county: STRING, state: STRING>)
  )),
  (3, 'Mary Johnson', 30, named_struct(
    'street',
    CAST('789 Boulevard' AS STRING),
    'address_info',
    CAST(struct('Portland', 'Multnomah', 'OR') AS STRUCT<city: STRING, county: STRING, state: STRING>)
  )) AS t(id, name, age, address)

(I also had to manually tweak the schema to get the right set of optional and required fields.)

then queried from Trino as follows:

set session iceberg.projection_pushdown_enabled=false
OR
set session iceberg.projection_pushdown_enabled=true

select
  id
from
  iceberg.schema.table
where
  address.street is null

This returns 1 row when pushdown is disabled, and 0 rows with pushdown enabled.
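One plausible mental model for the discrepancy (a hand-rolled sketch, not Trino code): under SQL semantics, dereferencing a field of a NULL struct yields NULL, so `address.street IS NULL` must match row 1; a pushdown that projects only the leaf column and trusts its Parquet `required` flag could incorrectly conclude the predicate can never hold.

```python
# Hand-rolled sketch of the two evaluation strategies; not Trino code.
rows = [
    {"id": 1, "address": None},
    {"id": 2, "address": {"street": "456 Lane"}},
    {"id": 3, "address": {"street": "789 Boulevard"}},
]

def street_is_null(row):
    # SQL semantics: dereferencing a field of a NULL struct yields NULL.
    address = row["address"]
    return address is None or address["street"] is None

def street_is_null_buggy_pushdown(row, street_required=True):
    # Hypothetical buggy pushdown: only the leaf column is projected, and the
    # "required" flag on street is taken to mean the predicate never holds,
    # ignoring that the enclosing optional struct can itself be NULL.
    if street_required:
        return False
    return street_is_null(row)

print([r["id"] for r in rows if street_is_null(r)])                # [1]
print([r["id"] for r in rows if street_is_null_buggy_pushdown(r)])  # []
```

This matches the observed 1-row vs 0-row difference, though the actual mechanism inside the connector may differ.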

I verified this behavior on v437.

hashhar commented 9 months ago

cc: @alexjo2144 @findinpath

findepi commented 9 months ago

can this be anyhow related to https://github.com/trinodb/trino/pull/19479?

zpuller commented 9 months ago

can this be anyhow related to #19479?

One note: we originally encountered this in v414, which I believe predates that PR.

findinpath commented 9 months ago

@zpuller I've tried the following scenario:

testing/bin/ptl env up --environment singlenode-spark-iceberg

In spark-sql

Create the table:

CREATE TABLE spark_catalog.default.t1 (
  id INT,
  name STRING,
  age INT,
  address STRUCT<street: STRING NOT NULL, address_info: STRUCT<city: STRING, county: STRING, state: STRING>>)
USING iceberg;

Write data into the table:

INSERT INTO t1
SELECT
  *
FROM VALUES
  (1, 'John Doe', 35, CAST(NULL AS STRUCT<street: STRING, address_info: STRUCT<city: STRING, county: STRING, state: STRING>>)),
  (2, 'Jane Doe', 27, named_struct(
    'street',
    CAST('456 Lane' AS STRING),
    'address_info',
    CAST(struct('San Francisco', 'San Francisco', 'CA') AS STRUCT<city: STRING, county: STRING, state: STRING>)
  )),
  (3, 'Mary Johnson', 30, named_struct(
    'street',
    CAST('789 Boulevard' AS STRING),
    'address_info',
    CAST(struct('Portland', 'Multnomah', 'OR') AS STRUCT<city: STRING, county: STRING, state: STRING>)
  )) AS t(id, name, age, address);

The statement fails with the following exception message:

Cannot write incompatible data to table 'spark_catalog.default.t1':
- Cannot write nullable values to non-null field: 'address.street'.

Can you please sketch in more detail how to reproduce the issue? You can use Scala if you are more at ease with it.

ebyhr commented 8 months ago

The SELECT statement returns the expected result on master and 437.

spark-sql> CREATE TABLE t USING iceberg AS SELECT
  *
FROM VALUES
  (1, 'John Doe', 35, CAST(NULL AS STRUCT<street: STRING, address_info: STRUCT<city: STRING, county: STRING, state: STRING>>)),
  (2, 'Jane Doe', 27, named_struct(
    'street',
    CAST('456 Lane' AS STRING),
    'address_info',
    CAST(struct('San Francisco', 'San Francisco', 'CA') AS STRUCT<city: STRING, county: STRING, state: STRING>)
  )),
  (3, 'Mary Johnson', 30, named_struct(
    'street',
    CAST('789 Boulevard' AS STRING),
    'address_info',
    CAST(struct('Portland', 'Multnomah', 'OR') AS STRUCT<city: STRING, county: STRING, state: STRING>)
  )) AS t(id, name, age, address);

trino:default> SET SESSION iceberg.projection_pushdown_enabled=true;
trino:default> SELECT id FROM t WHERE address.street is null;
 id
----
  1

trino:default> SET SESSION iceberg.projection_pushdown_enabled=false;
trino:default> SELECT id FROM t WHERE address.street is null;
 id
----
  1

zp-stripe commented 8 months ago

I think I will need to write some custom Scala code, as mentioned above, to provide a consistent repro of the issue. Let me take some time to figure that out and I will post back here.

ebyhr commented 8 months ago

Ideally, it should be written in SQL. We don't use Scala code in our tests.

ebyhr commented 8 months ago

@zpuller Gentle reminder.

zpuller commented 6 months ago

Sorry for the late response. Regarding what @findinpath tried: Spark might just not like the syntax of the CAST in your INSERT statement. Maybe try:

INSERT INTO t1
SELECT
  *
FROM VALUES
  (1, 'John Doe', 35, CAST(NULL AS STRUCT<street: STRING NOT NULL, address_info: STRUCT<city: STRING, county: STRING, state: STRING>>)),
  (2, 'Jane Doe', 27, named_struct(
    'street',
    CAST('456 Lane' AS STRING NOT NULL),
    'address_info',
    CAST(struct('San Francisco', 'San Francisco', 'CA') AS STRUCT<city: STRING, county: STRING, state: STRING>)
  )),
  (3, 'Mary Johnson', 30, named_struct(
    'street',
    CAST('789 Boulevard' AS STRING NOT NULL),
    'address_info',
    CAST(struct('Portland', 'Multnomah', 'OR') AS STRUCT<city: STRING, county: STRING, state: STRING>)
  )) AS t(id, name, age, address);

zpuller commented 6 months ago

@ebyhr, your CREATE TABLE statement omits the NOT NULL schema declarations; that's why it doesn't yield the inconsistent results.

zpuller commented 6 months ago

Could also try:

INSERT INTO t1
SELECT
  id,
  name,
  age,
  CASE
    WHEN id = 1 THEN NULL
    ELSE address
  END
FROM (
  SELECT
    *
  FROM VALUES
    (1, 'Jane Doe', 27, named_struct(
      'street',
      CAST('456 Lane' AS STRING NOT NULL),
      'address_info',
      CAST(struct('San Francisco', 'San Francisco', 'CA') AS STRUCT<city: STRING, county: STRING, state: STRING>)
    )),
    (2, 'Jane Doe', 27, named_struct(
      'street',
      CAST('456 Lane' AS STRING NOT NULL),
      'address_info',
      CAST(struct('San Francisco', 'San Francisco', 'CA') AS STRUCT<city: STRING, county: STRING, state: STRING>)
    )),
    (3, 'Mary Johnson', 30, named_struct(
      'street',
      CAST('789 Boulevard' AS STRING NOT NULL),
      'address_info',
      CAST(struct('Portland', 'Multnomah', 'OR') AS STRUCT<city: STRING, county: STRING, state: STRING>)
    )) AS t(id, name, age, address)
);