apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Umbrella][Feature] Introduce the basic workflow framework based on flinksql #716

Closed: xleoken closed this issue 2 years ago

xleoken commented 2 years ago


Description

Apache Flink already contains rich connectors, supports SQL grammar with both batch and streaming source connectors, and offers many other excellent features we can benefit from.

Several advantages:

  1. More sources and sinks are supported, like Kafka, HBase, and Greenplum.
  2. Workflows are isolated; each workflow is an independent YARN/K8s application.
  3. Lightweight ETL data processing, like filtering nulls.
  4. Support for dimension table lookup in several cases.
  5. Support for customized UDFs (see the sketch after this list).
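
For point 5, here is a minimal sketch of what a customized UDF could look like with Flink's Table API. The MaskId function and the mask_id name are hypothetical, only to illustrate the registration flow:

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF: masks a raw customer id before it reaches the sink.
public class MaskId extends ScalarFunction {
    public String eval(Integer customerId) {
        return customerId == null ? null : "cust_" + (customerId % 1000);
    }
}

After registering it with tableEnv.createTemporarySystemFunction("mask_id", MaskId.class), it can be called directly in the workflow SQL, e.g. SELECT mask_id(customerId) FROM kafka_source.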

To implement the new workflow, we need to use the Flink Table API. It will help us handle table schemas, field data types, and higher-level SQL semantics, and it also supports integrating the catalogs of multiple data sources/sinks.

Here is a FlinkSQL workflow demo:

CREATE TABLE kafka_source (
  customerId int,
  oStatus int,
  nStatus int
) with (
  'connector.type' = 'kafka',
  ...
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json'
);

CREATE TABLE fs_source (
  customerId int,
  oStatus int,
  nStatus int
) with (
  'connector.type' = 'filesystem',
  ...
  'path' = 'hdfs:///data/2021/06/01/xx.txt',
  'format.type' = 'json'
);

INSERT INTO fs_source
SELECT * FROM kafka_source
WHERE oStatus != 0;
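
To make the umbrella scope concrete, here is a minimal sketch of how such a SQL script could be submitted, assuming Flink's TableEnvironment.executeSql (Flink 1.11+). The class name FlinkSqlRunner and the naive statement splitting are placeholders, not a proposed implementation:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSqlRunner {
    public static void main(String[] args) throws Exception {
        // args[0]: path to a workflow file containing DDL + INSERT, like the demo above.
        String script = new String(
                Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);

        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Naive split on ';' is enough for a demo; a real runner needs a proper
        // SQL parser to handle semicolons inside string literals and comments.
        for (String stmt : script.split(";")) {
            if (!stmt.trim().isEmpty()) {
                tEnv.executeSql(stmt.trim());
            }
        }
    }
}

Each INSERT INTO submitted this way is launched as its own Flink job, which matches advantage 2 above: one workflow, one independent YARN/K8s application.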

Related issues

No response


xleoken commented 2 years ago

cc @dailidong @garyelephant @RickyHuo, please share your opinions when you are free, thanks

davidzollo commented 2 years ago

INSERT INTO fs_source SELECT * FROM kafka_source WHERE oStatus != 0;

Thanks for your detailed design.

I think you can refer to more open source projects. There is an open source project named DBT (https://www.getdbt.com); maybe we can come up with a better solution.

xleoken commented 2 years ago

@dailidong yeah, I plan to do the same thing: base the workflow on SQL.

Because many users are very familiar with SQL, they don't have to spend more time learning a new API.


RickyHuo commented 2 years ago

@leo65535 Thanks for your suggestion. In SeaTunnel, the input source is registered as a table, and the SQL transform plugin can also generate a workflow based on SQL. In some ways that seems the same as what you described.

xleoken commented 2 years ago

hi @RickyHuo

In SeaTunnel, the input source is registered as a table, and the SQL transform plugin can also generate a workflow based on SQL

Yeah, you are right.

The point of this feature is to introduce a new workflow template. Here is a demo comparing the two.

The original workflow

env {
  execution.parallelism = 1
}

source {
  FakeSourceStream {
    result_table_name = "fake"
    field_name = "name,age"
  }
}

transform {
  sql {
    sql = "select name,age from fake"
  }
}

sink {
  ConsoleSink {}
}

The proposed workflow

CREATE TABLE fake_source (
  name string,
  age int
) with (
  'connector.type' = 'fakestream',
  'format.type' = 'json'
);

CREATE TABLE print_sink (
  name string,
  age int
) with (
  'connector.type' = 'print'
);

INSERT INTO print_sink
SELECT * FROM fake_source;
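
For anyone who wants to try the shape of this workflow on vanilla Flink today, here is a minimal runnable sketch. Assumptions: 'fakestream' is a SeaTunnel-style connector name, so the sketch substitutes Flink's built-in datagen and print connectors (using the newer 'connector' option syntax), and the class name FakeToPrintDemo is made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FakeToPrintDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-in for the 'fakestream' source: Flink's built-in datagen connector.
        tEnv.executeSql("CREATE TABLE fake_source (name STRING, age INT) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");

        // Stand-in for the 'print' sink: Flink's built-in print connector.
        tEnv.executeSql("CREATE TABLE print_sink (name STRING, age INT) "
                + "WITH ('connector' = 'print')");

        // Submitting the INSERT launches the actual streaming job.
        tEnv.executeSql("INSERT INTO print_sink SELECT * FROM fake_source");
    }
}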