apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0

Flink Doris Integration (not able to create Doris catalog) #31275

Open vshinde-medacist opened 4 months ago

vshinde-medacist commented 4 months ago

Hi Team,

Seeking advice on the issue below.

We are conducting a PoC and are in the process of evaluating Flink and Doris integration using the below versions and dependencies.

Doris is up and running (set up by following the Quick Start in the official Doris documentation). I can access the FE at http://10.0.2.15:8030

Below is the PoC code snippet (Java Maven project) used to read a CSV file through a Flink FileSystem source and insert it into a Doris table as the sink:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkDorisIntegration {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///home/user/Documents/app_data/checkpoint/");
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Flink FileSystem CSV format source table in default in-memory catalog
        tableEnv.executeSql("create table IF NOT EXISTS `default_catalog`.`default_database`.`fs_src`(station string, txdate string, txtime string, txseq string)" +
                " with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path'      = 'file:///home/user/Documents/app_data/source/',\n" +
                "'format'    = 'csv',\n" +
                "'csv.ignore-parse-errors' = 'true',\n" +
                "'csv.allow-comments' = 'true',\n" +
                "'source.monitor-interval' = '1s'\n" +
                ");");

        // Create Doris catalog (this statement fails with an IllegalArgumentException)
        tableEnv.executeSql("CREATE CATALOG demo_catalog WITH('type' = 'jdbc', 'default-database' = 'db_test', 'username' = 'root', 'password' = '', 'base-url' = 'jdbc:mysql://10.0.2.15:9030')");

        // Switch to Doris catalog
        tableEnv.executeSql("USE CATALOG demo_catalog;");

        // Create Doris sink table
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test.flink_doris_sink (station varchar, txdate varchar, txtime varchar, txseq varchar) " +
                " with (\n" +
                "'connector' = 'doris',\n" +
                "'fenodes'      = '10.0.2.15:8030',\n" +
                "'table.identifier'    = 'db_test.flink_doris_sink',\n" +
                "'username' = 'root',\n" +
                "'password' = '',\n" +
                "'sink.label-prefix' = 'doris_label'\n" +
                ");");

        // Insert into Doris table
        tableEnv.executeSql("INSERT INTO demo_catalog.db_test.flink_doris_sink " +
                " (station, txdate, txtime, txseq)" +
                " SELECT " +
                " station, txdate, txtime, txseq " +
                " FROM `default_catalog`.`default_database`.fs_src; ");

        env.execute();
    }
}

To access the Doris table I need to switch to the Doris catalog, but I get an IllegalArgumentException while trying to create the Doris catalog from Flink. I am running the code from IntelliJ IDEA, not submitting a jar as a job to a Flink cluster.

Can someone please help with the below:

rohitrs1983 commented 4 months ago

The CREATE CATALOG syntax is incorrect: WITH should be PROPERTIES, and you need to specify jdbc_url and driver_url.

Something like this:

CREATE CATALOG demo_catalog PROPERTIES('type' = 'jdbc', 'default-database' = 'db_test', 'username' = 'root', 'password' = '', 'jdbc_url' = 'jdbc:mysql://10.0.2.15:9030', 'driver_url' = 'mysql-connector-java-8.0.25.jar');

Refer to these for details:
https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG/
https://doris.apache.org/docs/dev/lakehouse/multi-catalog/jdbc/

vshinde-medacist commented 4 months ago

@rohitrs1983 - thanks for the suggestion. I tried it and now get the SQL parser exception below:

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "PROPERTIES" at line 1, column 21.
Was expecting one of:
    <EOF> 
    "WITH" ...
    ";" ...

My thinking is: how is the Flink SQL parser supposed to know Doris-specific DDL?
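
The parser error above lists "WITH" as an accepted token after the catalog name, so a statement passed to Flink's executeSql has to use the WITH ('key' = 'value', ...) form, not PROPERTIES. Below is a minimal sketch of rendering such a statement from an ordered option map. The class FlinkCatalogDdl and its methods are hypothetical helpers written for illustration; they are not part of Flink or Doris. The option keys are copied from the original snippet, and the sketch only builds the string. It does not address the original IllegalArgumentException, and whether a catalog factory for 'type' = 'jdbc' is available on the classpath is an open question in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not a Flink or Doris API): renders a Flink-style
// CREATE CATALOG statement, which uses WITH (...) rather than PROPERTIES(...).
final class FlinkCatalogDdl {

    // Wraps a value in single quotes, doubling embedded single quotes
    // (the standard SQL string-literal convention).
    public static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds: CREATE CATALOG <name> WITH ('k1' = 'v1', 'k2' = 'v2', ...)
    // Options are emitted in the map's iteration order.
    public static String createCatalog(String name, Map<String, String> options) {
        String body = options.entrySet().stream()
                .map(e -> quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(", "));
        return "CREATE CATALOG " + name + " WITH (" + body + ")";
    }

    public static void main(String[] args) {
        // Option keys and values taken from the snippet in the original post.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("type", "jdbc");
        options.put("default-database", "db_test");
        options.put("username", "root");
        options.put("password", "");
        options.put("base-url", "jdbc:mysql://10.0.2.15:9030");

        // The resulting string is what would be passed to tableEnv.executeSql(...).
        System.out.println(createCatalog("demo_catalog", options));
    }
}
```

The Doris-style PROPERTIES(...) statement suggested earlier is Doris DDL, so it would be sent to Doris itself (for example through a MySQL client connected to the FE query port) and not through Flink's parser.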