apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0
1.22k stars 437 forks source link

[CH] Native parquet scan failed after native write when written map column contains null value #8022

Open taiyang-li opened 14 hours ago

taiyang-li commented 14 hours ago

Backend

CH (ClickHouse)

Bug description

-- Reproduction table: generated rows written out as parquet.
DROP TABLE IF EXISTS tmp.tnmp;
CREATE TABLE tmp.tnmp USING parquet AS
WITH data_source AS (
    -- Every column except uid is NULL whenever its random() draw is below 0.1,
    -- otherwise it is floor(random() * 100).
    SELECT
        id AS uid,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS rec_room_id,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS room_id,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS dispatch_id,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS gift_value_total,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS follow_channel,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS follow_user,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS followed_channel,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS need_filter,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS mic_time
    FROM range(100000)
)
SELECT
    uid,
    rec_room_id,
    room_id,
    dispatch_id,
    -- Builds a key:value string from six source columns and parses it into a map.
    -- NOTE(review): the concat arguments are not NULL-guarded here. How a NULL
    -- argument affects the result is engine-dependent - confirm against the
    -- Spark concat documentation.
    str_to_map(
        concat(
            'gift_value_total:', gift_value_total,
            ',follow_channel:', follow_channel,
            ',follow_user:', follow_user,
            ',followed_channel:', followed_channel,
            ',filter:', need_filter,
            ',mic_time:', mic_time
        )
    ) AS label_map,
    mic_time
FROM data_source;
:) desc hdfs('hdfs://bigocluster/apps/hive/warehouse/tmp.db/tnmp/part*.parquet')

DESCRIBE TABLE hdfs('hdfs://bigocluster/apps/hive/warehouse/tmp.db/tnmp/part*.parquet')

Query id: f317a8f4-54e4-46a9-867f-fa3e16db68bd

   ┌─name────────┬─type────────────────────────────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
1. │ uid         │ Nullable(Int64)                         │              │                    │         │                  │                │
2. │ rec_room_id │ Nullable(Int64)                         │              │                    │         │                  │                │
3. │ room_id     │ Nullable(Int64)                         │              │                    │         │                  │                │
4. │ dispatch_id │ Nullable(Int64)                         │              │                    │         │                  │                │
5. │ label_map   │ Nullable(Map(String, Nullable(String))) │              │                    │         │                  │                │
6. │ mic_time    │ Nullable(Int64)                         │              │                    │         │                  │                │
   └─────────────┴─────────────────────────────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘

6 rows in set. Elapsed: 0.028 sec. 

:) select count(1) from hdfs('hdfs://bigocluster/apps/hive/warehouse/tmp.db/tnmp/part*.parquet') where label_map is not null and label_map['mic_time'] != mic_time; 

SELECT count(1)
FROM hdfs('hdfs://bigocluster/apps/hive/warehouse/tmp.db/tnmp/part*.parquet')
WHERE (label_map IS NOT NULL) AND ((label_map['mic_time']) != mic_time)

Query id: 2c30a0dd-ea1f-4086-a4b1-bfe8fa1faa06

Elapsed: 0.008 sec. 

Received exception from server (version 24.11.1):
Code: 386. DB::Exception: Received from localhost:9001. DB::Exception: There is no supertype for types String, Int64 because some of them are String/FixedString/Enum and some of them are not. Stack trace:

