StarRocks / starrocks-connector-for-apache-flink

Apache License 2.0
192 stars 154 forks

[Feature] Support Flink catalog #295

Closed banmoy closed 10 months ago

banmoy commented 10 months ago

What type of PR is this:

Which issues this PR fixes:

Fixes #

Problem Summary (Required):

Add Flink catalog support to make the connector easier to use. The connector fetches metadata, such as the table schema, through the catalog, so users no longer need to declare the table structure explicitly in Flink SQL. The StarRocks catalog supports the following methods; other catalog methods are not currently supported.

listDatabases();
getDatabase(String databaseName);
databaseExists(String databaseName);
createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists);
listTables(String databaseName);
tableExists(ObjectPath tablePath);
getTable(ObjectPath tablePath);
createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);

An example of accessing a StarRocks table through the catalog in Flink SQL:

CREATE CATALOG `sr_catalog` WITH(
    'type' = 'starrocks',
    -- FE JDBC server
    'jdbc-url' = 'xxx',
    -- FE HTTP server
    'http-url' = 'xxx',
    'username' = 'xxx',
    'password' = 'xxx',
    'default-database' = 'xxx'
);
USE CATALOG `sr_catalog`;

-- Write data to the StarRocks table `test`.`score_board`. The catalog fetches
-- the table's metadata from StarRocks, so there is no need to declare a
-- CREATE TABLE statement as before.
INSERT INTO `test`.`score_board` VALUES (1, 'starrocks', 100);

-- Read data from the StarRocks table `test`.`score_board`. The catalog fetches
-- the table's metadata from StarRocks, so there is no need to declare a
-- CREATE TABLE statement as before.
SELECT * FROM `test`.`score_board`;
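
The supported catalog methods listed earlier are normally reached through ordinary Flink SQL statements rather than called directly. The following is a minimal sketch, assuming the current catalog is `sr_catalog` as created above. The database `demo_db` and the table `old_table` are hypothetical names used only for illustration, and the method named in each comment is an assumption about how Flink usually routes the statement to a catalog, not something this PR states.

```sql
-- Assumes `USE CATALOG sr_catalog` has already been run.

-- Expected to go through listDatabases()
SHOW DATABASES;

-- Expected to go through createDatabase(..., ignoreIfExists = true);
-- `demo_db` is a hypothetical database name
CREATE DATABASE IF NOT EXISTS `demo_db`;

-- Switch to the database, then list its tables via listTables("demo_db")
USE `demo_db`;
SHOW TABLES;

-- Inspect the schema the catalog fetched from StarRocks via getTable(...)
DESCRIBE `test`.`score_board`;

-- Expected to go through dropTable(..., ignoreIfNotExists = true);
-- `old_table` is a hypothetical table name
DROP TABLE IF EXISTS `demo_db`.`old_table`;
```

Statements that depend on unsupported catalog methods, such as dropping a database or altering a table, are left out because the summary states that other methods are currently not supported.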

Checklist: