apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

CREATE TABLE DDL does not save correct schema, resulting in mismatched plan vs execution (record batch) schema #7636

Open matthewgapp opened 9 months ago

matthewgapp commented 9 months ago

Describe the bug

The ROW_NUMBER() function places a non-nullable constraint on its generated field, so the resulting schema should label that column as nullable: false. Instead, the logical plan for a table created with CREATE TABLE ... shows that field as nullable: true. This results in a runtime panic for queries that involve joins (although I'm not quite sure why it doesn't complain on queries that don't involve joins).

Error message produced by the minimal repro below

Thread 'main' panicked at 'query failed to execute: External(ArrowError(InvalidArgumentError("batches[0] schema is different with argument schema.\n            batches[0] schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"row_num\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} },\n            argument schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"row_num\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }\n            ")))', src/main.rs:42:10

To Reproduce

Minimal repro

Run this script, which results in the error shown above:

use std::sync::Arc;

use datafusion::{
    arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
        util::pretty::print_batches,
    },
    datasource::MemTable,
    prelude::{SessionConfig, SessionContext},
};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let config = SessionConfig::new()
        .with_create_default_catalog_and_schema(true)
        .with_information_schema(true);

    let ctx = SessionContext::with_config(config);

    ctx.register_table("source_table", Arc::new(create_mem_table()))
        .unwrap();

    let create_table_query = r#"create table customers as SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS row_num FROM source_table"#;

    let _ = ctx
        .sql(create_table_query)
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let batches = ctx
        // performing a (cross) join query because joins seem to complain about execution schema vs plan schema mismatch
        .sql("select a.*, b.* from customers a, customers b")
        .await
        .unwrap()
        .collect()
        .await
        .expect("query failed to execute");

    print_batches(&batches).unwrap();
}

// just some random table that we'll append a row_num column to
fn create_mem_table() -> MemTable {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    let ids = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let names = Arc::new(StringArray::from(vec![
        "Alice", "Bob", "Charlie", "David", "Eve",
    ]));

    let batch = RecordBatch::try_new(schema.clone(), vec![ids as _, names as _]).unwrap();

    MemTable::try_new(schema, vec![vec![batch]]).unwrap()
}

Expected behavior

The schema saved by CREATE TABLE should be correct (i.e., it should capture nullable: false requirements on fields). The logical plan shouldn't conflict with the record batches observed during execution, and no panic should occur.

Additional context

A bit more context:

This is where nullable: false is set; it's not being picked up by the CREATE TABLE statement. https://github.com/apache/arrow-datafusion/blob/78d9613e81557ca5e5db8b75e5c7dec47ccee0a1/datafusion/physical-expr/src/window/row_number.rs#L54

    fn field(&self) -> Result<Field> {
        let nullable = false;
        let data_type = DataType::UInt64;
        Ok(Field::new(self.name(), data_type, nullable))
    }

I haven't investigated how this field property on the WindowExpr is actually used (or omitted) when constructing the logical plan.

alamb commented 9 months ago

I wonder if the nullable information isn't being updated correctly for join output 🤔

The code that handles setting nullability is here:

https://github.com/apache/arrow-datafusion/blob/b1d134e9ff37724459cb5090a6346a85152a1db7/datafusion/expr/src/logical_plan/builder.rs#L1068

Maybe the plans in question somehow aren't using that function, or there is a bug in that function.
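
For context, the usual convention for join output schemas is that fields from a side that can fail to produce a matching row become nullable. A minimal sketch of that rule, using hypothetical local enums rather than DataFusion's own join types (not the linked builder.rs code):

use datafusion::arrow::datatypes::Field;

// Hypothetical illustration of the common join-nullability rule: fields coming
// from a side that may have no matching row are forced to nullable in the
// join's output schema.
enum JoinSide {
    Left,
    Right,
}

enum JoinKind {
    Inner,
    LeftOuter,
    RightOuter,
    FullOuter,
}

fn join_output_field(field: &Field, side: JoinSide, kind: JoinKind) -> Field {
    let force_nullable = match (kind, side) {
        (JoinKind::LeftOuter, JoinSide::Right)   // right side may be missing
        | (JoinKind::RightOuter, JoinSide::Left) // left side may be missing
        | (JoinKind::FullOuter, _) => true,      // either side may be missing
        _ => false,
    };
    field.clone().with_nullable(field.is_nullable() || force_nullable)
}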

matthewgapp commented 9 months ago

I wonder if the nullable information isn't being updated correctly for join output 🤔

The code that handles setting nullability is here:

https://github.com/apache/arrow-datafusion/blob/b1d134e9ff37724459cb5090a6346a85152a1db7/datafusion/expr/src/logical_plan/builder.rs#L1068

Maybe the plans in question somehow aren't using that function, or there is a bug in that function.

I don't think this is the case, because the schema is already incorrect at CREATE TABLE time (before any join). Joins just seem to be where DataFusion complains.

If you inspect the schema of the created table, the row_num column (created by the ROW_NUMBER() window function) is nullable: true when it should be nullable: false, per the field() implementation on the row-number execution plan.
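
For anyone who wants to check this directly, a small sketch (a hypothetical helper, not part of the repro above) that prints the logical schema the created customers table reports:

use datafusion::{error::Result, prelude::SessionContext};

// Assumes the SessionContext from the repro above, after the CREATE TABLE has run.
async fn print_customers_schema(ctx: &SessionContext) -> Result<()> {
    let df = ctx.table("customers").await?;
    // At the time of this report, row_num shows up here as nullable: true,
    // while the executed record batches carry nullable: false.
    println!("{:#?}", df.schema());
    Ok(())
}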

alamb commented 9 months ago

I think row_number() is marked as non-nullable because the implementation always produces a value (i.e., no nulls), so changing it to be nullable doesn't seem correct.

In the example above, running

select * from customers

The output is this (there are no nulls):

+----+---------+---------+
| id | name    | row_num |
+----+---------+---------+
| 1  | Alice   | 1       |
| 2  | Bob     | 2       |
| 3  | Charlie | 3       |
| 4  | David   | 4       |
| 5  | Eve     | 5       |
+----+---------+---------+

I caught the error in the debugger, and it seems to be thrown in concat_batches, called from here:

https://github.com/apache/arrow-datafusion/blob/32dfbb0bb5c51b55ad84eed4fc140840f0a76612/datafusion/physical-plan/src/joins/cross_join.rs#L138

It turns out that this exact check was removed in arrow 47.0.0 (https://github.com/apache/arrow-rs/pull/4815, merged into DataFusion two days ago in https://github.com/apache/arrow-datafusion/pull/7587), so when I run this query against the latest version of DataFusion (not yet released to crates.io) I get:

+----+---------+---------+----+---------+---------+
| id | name    | row_num | id | name    | row_num |
+----+---------+---------+----+---------+---------+
| 1  | Alice   | 1       | 1  | Alice   | 1       |
| 1  | Alice   | 1       | 2  | Bob     | 2       |
| 1  | Alice   | 1       | 3  | Charlie | 3       |
| 1  | Alice   | 1       | 4  | David   | 4       |
| 2  | Bob     | 2       | 1  | Alice   | 1       |
| 2  | Bob     | 2       | 2  | Bob     | 2       |
| 2  | Bob     | 2       | 3  | Charlie | 3       |
| 2  | Bob     | 2       | 4  | David   | 4       |
| 3  | Charlie | 3       | 1  | Alice   | 1       |
| 3  | Charlie | 3       | 2  | Bob     | 2       |
| 3  | Charlie | 3       | 3  | Charlie | 3       |
| 3  | Charlie | 3       | 4  | David   | 4       |
| 4  | David   | 4       | 1  | Alice   | 1       |
| 4  | David   | 4       | 2  | Bob     | 2       |
| 4  | David   | 4       | 3  | Charlie | 3       |
| 4  | David   | 4       | 4  | David   | 4       |
| 5  | Eve     | 5       | 1  | Alice   | 1       |
| 5  | Eve     | 5       | 2  | Bob     | 2       |
| 5  | Eve     | 5       | 3  | Charlie | 3       |
| 5  | Eve     | 5       | 4  | David   | 4       |
| 1  | Alice   | 1       | 5  | Eve     | 5       |
| 2  | Bob     | 2       | 5  | Eve     | 5       |
| 3  | Charlie | 3       | 5  | Eve     | 5       |
| 4  | David   | 4       | 5  | Eve     | 5       |
| 5  | Eve     | 5       | 5  | Eve     | 5       |
+----+---------+---------+----+---------+---------+
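
For reference, the removed check can be exercised directly through arrow's concat_batches. A sketch, assuming (per the PRs linked above) that a nullability-only difference was rejected before arrow 47 and is accepted from arrow 47 onward:

use std::sync::Arc;

use datafusion::arrow::{
    array::{ArrayRef, Int32Array},
    compute::concat_batches,
    datatypes::{DataType, Field, Schema},
    error::Result,
    record_batch::RecordBatch,
};

// Sketch only: the batch declares row_num as non-nullable while the target schema
// declares it nullable, mirroring the mismatch in the panic message above.
fn concat_with_nullability_mismatch() -> Result<RecordBatch> {
    let batch_schema = Arc::new(Schema::new(vec![Field::new(
        "row_num",
        DataType::Int32,
        false,
    )]));
    let target_schema = Arc::new(Schema::new(vec![Field::new(
        "row_num",
        DataType::Int32,
        true,
    )]));

    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(batch_schema, vec![values])?;

    // arrow < 47: Err("batches[0] schema is different with argument schema ...")
    // arrow >= 47: Ok, the nullability difference is tolerated
    concat_batches(&target_schema, &[batch])
}
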
matthewgapp commented 9 months ago

@alamb thanks for the continued review and for the heads up on the removal of that invariant from arrow. We should still ensure that the nullability information is correct at all times, even if arrow doesn't complain.

I completely agree that the column generated by ROW_NUMBER() should not be marked as nullable: true. To that end, my PR (#7638) marks window-function columns as nullable: false in the logical plan when they should be non-nullable. The code on main assumes that the column generated by every window function is nullable: true, which is wrong. In other words, my PR brings the logical plan in line with the actual schema produced by the execution plan.

Let me know if I'm missing something.

xhwhis commented 9 months ago

@matthewgapp I also encountered the same mismatch issue described above. However, after applying your PR (#7638), the problem persists for me. To illustrate, I've created a minimal reproducible example that builds on yours, adding a single INSERT operation and using PARTITION BY. The code and the corresponding error message are below. I'm uncertain whether the issue lies with the INSERT operation or with the PARTITION BY clause.

Minimal repro

called `Result::unwrap()` on an `Err` value: ArrowError(InvalidArgumentError("batches[0] schema is different with argument schema.\n            batches[0] schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} },\n            argument schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }\n            "))
use std::sync::Arc;

use datafusion::{
    arrow::{
        datatypes::{DataType, Field, Schema},
        util::pretty::print_batches,
    },
    datasource::MemTable,
    prelude::{SessionConfig, SessionContext},
};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let config = SessionConfig::new()
        .with_create_default_catalog_and_schema(true)
        .with_information_schema(true);

    let ctx = SessionContext::with_config(config);

    ctx.register_table("source_table", Arc::new(create_mem_table()))
        .unwrap();

    let insert_table_query = r#"INSERT INTO source_table VALUES (1, 'Alice'),(2, 'Bob'),(3, 'Charlie'),(4, 'David'), (5, 'Eve')"#;
    let _ = ctx
        .sql(insert_table_query)
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let create_table_query =
        r#"SELECT *, RANK() OVER (PARTITION BY id) AS row_num FROM source_table"#;

    let batches = ctx
        .sql(create_table_query)
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    print_batches(&batches).unwrap();
}

fn create_mem_table() -> MemTable {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    MemTable::try_new(schema, vec![vec![]]).unwrap()
}
matthewgapp commented 9 months ago

Thanks @xhwhis, this seems like a separate bug, one whose root cause is that the values exec sets the schema for all of its columns as nullable here. This causes the record batch to inherit that nullable schema. The record batch is then inserted into the memory table without complaint (we do check and complain for the inverse: nullable columns being inserted into a non-nullable table, here).

The fix might be to pass the expected table schema, if there is one, down to the values logical-plan constructor. I would open a separate ticket.

EDIT: I opened up a separate issue and also put up a draft PR that should fix the issue: https://github.com/apache/arrow-datafusion/issues/7693
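
A rough sketch of applying the table's declared schema to inserted batches, purely illustrative and not the code in the draft PR linked above: rebuild each batch against the table schema so it doesn't carry the all-nullable schema produced by the VALUES plan.

use std::sync::Arc;

use datafusion::arrow::{datatypes::SchemaRef, error::Result, record_batch::RecordBatch};

// Hypothetical helper: re-wrap a batch's columns under the table's declared schema.
// RecordBatch::try_new still validates the data, so a column that actually contains
// nulls cannot be smuggled into a non-nullable field this way.
fn coerce_to_table_schema(batch: &RecordBatch, table_schema: &SchemaRef) -> Result<RecordBatch> {
    RecordBatch::try_new(Arc::clone(table_schema), batch.columns().to_vec())
}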

matthewgapp commented 9 months ago

@alamb, more of a meta thought, but with https://github.com/apache/arrow-rs/pull/4815, I'm concerned that all of these "bugs" may go unnoticed over time (unless they're caught in the DF application logic like here), potentially creating incorrect behavior.

I think it could be helpful to have something like your strict mode (potentially a compiler flag). But I'm still ramping up on this ecosystem, so I'm not sure who should determine the correctness of a schema and/or when that correctness should be asserted.

However, it does feel like any time record batches are stitched together and treated as one congruent, continuous thing (like a table or stream), the nullability of those batches should be consistent with (or at least no less strict than) the nullability of the containing table or stream. For example, for the purposes of DF, it seems appropriate that non-nullable batches would always be acceptable in a table that is nullable. The inverse is not true.
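
As a concrete statement of that rule, a minimal sketch (a hypothetical helper, not an existing DataFusion check) of what per-field compatibility could look like:

use datafusion::arrow::datatypes::Schema;

// Hypothetical compatibility check: a batch field may be stricter (non-nullable)
// than the table field, but a nullable batch field must not feed a non-nullable
// table field.
fn batch_fits_table(table: &Schema, batch: &Schema) -> bool {
    table.fields().len() == batch.fields().len()
        && table
            .fields()
            .iter()
            .zip(batch.fields().iter())
            .all(|(t, b)| {
                t.name() == b.name()
                    && t.data_type() == b.data_type()
                    && (t.is_nullable() || !b.is_nullable())
            })
}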

alamb commented 9 months ago

However, it does feel like any time record batches are stitched together and treated as one congruent, continuous thing (like a table or stream), the nullability of those batches should be consistent with (or at least no less strict than) the nullability of the containing table or stream. For example, for the purposes of DF, it seems appropriate that non-nullable batches would always be acceptable in a table that is nullable. The inverse is not true.

Yes, I agree with this as the intent. I also agree with your assessment that this invariant is not always upheld. I think the reason is that RecordBatch-level nullability has limited value (the biggest value is in knowing null/non-null for each individual Array, and indeed the low-level kernels do look at the validity (null) mask), and thus no one has ensured consistent enforcement across the codebase.
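
A tiny illustration of that last point: the per-array validity mask carries the actual null information, independent of whatever the enclosing Field declares.

use datafusion::arrow::array::{Array, Int32Array};

fn validity_mask_example() {
    // The array-level validity (null) mask is what the low-level kernels consult,
    // regardless of the Field-level nullable flag in the schema.
    let a = Int32Array::from(vec![Some(1), None, Some(3)]);
    assert_eq!(a.null_count(), 1);
    assert!(a.is_null(1));
}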