0. ./contrib/llvm-project/libcxx/include/exception:141: Poco::Exception::Exception(String const&, int) @ 0x00000000132631f9
1. ./build_gcc/./src/Common/Exception.cpp:109: DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000a96e1d9
2. DB::Exception::Exception(PreformattedMessage&&, int) @ 0x0000000005b41e2c
3. DB::Exception::Exception<String, std::basic_string_view<char, std::char_traits<char>>&>(int, FormatStringHelperImpl<std::type_identity<String>::type, std::type_identity<std::basic_string_view<char, std::char_traits<char>>&>::type>, String&&, std::basic_string_view<char, std::char_traits<char>>&) @ 0x000000000921736b
4. ./build_gcc/./src/DataTypes/getLeastSupertype.cpp:69: std::shared_ptr<DB::IDataType const> DB::(anonymous namespace)::throwOrReturn<(DB::LeastSupertypeOnError)0, std::vector<std::shared_ptr<DB::IDataType const>, std::allocator<std::shared_ptr<DB::IDataType const>>>>(std::vector<std::shared_ptr<DB::IDataType const>, std::allocator<std::shared_ptr<DB::IDataType const>>> const&, std::basic_string_view<char, std::char_traits<char>>, int) @ 0x000000000ed4d2de
5. ./build_gcc/./src/DataTypes/getLeastSupertype.cpp:559: std::shared_ptr<DB::IDataType const> DB::getLeastSupertype<(DB::LeastSupertypeOnError)0>(std::vector<std::shared_ptr<DB::IDataType const>, std::allocator<std::shared_ptr<DB::IDataType const>>> const&) @ 0x000000000ed5108a
6. DB::FunctionComparison<DB::NotEqualsOp, DB::NameNotEquals>::executeGeneric(DB::ColumnWithTypeAndName const&, DB::ColumnWithTypeAndName const&) const @ 0x0000000008ed03a2
7. DB::FunctionComparison<DB::NotEqualsOp, DB::NameNotEquals>::executeImpl(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::shared_ptr<DB::IDataType const> const&, unsigned long) const @ 0x0000000008ec015c
8. DB::IFunction::executeImplDryRun(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::shared_ptr<DB::IDataType const> const&, unsigned long) const @ 0x0000000005b410aa
9. DB::FunctionToExecutableFunctionAdaptor::executeDryRunImpl(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::shared_ptr<DB::IDataType const> const&, unsigned long) const @ 0x00000000065bc47a
10. DB::IExecutableFunction::executeWithoutLowCardinalityColumns(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::shared_ptr<DB::IDataType const> const&, unsigned long, bool) const @ 0x000000000e1ba2af
11. DB::IExecutableFunction::defaultImplementationForNulls(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::shared_ptr<DB::IDataType const> const&, unsigned long, bool) const @ 0x000000000e1ba5cc
12. DB::IExecutableFunction::executeWithoutLowCardinalityColumns(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::shared_ptr<DB::IDataType const> const&, unsigned long, bool) const @ 0x000000000e1ba27d
13. DB::IExecutableFunction::executeWithoutSparseColumns(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::shared_ptr<DB::IDataType const> const&, unsigned long, bool) const @ 0x000000000e1baca2
14. DB::IExecutableFunction::execute(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::shared_ptr<DB::IDataType const> const&, unsigned long, bool) const @ 0x000000000e1bbebb
15. ./build_gcc/./src/Interpreters/ActionsDAG.cpp:739: DB::ActionsDAG::evaluatePartialResult(std::unordered_map<DB::ActionsDAG::Node const*, DB::ColumnWithTypeAndName, std::hash<DB::ActionsDAG::Node const*>, std::equal_to<DB::ActionsDAG::Node const*>, std::allocator<std::pair<DB::ActionsDAG::Node const* const, DB::ColumnWithTypeAndName>>>&, std::vector<DB::ActionsDAG::Node const*, std::allocator<DB::ActionsDAG::Node const*>> const&, unsigned long, bool) @ 0x000000000f02518e
16. ./build_gcc/./src/Interpreters/ActionsDAG.cpp:819: DB::ActionsDAG::updateHeader(DB::Block const&) const @ 0x000000000f023eb0
17. ./build_gcc/./src/Processors/Transforms/FilterTransform.cpp:21: DB::FilterTransform::transformHeader(DB::Block const&, DB::ActionsDAG const*, String const&, bool) @ 0x00000000114df2d3
18. ./build_gcc/./src/Processors/QueryPlan/FilterStep.cpp:34: DB::FilterStep::FilterStep(DB::Block const&, DB::ActionsDAG, String, bool) @ 0x000000001167ff79
19. ./contrib/llvm-project/libcxx/include/__memory/unique_ptr.h:714: std::__unique_if<DB::FilterStep>::__unique_single std::make_unique[abi:v15007]<DB::FilterStep, DB::Block const&, DB::ActionsDAG, String&, bool&>(DB::Block const&, DB::ActionsDAG&&, String&, bool&) @ 0x000000000f94d76a
20. ./build_gcc/./src/Planner/Planner.cpp:408: DB::(anonymous namespace)::addFilterStep(DB::QueryPlan&, DB::FilterAnalysisResult&, String const&, std::unordered_set<std::shared_ptr<DB::FutureSet>, std::hash<std::shared_ptr<DB::FutureSet>>, std::equal_to<std::shared_ptr<DB::FutureSet>>, std::allocator<std::shared_ptr<DB::FutureSet>>>&) @ 0x000000000f98eb7a
21. ./build_gcc/./src/Planner/Planner.cpp:1608: DB::Planner::buildPlanForQueryNode() @ 0x000000000f988012
22. ./build_gcc/./src/Planner/Planner.cpp:1301: DB::Planner::buildQueryPlanIfNeeded() @ 0x000000000f983852
23. ./build_gcc/./src/Interpreters/InterpreterSelectQueryAnalyzer.cpp:241: DB::InterpreterSelectQueryAnalyzer::getQueryPlan() @ 0x000000000f980b8d
24. ./build_gcc/./src/Interpreters/executeQuery.cpp:1346: DB::executeQueryImpl(char const*, char const*, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum, DB::ReadBuffer*) @ 0x000000000fcfdb75
25. ./build_gcc/./src/Interpreters/executeQuery.cpp:1564: DB::executeQuery(String const&, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum) @ 0x000000000fcfa3ba
26. ./build_gcc/./src/Server/TCPHandler.cpp:595: DB::TCPHandler::runImpl() @ 0x0000000011190431
27. ./build_gcc/./src/Server/TCPHandler.cpp:2528: DB::TCPHandler::run() @ 0x00000000111a8079
28. ./build_gcc/./base/poco/Net/src/TCPServerConnection.cpp:43: Poco::Net::TCPServerConnection::start() @ 0x00000000133074a7
29. ./build_gcc/./base/poco/Net/src/TCPServerDispatcher.cpp:115: Poco::Net::TCPServerDispatcher::run() @ 0x000000001330799e
30. ./build_gcc/./base/poco/Foundation/src/ThreadPool.cpp:205: Poco::PooledThread::run() @ 0x00000000132b2c52
31. ./base/poco/Foundation/src/Thread_POSIX.cpp:335: Poco::ThreadImpl::runnableEntry(void*) @ 0x00000000132b076f
. (NO_COMMON_TYPE)

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

taiyang-li commented 14 hours ago

For comparison, if we make sure the written map column doesn't contain nulls, the issue no longer appears, and the written and read columns are consistent.

-- Comparison table: same generated rows, but every value fed into the map
-- string is wrapped in coalesce(..., 0) first.
DROP TABLE IF EXISTS tmp.tnmpnn;
CREATE TABLE tmp.tnmpnn USING parquet AS
WITH data_source AS (
    -- Every column except uid is NULL whenever its random() draw is below 0.1,
    -- otherwise it is floor(random() * 100).
    SELECT
        id AS uid,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS rec_room_id,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS room_id,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS dispatch_id,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS gift_value_total,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS follow_channel,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS follow_user,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS followed_channel,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS need_filter,
        CASE WHEN random() < 0.1 THEN NULL ELSE floor(random() * 100) END AS mic_time
    FROM range(100000)
)
SELECT
    uid,
    rec_room_id,
    room_id,
    dispatch_id,
    -- coalesce swaps a NULL source value for 0 before concatenation, so no NULL
    -- argument reaches concat. The standalone mic_time column below is left
    -- unguarded and can still be NULL.
    str_to_map(
        concat(
            'gift_value_total:', coalesce(gift_value_total, 0),
            ',follow_channel:', coalesce(follow_channel, 0),
            ',follow_user:', coalesce(follow_user, 0),
            ',followed_channel:', coalesce(followed_channel, 0),
            ',filter:', coalesce(need_filter, 0),
            ',mic_time:', coalesce(mic_time, 0)
        )
    ) AS label_map,
    mic_time
FROM data_source;
:) select count(1) from hdfs('hdfs://bigocluster/apps/hive/warehouse/tmp.db/tnmpnn/part*.parquet') where label_map is not null and toInt64OrZero(label_map['mic_time']) != mic_time;

SELECT count(1)
FROM hdfs('hdfs://bigocluster/apps/hive/warehouse/tmp.db/tnmpnn/part*.parquet')
WHERE (label_map IS NOT NULL) AND (toInt64OrZero(label_map['mic_time']) != mic_time)

Query id: 7d7d1518-6c24-4603-b0b7-8e5b6b61aa44

   ┌─count(1)─┐
1. │        0 │
   └──────────┘

1 row in set. Elapsed: 16.334 sec. Processed 100.00 thousand rows, 1.52 MB (6.12 thousand rows/s., 92.91 KB/s.)
Peak memory usage: 27.36 MiB.