StarRocks / starrocks-connector-for-apache-flink

Apache License 2.0
189 stars 152 forks source link

[BugFix] predicate push-down time dimension table error #375

Open hhoao opened 1 month ago

hhoao commented 1 month ago
image image

第一张图表示flink api传过来的是谓词下推后的下标,使用该下标会构建错filterRichInfo

如何复现: `package org.example;

import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

public class StarRocksSqlDimApp { public static void main(String[] args) { Configuration configuration = new Configuration(); configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30));

    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(executionEnvironment);
    streamTableEnvironment.executeSql("CREATE TABLE source (\n" +
        "  id INT,\n" +
        "  name STRING,\n" +
        "  proc_time AS PROCTIME()" +
        ") WITH (\n" +
        "   'connector' = 'starrocks',\n" +
        " 'password' = '',\n" +
        " 'table-name' = 'source', \n" +
        "  'scan-url' = '', \n" +
        " 'username'='root',\n" +
        "   'jdbc-url' = '',\n" +
        "   'database-name' = 'test'\n" +
        ");");
    streamTableEnvironment.executeSql("CREATE TABLE dim (\n" +
        "  id INT,\n" +
        "  name STRING\n" +
        ") WITH (\n" +
        "   'connector' = 'starrocks',\n" +
        " 'password' = '',\n" +
        " 'table-name' = 'dim', \n" +
        "  'scan-url' = '', \n" +
        " 'username'='root',\n" +
        "   'jdbc-url' = 'jdbc:mysql:///',\n" +
        "   'database-name' = 'test'\n" +
        ");");

    streamTableEnvironment.executeSql("CREATE TABLE sink (\n" +
        "  id INT,\n" +
        "  name STRING\n" +
        ") WITH (\n" +
        "   'connector' = 'starrocks',\n" +
        " 'password' = '',\n" +
        " 'table-name' = 'sink', \n" +
        "  'load-url' = '', \n" +
        " 'username'='root',\n" +
        "   'jdbc-url' = 'jdbc:mysql:///',\n" +
        "   'database-name' = 'test'\n" +
        ");");

    TableResult tableResult = streamTableEnvironment.executeSql(
        "INSERT INTO sink\n" +
            "SELECT source.id, dim.name FROM source\n" +
            "LEFT JOIN\n" +
            "dim for SYSTEM_TIME as OF source.proc_time\n" +
            "on dim.name = source.name;");
    tableResult.print();
}

} `