StarRocks / starrocks

The world's fastest open query engine for sub-second analytics both on and off the data lakehouse. With the flexibility to support nearly any scenario, StarRocks provides best-in-class performance for multi-dimensional analytics, real-time analytics, and ad-hoc queries. A Linux Foundation project.
https://starrocks.io
Apache License 2.0
8.93k stars 1.79k forks source link

Proposal: A new unified loading/unloading SQL syntax and design #4239

Closed jaogoy closed 1 year ago

jaogoy commented 2 years ago

I have put this article in Google docs, you can comment here A new unified loading/unloading SQL syntax and design in detail for convenience.

Background

  1. There are too many types of loading in StarRocks, Stream Load / Routine Load / Broker Load / Spark Load, …
  2. Different types of loading have different SQL syntax and different implementations which are hard to maintain and cost more engineering resources;
  3. The unloading syntax isn't similar to the loading syntax, and its function is weak;

Purpose

  1. Unify loading/unloading SQL syntax;
  2. Separate loading stages clearly, including different types of source, such as Kafka, Pulsar, Flink, or just Local host;
  3. Support more properties for different file types, such as CSV, JSON, ORC, Parquet, etc.

Some Research before the design

  1. The loading/unloading SQL syntax of snowflake looks good;
  2. Some other databases are also under investigation, including Amazon Redshift, ClickHouse, MySQL, Vertica, etc.

Thanks to those engineers and designers already doing a lot of work in this area.

Design

There are three scenarios:

  1. Loading data from Local host.
  2. Loading data from Cloud, such as HDFS, S3, etc.
  3. Continuously loading data from a stream, such as Kafka, Pulsar, etc.

We refer to them accordingly as Local Loading, Loading, Continuous Loading.

  1. Local Loading: The user PUSHES the data from the Local host into the StarRocks by calling a script or using a piece of Java code through HTTP protocol or other protocols. Almost the same as the current Stream Load.
  2. Loading: The user calls a COPY command (a SQL statement) through the MySQL protocol, and then the StarRocks system loads the data from HDFS, S3, etc. (maybe through Brokers)
  3. Continuous Loading: The user calls a STREAM COPY command (a SQL statement containing a sub COPY command) through the MySQL protocol, and then the StarRocks system triggers a PERIODICALLY scheduling task or something else which will read data from a source, such as Kafka and Pulsar, and loads the data into the system. That means the data will be CONTINUOUSLY loaded like a stream.

Loading

Local Loading

Almost an enhancement of current Stream Load

  1. Still use the HTTP protocol to Load data from Local host.
  2. The same Loading SQL syntax as Continuous Loading which will be described below.

There are two similar ways to load data from Local host.

A single curl post

We can load data from Local host by using a single cURL post with the loading SQL statement directly in the Header parameters.

curl -H "sql:${sqlStatement}" -H "<otherHeaders>"
    -T /path/to/data.csv
    -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

Where the sqlStatement is roughly described as below (It should be flattened to a single line in the cURL command):

COPY INTO [<db_name>].<table_name>
[ transfomationAndFilter ]
FROM PIPE -- indicates that the data comes from a client through a pipe
WITH LABEL <db_name>.<label_name>
parameters
...
  1. It's easy to use, without too much change.
  2. However, there are some disadvantages: the Header only supports a limited length. \r,\n could NOT be placed in the statement directly (We should use \x0d , \x0a instead).

It's the recommended way when the copy statement is not too complicated.

Two steps curl post

  1. Create a loading PIPE first using a similar cURL post:

    curl -H "sql-type:FILE" -T /path/to/copy_data.sql
        -XPUT http://fe_host:http_port/api/_stream_load
    • The "sql-type:FILE" header parameter indicates that the file to be uploaded is a SQL file and not a data file as usual
    -- file copy_data.sql
    CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <pipe_name>
        AS <copy_statement>
    • StarRocks will create a PIPE object which will wait for incoming data and load the data into the system with COPY command.
  2. Upload data through the PIPE created above with another cURL post:

    curl -H "sql-pipe:<pipe_name>" -T /path/to/data.csv
    -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
    • The "sql-pipe:<pipe_name>" header parameters indicate that this cURL post will use a loading PIPE already created before.
    • We can use the PIPE as long as we want without creating it every time.

