apache / doris-flink-connector

Flink Connector for Apache Doris
https://doris.apache.org/
Apache License 2.0
292 stars 201 forks

[Bug] DorisSource createEnumerator error when has filter condition(pushDown) #376

Open yingh0ng opened 2 months ago

yingh0ng commented 2 months ago

Search before asking

Version

1.6.0

What's Wrong?

My SQL contains a filter condition, but the column name in the pushed-down query is "null". Here is the detailed log:

2024-05-06 09:45:01.836 [flink-akka.actor.default-dispatcher-8970] INFO org.apache.doris.flink.source.DorisSource - Query SQL Sending to Doris FE is: 'select FINISHEDFLAG_, FIELD0006_, FIELD0031_, FIELD0030_ from qixi_ods.ods_dcb850f793dd87c6ae90e3aa81875608 where (null = 4)'.
2024-05-06 09:45:01.836 [flink-akka.actor.default-dispatcher-8970] INFO org.apache.doris.flink.source.DorisSource - Send request to Doris FE 'http://192.168.9.200:8030/api/qixi_ods/ods_dcb850f793dd87c6ae90e3aa81875608/_query_plan' with user 'root'.
2024-05-06 09:45:01.843 [flink-akka.actor.default-dispatcher-8970] ERROR org.apache.doris.flink.source.DorisSource - Doris FE's response cannot map to schema. res: {"exception":"PlannershouldplanjustonlyoneScanNodebutfound[0]","status":500}

Caused by: org.apache.doris.flink.exception.DorisException: Doris FE's response cannot map to schema. res: {"exception":"PlannershouldplanjustonlyoneScanNodebutfound[0]","status":500}
    at org.apache.doris.flink.rest.RestService.getQueryPlan(RestService.java:653)
    at org.apache.doris.flink.rest.RestService.findPartitions(RestService.java:620)
    at org.apache.doris.flink.source.DorisSource.createEnumerator(DorisSource.java:97)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:222)

What You Expected?

No error. Also, how can the pushdown be disabled?

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

Code of Conduct

yingh0ng commented 2 months ago

How to reproduce: I used a function (computed) column in the WHERE condition, for example:
select a, b, c from (select a, b, (a+b) as c from tablea) tableb where c=4;
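The thread never shows the connector's translation code, so the following is only a minimal, self-contained Java model of how a filter-to-SQL translator could produce where (null = 4). All names in it (Expr, Column, Literal, Call, translate) are hypothetical, not the connector's API. It assumes the translator returns null for an unsupported call, such as the PLUS behind the computed column c, and that the caller concatenates child results without checking for null.

```java
import java.util.List;

public class NaiveFilterTranslation {

    // Hypothetical expression tree: a column reference, an integer literal, or a function call.
    public interface Expr {}
    public record Column(String name) implements Expr {}
    public record Literal(int value) implements Expr {}
    public record Call(String function, List<Expr> args) implements Expr {}

    // Naive translator (assumption): returns null for anything it does not understand,
    // and the EQUALS branch concatenates child results without checking for null.
    public static String translate(Expr e) {
        if (e instanceof Column c) {
            return c.name();
        }
        if (e instanceof Literal l) {
            return String.valueOf(l.value());
        }
        Call call = (Call) e;
        if (call.function().equals("EQUALS")) {
            String left = translate(call.args().get(0));
            String right = translate(call.args().get(1));
            // String concatenation turns a null child into the text "null".
            return "(" + left + " = " + right + ")";
        }
        // Unsupported function, e.g. PLUS for the computed column c = a + b.
        return null;
    }

    public static void main(String[] args) {
        // where c = 4, with c defined as (a + b) in the subquery
        Expr computed = new Call("PLUS", List.of(new Column("a"), new Column("b")));
        Expr predicate = new Call("EQUALS", List.of(computed, new Literal(4)));
        System.out.println("where " + translate(predicate));
    }
}
```

Running it prints where (null = 4), the same shape as the SQL in the log. In this model, using null as an "unsupported" marker is what lets the failure slip silently into the generated SQL instead of stopping the pushdown.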

Tyxuani commented 2 months ago

I had the same problem:

Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "exception" (class org.apache.doris.flink.rest.models.QueryPlan), not marked as ignorable (3 known properties: "partitions", "status", "opaqued_query_plan"])
 at [Source: (String)"{"exception":"PlannershouldplanjustonlyoneScanNodebutfound[0]","status":500}"; line: 1, column: 15] (through reference chain: org.apache.doris.flink.rest.models.QueryPlan["exception"])
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1127)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2023)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1700)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1678)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:319)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
    at org.apache.doris.flink.rest.RestService.getQueryPlan(RestService.java:645)
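The FE did not return a query plan here at all: it returned an error object with "exception" and "status" fields, and mapping that onto QueryPlan (which only knows partitions, status and opaqued_query_plan) is what fails. Below is a minimal sketch of checking the status before mapping, so the FE's own message is surfaced. To stay dependency-free it uses a regex over the raw body, which assumes a flat JSON object like the one in the log and that a successful response carries status 200; the class and method names are hypothetical, and real code would use a JSON library.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QueryPlanResponseCheck {

    // Matches "status": <number> in a flat JSON object (simplifying assumption, not a real JSON parser).
    private static final Pattern STATUS = Pattern.compile("\"status\"\\s*:\\s*(\\d+)");
    // Matches "exception": "<text>" where the text contains no escaped quotes.
    private static final Pattern EXCEPTION = Pattern.compile("\"exception\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the FE error message if the body is an error payload, otherwise empty. */
    public static Optional<String> feError(String body) {
        Matcher status = STATUS.matcher(body);
        if (status.find() && Integer.parseInt(status.group(1)) != 200) {
            Matcher exception = EXCEPTION.matcher(body);
            // Prefer the FE's message; fall back to the bare status code.
            return Optional.of(exception.find() ? exception.group(1) : "status " + status.group(1));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String body = "{\"exception\":\"PlannershouldplanjustonlyoneScanNodebutfound[0]\",\"status\":500}";
        feError(body).ifPresentOrElse(
                msg -> System.out.println("FE rejected the pushed-down SQL: " + msg),
                () -> System.out.println("looks like a query plan; safe to map to QueryPlan"));
    }
}
```

For the body in the log this prints FE rejected the pushed-down SQL: PlannershouldplanjustonlyoneScanNodebutfound[0], which points at the generated SQL rather than at JSON mapping.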

The request sent to the FE is POST http://doris.xxx.com:8030/api/frontier/stream/_query_plan, with basic auth using the DB user credentials, and this body:

{"sql": "select `seq`, `meta_seq`, `latest_seq` from `dba`.`tb1` where (pp_yn = 'Y') AND (null = 1) AND (null = 1) AND ((((((((((((((((((((seq = 1) OR (seq = 2)) OR (seq = 3)) OR (seq = 4)) OR (seq = 8)) OR (seq = 9)) OR (seq = 12)) OR (seq = 13)) OR (seq = 16)) OR (seq = 19)) OR (seq = 23)) OR (seq = 24)) OR (seq = 25)) OR (seq = 26)) OR (seq = 28)) OR (seq = 29)) OR (seq = 30)) OR (seq = 34)) OR (seq = 43)) OR (seq = 44)) AND (latest_seq IS NOT NULL)"}

The DDL of tb1 looks like this:

CREATE TABLE IF NOT EXISTS dba.tb1(
  `a_seq` BIGINT,
  `created_at` DATETIME,
  `seq` INT,
  `meta_seq` BIGINT,
  `latest_seq` BIGINT,
  `sk` VARCHAR(64),
  `pp_yn` VARCHAR(2),
  `is_aa` tinyint(4) NULL,
  `is_bb` tinyint(4) NULL
)
UNIQUE KEY(`a_seq`, `created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY HASH(`a_seq`) BUCKETS 1
PROPERTIES(
  "dynamic_partition.enable" = "true",
  "dynamic_partition.time_unit" = "year",
  "dynamic_partition.start" = "-5",
  "dynamic_partition.end" = "1",
  "dynamic_partition.prefix" = "s_",
  "dynamic_partition.create_history_partition"="true",
  "replication_num" = "3"
);

My original condition contains is_aa = 1 AND is_bb = 1, but it was parsed into (null = 1) AND (null = 1), which causes the issue. I would like to know what happens when flink-doris-connector parses the condition, and how to resolve the issue.
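One plausible explanation, not confirmed in this thread: because is_aa is TINYINT and the literal 1 is INT, the Flink planner may wrap the column in a CAST before handing the predicate to the source, and a translator that does not handle CAST would emit (null = 1), the same pattern as the (null = 4) in the first report. The sketch below shows a safer pattern under that assumption: translation returns Optional.empty() for any unsupported sub-expression, and such predicates are kept on the Flink side instead of being pushed to Doris. All types and names are hypothetical, not the connector's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class SafeFilterTranslation {

    // Hypothetical expression tree, mirroring what a planner might hand to a source.
    public interface Expr {}
    public record Column(String name) implements Expr {}
    public record Literal(int value) implements Expr {}
    public record Call(String function, List<Expr> args) implements Expr {}

    // Conjuncts pushed to Doris vs. conjuncts Flink must still evaluate itself.
    public record Split(List<Expr> pushed, List<Expr> remaining) {}

    // Returns empty for anything unsupported, so a failure propagates instead of becoming "null".
    public static Optional<String> translate(Expr e) {
        if (e instanceof Column c) {
            return Optional.of(c.name());
        }
        if (e instanceof Literal l) {
            return Optional.of(String.valueOf(l.value()));
        }
        Call call = (Call) e;
        if (call.function().equals("EQUALS")) {
            Optional<String> left = translate(call.args().get(0));
            Optional<String> right = translate(call.args().get(1));
            if (left.isEmpty() || right.isEmpty()) {
                return Optional.empty(); // one side is untranslatable: reject the whole predicate
            }
            return Optional.of("(" + left.get() + " = " + right.get() + ")");
        }
        return Optional.empty(); // e.g. CAST or PLUS: not translated in this sketch
    }

    // Pushes only fully translatable conjuncts; everything else stays with Flink.
    public static Split split(List<Expr> conjuncts) {
        List<Expr> pushed = new ArrayList<>();
        List<Expr> remaining = new ArrayList<>();
        for (Expr conjunct : conjuncts) {
            if (translate(conjunct).isPresent()) {
                pushed.add(conjunct);
            } else {
                remaining.add(conjunct);
            }
        }
        return new Split(pushed, remaining);
    }

    public static void main(String[] args) {
        // is_aa = 1, assuming the planner wrapped the TINYINT column in CAST(... AS INT)
        Expr castEq = new Call("EQUALS", List.of(
                new Call("CAST", List.of(new Column("is_aa"))), new Literal(1)));
        Expr seqEq = new Call("EQUALS", List.of(new Column("seq"), new Literal(1)));
        Split split = split(List.of(castEq, seqEq));
        String where = split.pushed().stream()
                .map(e -> translate(e).orElseThrow())
                .collect(Collectors.joining(" AND "));
        System.out.println("pushed to Doris: " + where);
        System.out.println("left to Flink: " + split.remaining().size() + " predicate(s)");
    }
}
```

Under these assumptions it prints pushed to Doris: (seq = 1) and reports one predicate left to Flink. In Flink's SupportsFilterPushDown interface, the remaining list corresponds to the filters returned as remainingFilters in Result.of(acceptedFilters, remainingFilters), so results stay correct even when a predicate cannot be pushed down.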

Tyxuani commented 2 months ago

Resolved it myself: the tinyint column is_aa cannot be used in the condition is_aa = 1; it needs to be defined as int.

vinlee19 commented 2 months ago

@yingh0ng @Tyxuani We can communicate on WeChat. Here's my WeChat ID: xyelonx