apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0
3.85k stars 990 forks

[SPIP-2][Flink] Introduce catalog managed. #3912

Open Mrart opened 1 month ago

Mrart commented 1 month ago

Search before asking

Motivation

There are many internal business database systems, and the number and variety of upstream and downstream databases make them difficult to manage. At the same time, most current business systems need to connect to Flink CDC for data synchronization. Flink itself supports metadata management, but StreamPark does not implement a metadata management function yet. To make it more convenient for users to work with their data, we designed a metadata (catalog) management function.

Design Detail

Product design

Catalog management follows the catalog management design of Alibaba Cloud's real-time computing products, and covers the jdbc catalog, hive catalog, paimon catalog, etc.

jdbc catalog (mysql/pgsql)

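For reference, a JDBC catalog is registered in Flink SQL roughly as follows (the catalog name, database, and credentials below are placeholders, not values from this proposal):

```sql
-- Sketch: registering Flink's built-in JDBC catalog (MySQL/PostgreSQL).
-- All option values are placeholders.
CREATE CATALOG my_jdbc_catalog WITH (
    'type' = 'jdbc',
    'base-url' = 'jdbc:mysql://127.0.0.1:3306',
    'default-database' = 'mydb',
    'username' = 'test',
    'password' = 'test'
);
```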

hive catalog

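Similarly, a Hive catalog is registered roughly as follows (the conf directory path is a placeholder):

```sql
-- Sketch: registering a Hive catalog; hive-conf-dir is a placeholder path.
CREATE CATALOG my_hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/hive/conf'
);
```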

paimon catalog

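And a Paimon catalog roughly as follows (the warehouse path is a placeholder):

```sql
-- Sketch: registering a Paimon catalog; the warehouse path is a placeholder.
CREATE CATALOG my_paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///paimon/warehouse'
);
```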

Catalog implementation detailed design


The workflow is as follows:

  1. The user performs CRUD operations on catalog metadata through StreamPark's job management, extended with the new metadata management functions.
  2. When a Flink SQL job is started, it uses `USE CATALOG` to reference the catalog, and StreamPark ships the catalogstore plugin into the Flink job's runtime classloader.
  3. When the job starts, it recognizes `USE CATALOG c1`, connects to the catalog store through the configuration in `flink-conf.yaml`, reads the catalog information, and resolves the catalog. The relevant `flink-conf.yaml` entries are:

     ```yaml
     table.catalog-store.kind: jdbc
     table.catalog-store.jdbc.url: jdbc:mysql://127.0.0.1:3306/database
     table.catalog-store.jdbc.table-name: t_flink_catalog
     table.catalog-store.jdbc.driver: mysql.driver
     table.catalog-store.jdbc.username: test
     table.catalog-store.jdbc.password: test
     table.catalog-store.jdbc.max-retry-timeout: 10
     ```
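With the catalog store configured as above, a Flink SQL job can reference a stored catalog by name without defining it inline. A minimal sketch (the catalog name `c1` matches the example above; the database and table names are illustrative):

```sql
-- The catalog 'c1' is not declared in the job itself; it is lazily
-- resolved from the JDBC catalog store configured in flink-conf.yaml.
USE CATALOG c1;
SHOW DATABASES;

-- Illustrative query against a table registered in the catalog:
SELECT * FROM db1.orders LIMIT 10;
```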

Design of catalog information table in streampark

CREATE TABLE IF NOT EXISTS t_flink_catalog (
    `id` BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
    `team_id` BIGINT NOT NULL,
    `user_id` BIGINT DEFAULT NULL,
    `catalog_type` VARCHAR(255) NOT NULL,
    `catalog_name` VARCHAR(255) NOT NULL,
    `configuration` TEXT,
    `create_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
    `update_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
    CONSTRAINT uniq_catalog_name UNIQUE (`catalog_name`)
);
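For illustration, a row in this table might look as follows. The JSON layout of the `configuration` column shown here is an assumption for readability; the proposal does not specify the final serialization format:

```sql
-- Hypothetical example: registering a JDBC catalog named 'c1'.
-- The configuration JSON shape is illustrative, not the final format.
INSERT INTO t_flink_catalog
    (team_id, user_id, catalog_type, catalog_name, configuration, create_time, update_time)
VALUES
    (1, 100, 'jdbc', 'c1',
     '{"type": "jdbc", "base-url": "jdbc:mysql://127.0.0.1:3306", "default-database": "mydb", "username": "test", "password": "test"}',
     CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);
```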

Compatibility, Deprecation, and Migration Plan

No response

Test Plan

No response

Code of Conduct

References

  1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
  2. https://help.aliyun.com/zh/flink/user-guide/manage-hive-catalogs?spm=a2c4g.11186623.0.0.3fe75db0rRCV6y
  3. https://docs.qq.com/doc/DRVN5Q0tDZ093YmxE?u=0c37af373c1a46cea6607956b957dd59
Mrart commented 1 month ago

https://github.com/apache/incubator-streampark/pull/3914

Mrart commented 1 month ago

Implementation steps:

1. [Done] Catalog store plugin.
2. [Done] Catalog CRUD: introduce catalog management.
3. [Doing] StreamPark ships the catalogstore plugin when starting a job.
4. [To begin] Catalog management frontend.
5. [Doing] Catalog database/table management (backend/frontend).

wolfboys commented 1 month ago

Looks good

Mrart commented 2 weeks ago

@wolfboys All the backend code has been pushed, please review it ASAP.