ClickHouse / ClickHouse

ClickHouse® is a real-time analytics DBMS
https://clickhouse.com
Apache License 2.0
37.74k stars 6.93k forks source link

Query runs successfully in RMT table but fails on Distributed table #70717

Open chhetripradeep opened 1 month ago

chhetripradeep commented 1 month ago

This was reported by a user in a community Slack thread.

Describe the unexpected behaviour: a clear and concise description of what does not work as it is supposed to.

How to reproduce

We are using version v24.8.4.13-lts.

This is the query, which runs successfully on the RMT table but fails on the Distributed table:

SELECT
COUNT(CASE WHEN platform = 'web' THEN 1 ELSE NULL END) AS base_web_cnt
FROM events_dist as base
prewhere base.id = XXX AND (base.eventTimeMs BETWEEN '2024-01-01 00:00:00' AND '2024-01-02 00:00:00')
LIMIT 1

It fails with

Cannot find column `countIf(_CAST(1_UInt8, 'Nullable(UInt8)'_String), equals(__table1.platform, 'web'_String))` in source stream, there are only columns: [countIf(_CAST(1_Nullable(UInt8), 'Nullable(UInt8)'_String), equals(__table1.platform, 'web'_String))]. (THERE_IS_NO_COLUMN) (version 24.8.4.13 (official build))

Stacktrace

0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000daffc3b
1. DB::Exception::Exception(PreformattedMessage&&, int) @ 0x0000000007e59fcc
2. DB::Exception::Exception<String const&, String>(int, FormatStringHelperImpl<std::type_identity<String const&>::type, std::type_identity<String>::type>, String const&, String&&) @ 0x00000000088546eb
3. DB::ActionsDAG::makeConvertingActions(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&, DB::ActionsDAG::MatchColumnsMode, bool, bool, std::unordered_map<String, String, std::hash<String>, std::equal_to<String>, std::allocator<std::pair<String const, String>>>*) @ 0x00000000114b04a6
4. DB::createLocalPlan(std::shared_ptr<DB::IAST> const&, DB::Block const&, std::shared_ptr<DB::Context const>, DB::QueryProcessingStage::Enum, unsigned long, unsigned long, bool) @ 0x000000001340b602
5. DB::ClusterProxy::SelectStreamFactory::createForShardImpl(DB::Cluster::ShardInfo const&, std::shared_ptr<DB::IAST> const&, std::shared_ptr<DB::IQueryTreeNode> const&, DB::StorageID const&, std::shared_ptr<DB::IAST> const&, std::shared_ptr<DB::Context const>, std::vector<std::unique_ptr<DB::QueryPlan, std::default_delete<DB::QueryPlan>>, std::allocator<std::unique_ptr<DB::QueryPlan, std::default_delete<DB::QueryPlan>>>>&, std::vector<DB::ClusterProxy::SelectStreamFactory::Shard, std::allocator<DB::ClusterProxy::SelectStreamFactory::Shard>>&, unsigned int, bool, std::function<std::shared_ptr<DB::IAST> (unsigned long)>, bool)::$_0::operator()() const @ 0x0000000012033050
6. DB::ClusterProxy::SelectStreamFactory::createForShardImpl(DB::Cluster::ShardInfo const&, std::shared_ptr<DB::IAST> const&, std::shared_ptr<DB::IQueryTreeNode> const&, DB::StorageID const&, std::shared_ptr<DB::IAST> const&, std::shared_ptr<DB::Context const>, std::vector<std::unique_ptr<DB::QueryPlan, std::default_delete<DB::QueryPlan>>, std::allocator<std::unique_ptr<DB::QueryPlan, std::default_delete<DB::QueryPlan>>>>&, std::vector<DB::ClusterProxy::SelectStreamFactory::Shard, std::allocator<DB::ClusterProxy::SelectStreamFactory::Shard>>&, unsigned int, bool, std::function<std::shared_ptr<DB::IAST> (unsigned long)>, bool) @ 0x0000000012031343
7. DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::shared_ptr<DB::IQueryTreeNode> const&, DB::StorageID const&, std::shared_ptr<DB::IAST> const&, std::shared_ptr<DB::Context const>, std::vector<std::unique_ptr<DB::QueryPlan, std::default_delete<DB::QueryPlan>>, std::allocator<std::unique_ptr<DB::QueryPlan, std::default_delete<DB::QueryPlan>>>>&, std::vector<DB::ClusterProxy::SelectStreamFactory::Shard, std::allocator<DB::ClusterProxy::SelectStreamFactory::Shard>>&, unsigned int, bool, std::function<std::shared_ptr<DB::IAST> (unsigned long)>) @ 0x0000000012033777
8. DB::ClusterProxy::executeQuery(DB::QueryPlan&, DB::Block const&, DB::QueryProcessingStage::Enum, DB::StorageID const&, std::shared_ptr<DB::IAST> const&, DB::ClusterProxy::SelectStreamFactory&, std::shared_ptr<Poco::Logger>, std::shared_ptr<DB::Context const>, DB::SelectQueryInfo const&, std::shared_ptr<DB::ExpressionActions> const&, String const&, DB::DistributedSettings const&, std::function<std::shared_ptr<DB::IAST> (unsigned long)>, bool) @ 0x0000000012025b38
9. DB::StorageDistributed::read(DB::QueryPlan&, std::vector<String, std::allocator<String>> const&, std::shared_ptr<DB::StorageSnapshot> const&, DB::SelectQueryInfo&, std::shared_ptr<DB::Context const>, DB::QueryProcessingStage::Enum, unsigned long, unsigned long) @ 0x00000000123890a1
10. DB::(anonymous namespace)::buildQueryPlanForTableExpression(std::shared_ptr<DB::IQueryTreeNode>, DB::SelectQueryInfo const&, DB::SelectQueryOptions const&, std::shared_ptr<DB::PlannerContext>&, bool, bool) @ 0x0000000011b02482
11. DB::buildJoinTreeQueryPlan(std::shared_ptr<DB::IQueryTreeNode> const&, DB::SelectQueryInfo const&, DB::SelectQueryOptions&, std::unordered_set<String, std::hash<String>, std::equal_to<String>, std::allocator<String>> const&, std::shared_ptr<DB::PlannerContext>&) @ 0x0000000011afa06d
12. DB::Planner::buildPlanForQueryNode() @ 0x0000000011ae47be
13. DB::Planner::buildQueryPlanIfNeeded() @ 0x0000000011ae0c19
14. DB::executeQueryImpl(char const*, char const*, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum, DB::ReadBuffer*) @ 0x0000000011e27821
15. DB::executeQuery(String const&, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum) @ 0x0000000011e23e1a
16. DB::TCPHandler::runImpl() @ 0x0000000012fb6504
17. DB::TCPHandler::run() @ 0x0000000012fd1258
18. Poco::Net::TCPServerConnection::start() @ 0x0000000015dbd6a7
19. Poco::Net::TCPServerDispatcher::run() @ 0x0000000015dbdb39
20. Poco::PooledThread::run() @ 0x0000000015d8aae1
21. Poco::ThreadImpl::runnableEntry(void*) @ 0x0000000015d8909d
22. ? @ 0x00007f01ccc68609
23. ? @ 0x00007f01ccb8d353

The column is defined as:

`platform` LowCardinality(String) CODEC(ZSTD(1)),

The column comparison showed a difference between the column name from the header and the column name from the stream:

ActionsDAG ActionsDAG::makeConvertingActions(
    const ColumnsWithTypeAndName & source, // countIf(_CAST(1_Nullable(UInt8), 'Nullable(UInt8)'_String), equals(__table1.platform, 'web'_String))
    const ColumnsWithTypeAndName & result, // countIf(_CAST(1_UInt8, 'Nullable(UInt8)'_String), equals(__table1.platform, 'web'_String))
    MatchColumnsMode mode,
    bool ignore_constant_values,
    bool add_casted_columns,
    NameToNameMap * new_names)
{
    size_t num_input_columns = source.size();
    size_t num_result_columns = result.size();

    if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns)
        throw Exception(ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH, "Number of columns doesn't match (source: {} and result: {})", num_input_columns, num_result_columns);

    if (add_casted_columns && mode != MatchColumnsMode::Name)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Converting with add_casted_columns supported only for MatchColumnsMode::Name");

    ActionsDAG actions_dag(source);
    NodeRawConstPtrs projection(num_result_columns);

    FunctionOverloadResolverPtr func_builder_materialize = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionMaterialize>());

    std::unordered_map<std::string_view, std::list<size_t>> inputs;
    if (mode == MatchColumnsMode::Name)
    {
        size_t input_nodes_size = actions_dag.inputs.size();
        for (size_t pos = 0; pos < input_nodes_size; ++pos)
            inputs[actions_dag.inputs[pos]->result_name].push_back(pos);
    }

    for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
    {
        const auto & res_elem = result[result_col_num];
        const Node * src_node = nullptr;
        const Node * dst_node = nullptr;

        switch (mode)
        {
            case MatchColumnsMode::Position:
            {
                src_node = dst_node = actions_dag.inputs[result_col_num];
                break;
            }

            case MatchColumnsMode::Name:
            {
                auto & input = inputs[res_elem.name];
                if (input.empty())
                {
                    const auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get());
                    if (ignore_constant_values && res_const)
                        src_node = dst_node = &actions_dag.addColumn(res_elem);
                    else
                        throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
                                        // res_elem.name is:  countIf(_CAST(1_UInt8, 'Nullable(UInt8)'_String), equals(__table1.platform, 'web'_String))
                                        // Block(source) is:  countIf(_CAST(1_Nullable(UInt8), 'Nullable(UInt8)'_String), equals(__table1.platform, 'web'_String))
                                        "Cannot find column `{}` in source stream, there are only columns: [{}]",
                                        res_elem.name, Block(source).dumpNames());
                }
/// Appends an ExpressionStep to `plan` that converts the plan's current output header into `header`.
///
/// - If the two blocks already have an equal structure, nothing is added.
/// - Columns are matched by position when `has_missing_objects` is set, and by name otherwise.
/// - Constant values are ignored (ignore_constant_values = true). In name mode, a constant column of
///   `header` that has no counterpart in the stream is taken from `header` itself. This is needed for
///   functions like `now64()` or `randConstant()`, whose values may differ between the two sides.
///
/// NOTE(review): name-based matching is strict. If the stream and `header` spell the same expression
/// differently (e.g. `_CAST(1_UInt8, ...)` vs `_CAST(1_Nullable(UInt8), ...)` inside a non-constant
/// column name), makeConvertingActions throws THERE_IS_NO_COLUMN.
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects)
{
    const auto & current_stream = plan.getCurrentDataStream();

    if (blocksHaveEqualStructure(current_stream.header, header))
        return;

    const auto match_mode = has_missing_objects
        ? ActionsDAG::MatchColumnsMode::Position
        : ActionsDAG::MatchColumnsMode::Name;

    /// Source = what the plan currently produces; result = the structure the caller expects.
    auto converting_dag = ActionsDAG::makeConvertingActions(
        current_stream.header.getColumnsWithTypeAndName(),
        header.getColumnsWithTypeAndName(),
        match_mode,
        /* ignore_constant_values = */ true);

    plan.addStep(std::make_unique<ExpressionStep>(current_stream, std::move(converting_dag)));
}

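The `CASE WHEN platform = 'web' THEN 1 ELSE NULL END` expression appears to trigger a conversion to `Nullable(Nullable(UInt8))`. In the stream, the constant `1` is already `Nullable(UInt8)` and is then cast to `Nullable(UInt8)` again. The header did not apply such a conversion, so its constant is a plain `UInt8` rather than `Nullable(UInt8)`. As a result, the two column names differ (`1_Nullable(UInt8)` in the stream vs. `1_UInt8` in the header), and matching the columns by name fails.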

Link: https://github.com/ClickHouse/ClickHouse/blob/v24.8.4.13-lts/src/Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.cpp#L279

Expected behavior: the query should run successfully over the Distributed table.

iceyett commented 1 month ago

Not just PREWHERE: the failure also happens with WHERE. Can we prioritize this? For now, we have to disable the analyzer to avoid the error. Thanks!