Loading & COPY command

We can just call a COPY command to load data from Cloud data.

The COPY command has two main forms:

/* Standard data load */
COPY INTO [<db_name>].<table_name> 
FROM location
[ WITH LABEL '<db_name>.<label_name>' ]
[ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
[ PATTERN = '<regex_pattern>' ]
[ DATA_FORMAT = ( TYPE = { CSV | JSON | ORC | PARQUET } [ formatTypeOptions ] ) ]
[ copyOptions ]
[ validationOptions ]
[ otherOptions ]

location ::=
    { SOURCE '<source_name>' |   -- stream data such as from KAFKA may be encapsulated as a SOURCE
      { PIPE | HDFS | S3 | ... }
      [( 'hdfs://path' [,...]   -- different types with different detail parameters
          RESOURCE = ( { RESOURCE_NAME = '<resource_name>' | resourceOptions )
        ) ] }

resourceOptions ::=  /* some binding relationship with location */
    [ endpoint = '<endpoint_list>' ]
    [ CREDENTIALS-info ]  /* user/password;AK/SK;kerbros;HA; IAM; etc. */

/* Data load with transformation
 */
COPY INTO [<db_name>].<table_name> 
[ (column [,...] ) ]
SELECT { * | { column-expression | column } [,...] }
FROM location
[ WITH FIELDS(['<field>' [AS] ] <source_field_name> [,...] ) ]
[ whereClause ]
[ WITH LABEL <db_name>.<label_name> ]
...
  1. Mainly similar to Snowflake COPY syntax.

  2. Enhance transformation & filter to support stronger and more flexible expressions. (will be detailed as follows)

  3. Use WITH FIELDS to rename column names for convenience:

    • When loading JSON format data:
      • We can use the outermost keys of a JSON object as column names directly.
      • We can also use WITH FIELDS('$.id' as id, '$.a' as ax) to rename any JSON value with a JSON path like jsonpaths as before.
    • When loading CSV format data:
    • We can directly use WITH FIELDS(a, b, c) statement to rename CSV fields from 1 to n.
  4. Location indicates the location where data come from, such as a SOURCE, a PIPE, or HDFS/S3 files.

  5. RESOURCE could be created as below, or just be unfolded in the COPY command:

CREATE RESOURCE <resource_name>
    endpoint = '<endpoint_list>'
    [ CREDENTIALS-info ]  /* user/password;AK/SK;kerbros;HA; IAM; etc. */
  1. Format Type Options ( formatTypeOptions ) includes some parameters same as Snowflake's formatTypeOptions and more:
    1. use_header_format: whether to use the header line names as column names in sequence.
    2. skip_empty_line: whether to skip lines only with white space.
    3. ...

Transformation & Filter

The syntax will be mainly as below:

[ (x, y, w, ...) ]  -- field names in the target table, so we can omit some unwanted fields
                    -- there are 90 columns more
select year(datetime($11)), month(datetime($11)), day(datetime($11))
    , $1..$10, $21..$100
from location
with fields(a, b, c, ...)
where $2::int > 2
  1. The columns of the target table. Because they can be directly copied from the target table (using show create table <table_name> ), it won't be too troublesome.
  2. The select supports the configuration of a range of fields like $1..$10 for easy writing and maintenance. At the same time, you can also write ordinary functions (such as array([$2,$3,$5]) or some more complex functions) or a single column.
  3. If you want to use named columns, you can use WITH FIELDS(…) , but it should be either empty or all listed.
  4. If there is an error or ambiguity in the type derivation in the where clause, you should give a type prompt, but only needed when it's required;

Some simple examples:

-- All the columns will be mapped one-one using their positions
<nothing-to-specify>

-- Select some columns from source file with column ids(positions)
select $1, $1+$2, to_string(c)

-- Select 99 columns from column 1 to column 99 of the source file,
--   and specify the data type of column 100 for filter
select $1..$99
where $100::int > 2

-- Name all the columns from the source file, select them by names
--   and write these three columns to target table columns x, y, w separately
--   with column z using its default value
(x, y, w)
select a, a+b, to_string(c)
from location
with fields(a, b, c)
where b::int > 2

Continuous Loading

A StreamCopy seems like a wrapper of COPY command with a data source and some schedule parameters additionally. But, it will do a lot of work besides COPY.

CREATE STREAM COPY <name>
    dataSource
    scheduleParameters
    [ parameters ]
    AS <copy_statement>  -- COPY INTO <table>

dataSource ::=
    [ DATA_SOURCE = { '<source_name>' |
                      { KAFKA | PULSAR ... } ( key1 = value1 [...] ) } ]

scheduleParameters ::=
    [ SCHEDULE_INFO = ( k1 = v1 [...] ) ]
  1. A StreamCopy integrates three parts: connector, schedule, and sink. It also has an implicit state (including offset) inside.
    1. SCHEDULE makes a periodical loading.
    2. CONNECTOR will connect to a data source and read data.
    3. SINK will pass through the data read by the CONNECTOR to COPY command.
  2. All the parameters about the external data source, like Kafka, Pulsar, will be put in the dataSource clause.
  3. scheduleParameters are some parameters about schedule, such as max_batch_interval , max_batch_size , max_error_number , etc.

Example:

CREATE STREAM COPY <name>
    DATA_SOURCE = KAFKA (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092"
        "kafka_topic" = "my_topic"
        "kafka_partitions" = "0,1,2,3"
        "kafka_offsets" = "101,0,0,200" )
    AS COPY INTO <table_name>
    FROM PIPE
    copyOptions
    ...

CREATE PIPE

For other situations in which that outer system will implement connector + schedule + sink , StarRocks will supply a PIPE definition:

CREATE PIPE <name>
    [ parameters ]
    AS <copy_statement>
  1. Stream Load and Routine Load can be integrated together.
  2. Outer sink systems can put data into the PIPE continuously through an HTTP protocol or others. It seems like a stream.

Unloading

COPY command

Similar to Loading, we can use a COPY command to copy data from the internal table to a specified location.

COPY INTO location  
FROM { [<db>.]<table_name> | ( <query> ) }
[ PARTITION BY <expr> ]
[ DATA_FORMAT = ( TYPE = { CSV | JSON | PARQUET } [ formatTypeOptions ] ) ]
[ copyOptions ]
[ HEADER ]
  1. Similar to Loading, the INTO location clause can be unfolded into a more complicated form.
  2. It may be detailed later or in another proposal.

Local Unloading

COPY INTO PIPE
FROM { [<db>.]<table_name> | ( <query> ) }
...

Some thoughts

Some command names could be changed to more meaningful words.

  1. Does the STREAM COPY command sound appropriate? Will ROUTINE COPY , SINKER , or others be more suitable?

  2. Shell we define a Stream object to store the state of the StreamCopy task?

    CREATE STREAM COPY <name>
        DATA_SOURCE = KAFKA (
            "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092"
        )
        STREAM = (
            "kafka_topic" = "my_topic"
            "kafka_partitions" = "0,1,2,3"
            "kafka_offsets" = "101,0,0,200" 
        )
        SCHEDULE_INFO = ()
        AS COPY INTO <table_name>
        FROM PIPE
        copyOptions
        ...
    • Then we can separate several parts clearly.
    • And when we want to implement a CDC stream on a table, they will be similar to each other.
  3. Should we use STREAM instead of PIPE?

  4. What's the difference between COPY and INSERT INTO SELECT ? Which one will be more in line with our intuition when we want to load data from an External Table, such as from S3 or Iceberg?

I'm very glad to hear from you anything about this proposal.

imay commented 2 years ago

@jaogoy I think you can paste it in Google doc and paste the link in this issue. And everyone can review on that google doc

jaogoy commented 2 years ago

I have put this article in Google docs, you can comment A new unified loading/unloading SQL syntax and design in detail for convenience.

github-actions[bot] commented 2 years ago

We have marked this issue as stale because it has been inactive for 6 months. If this issue is still relevant, removing the stale label or adding a comment will keep it active. Otherwise, we'll close it in 10 days to keep the issue queue tidy. Thank you for your contribution to StarRocks!