apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0

[Proposal] Doris support version column for REPLACE aggregate type #3930

Open Youngwb opened 4 years ago

Youngwb commented 4 years ago

Background

Doris currently uses REPLACE to update data, but the replacement order cannot be guaranteed within a single import batch. The user has to ensure that no two rows in the same batch share the same key columns in order to get a deterministic replacement result, which is very inconvenient. To solve this problem, we can use a version column to specify the replacement order.

Goal

The user specifies a version column when creating the table, and Doris relies on this column when updating data of REPLACE type: a row with a larger version value can REPLACE the stored row with a smaller version, while a row with a smaller version cannot REPLACE a stored row with a larger version.
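The rule above can be sketched outside Doris as a few lines of Python (a minimal illustration of the proposed semantics, not Doris code; `apply_replace` is a hypothetical helper):

```python
def apply_replace(table, key, value, version):
    """table maps key -> (value, version). A row only replaces the
    stored row for the same key when its version is strictly larger;
    rows with a smaller or equal version are ignored."""
    stored = table.get(key)
    if stored is None or version > stored[1]:
        table[key] = (value, version)

table = {}
apply_replace(table, ("k1",), "a", 2)
apply_replace(table, ("k1",), "b", 1)   # smaller version: ignored
apply_replace(table, ("k1",), "c", 3)   # larger version: replaces
assert table[("k1",)] == ("c", 3)
```

Note that the result no longer depends on the order in which the three rows arrive, which is exactly the property a single load batch cannot guarantee today.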

Create Table Interface

CREATE TABLE `test` (
`id` bigint(20) NOT NULL,
`date` date NOT NULL,
`group_id` bigint(20) NOT NULL,
`version` int MAX NOT NULL,
`keyword` varchar(128) REPLACE NOT NULL,
`clicks` bigint(20) SUM NULL DEFAULT "0" ,
`cost` bigint(20) SUM NULL DEFAULT "0" 
) ENGINE=OLAP
AGGREGATE KEY(`id`, `date`, `group_id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 16
PROPERTIES (
  "replace_version_column" = "version"
);

When creating the table, the user simply adds the replace_version_column property in PROPERTIES to identify the version column. The column requires the MAX aggregation type so that, for each set of key columns, only the row with the largest version value is retained.

Query

When a user's query does not contain any REPLACE column, the original logic applies. When a query does contain REPLACE columns, the BE needs to append the version column that the REPLACE columns depend on to the return columns, and compare it when the value columns are aggregated. These operations can be done by extending the Reader's return columns. On the FE side, isPreAggregation is turned off because REPLACE columns are value columns in the storage engine, which means the storage engine must aggregate the data before returning it to the scan node; this guarantees that rows with the same key columns are aggregated inside the Reader.
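A rough sketch of the Reader-side merge described above (illustrative Python under stated assumptions, not BE code: rows are modeled as `(key, replace_value, version)` tuples, with the version column already appended to the return columns):

```python
from itertools import groupby
from operator import itemgetter

def read_with_replace(rows):
    """Merge rows with equal keys: the REPLACE value from the row
    with the largest version wins; the version column is dropped
    before rows are returned to the scan node."""
    out = []
    for key, grp in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        best = max(grp, key=itemgetter(2))   # compare on the version column
        out.append((key, best[1]))
    return out

rows = [("k1", "old", 1), ("k1", "new", 5), ("k2", "x", 2)]
assert read_with_replace(rows) == [("k1", "new"), ("k2", "x")]
```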

Compaction

Base and cumulative compaction use the Reader to aggregate data, with all tablet columns as return columns. So, similar to query processing, we can use the Reader to perform replacement based on the version column during compaction.

Load

For one batch of loaded data, Doris uses one or more MemTables. We need to ensure that, within one MemTable, rows with the same key columns have their REPLACE-type columns replaced according to the version column. Replacement order across different MemTables does not need to be guaranteed at load time, because Query and Compaction already guarantee the order of replacement.
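The division of responsibility above can be sketched as follows (an illustration under assumptions, not actual MemTable code: each MemTable resolves same-key rows on insert, while the cross-MemTable merge stands in for what Query and Compaction do later through the Reader):

```python
class MemTable:
    """Toy model: within one MemTable, same-key rows are resolved
    by the version/sequence column at insert time."""
    def __init__(self):
        self.rows = {}          # key -> (value, version)

    def insert(self, key, value, version):
        cur = self.rows.get(key)
        if cur is None or version > cur[1]:
            self.rows[key] = (value, version)

def merge(memtables):
    """Stand-in for the Reader-side merge done by Query/Compaction:
    the same max-version rule is applied across MemTables."""
    merged = MemTable()
    for mt in memtables:
        for key, (value, version) in mt.rows.items():
            merged.insert(key, value, version)
    return merged.rows

mt1, mt2 = MemTable(), MemTable()
mt1.insert("k1", "a", 3)
mt2.insert("k1", "b", 1)        # lands in a different MemTable
assert merge([mt1, mt2])["k1"] == ("a", 3)
```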

RollUp

If a rollup contains a column of REPLACE type, we either need the user to add the replace version column to the rollup, or we must extend the rollup with that column automatically.

morningman commented 4 years ago

How is the value of the version column generated? Is it auto-generated, or does it have to be present in the user's original load data?

Youngwb commented 4 years ago

How is the value of the version column generated? Is it auto-generated, or does it have to be present in the user's original load data?

It comes from the user's original load data.

yangzhg commented 4 years ago

I suggest that the column used for keeping order should not exist in the create table statement, but should be a hidden column. When creating a unique table, add a property such as sequence=true, so that the BE automatically adds a hidden column after the key columns. This hidden column is only used for sorting, so it can serve as the sorting basis when doing compaction and reads. A column may be configured as the sequence column during load.

morningman commented 4 years ago

I suggest that the column used for keeping order should not exist in the create table statement, but should be a hidden column. When creating a unique table, add a property such as sequence=true, so that the BE automatically adds a hidden column after the key columns. This hidden column is only used for sorting, so it can serve as the sorting basis when doing compaction and reads. A column may be configured as the sequence column during load.

+1.

The only function of the version column is to retain the ordering information of the data in the business logic. So if the version column appears explicitly in the table schema, we cannot control the user's query behavior on this column.

Youngwb commented 4 years ago

Following @morningman's and @yangzhg's suggestions, I made some corrections.

Name

Use "sequence column" instead of "version column" to make it easier for users to understand.

Create table

Use UNIQUE_KEYS instead of AGG_KEYS. Because the sequence column is a hidden column, there is no need to create a version column with the MAX aggregation type.

CREATE TABLE `test_1` (
`pin_id` bigint(20) NOT NULL COMMENT "",
`date` date NOT NULL COMMENT "",
`group_id` bigint(20) NOT NULL COMMENT "",
`keyword` varchar(128)  NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`pin_id`, `date`, `group_id`)
PROPERTIES (
  "function_column.sequence_type" = "int"
);

As in the example above, the user needs to add sequence_type to identify the type of the sequence column. Only integer types (int, bigint, largeint) and time types (date, datetime) are supported. The user can't query the hidden sequence column in the table, but can add a visible column whose value equals the sequence column, like this:

CREATE TABLE `test_2` (
`pin_id` bigint(20) NOT NULL COMMENT "",
`date` date NOT NULL COMMENT "",
`group_id` bigint(20) NOT NULL COMMENT "",
`sequence_visiable` int NOT NULL,
`keyword` varchar(128)  NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`pin_id`, `date`, `group_id`)
PROPERTIES (
"function_column.sequence_type" = "int"
);

The column name does not have to be "sequence_visiable"; this is just an example. The user ensures the values are the same by specifying parameters at LOAD time.

LOAD

Stream Load

curl --location-trusted -u root -H "columns: pin_id,date,group_id,source_sequence,keyword" -H "function_column.sequence_col: source_sequence" -T test_load http://127.0.0.1:8030/api/test/test_1/_stream_load

Broker Load

LOAD LABEL test.test11
(
    DATA INFILE("hdfs://path/to/load_file")
    INTO TABLE `test_1`
    FORMAT AS "parquet"
    (pin_id,date,group_id,source_sequence,keyword)
    ORDER BY source_sequence
) with BROKER broker_name (...)

Routine Load

CREATE ROUTINE LOAD test_1_job ON test_1
COLUMNS TERMINATED BY ",",
(pin_id,date,group_id,source_sequence,keyword)
ORDER BY source_sequence
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "1000000",
    "max_batch_size" = "509715200"
) FROM KAFKA
(
    "kafka_broker_list" = "...",
    "kafka_topic" = "...",
    "property.client.id" = "...",
    "property.group.id" = "..."
);

I added a parameter sequence_col to identify the source column for the sequence column at load time. Because it is a hidden column, the user needs to identify the source column in the column mapping.

For table test_2, which has the column sequence_visiable, the user can set "sequence_col" = "sequence_visiable" in the properties. This means the hidden sequence column holds the same values as sequence_visiable in the table, so the user can query sequence_visiable instead of the hidden column.

morningman commented 4 years ago

If the user creates a sequence_visible column in the table, does it still have a hidden sequence column? In my opinion, it should.

Youngwb commented 4 years ago

If the user creates a sequence_visible column in the table, does it still have a hidden sequence column? In my opinion, it should.

Yes, it does.

chaoyli commented 4 years ago

I think it's better to handle this problem in load. If you want to preserve the ordering, you can specify the behavior in the load command. CreateTable should not carry this logic; otherwise it is confusing for people trying to understand the create table statement.

Youngwb commented 4 years ago

I think it's better to handle this problem in load. If you want to preserve the ordering, you can specify the behavior in the load command. CreateTable should not carry this logic; otherwise it is confusing for people trying to understand the create table statement.

If the sequence column is generated during the Load command phase, we need to consider the case where the user specifies a sequence column in the first load but not in a later one, or does not specify one the first time and then specifies one later. This would make the analysis in the load phase more complicated.

chaoyli commented 4 years ago

I think it's better to handle this problem in load. If you want to preserve the ordering, you can specify the behavior in the load command. CreateTable should not carry this logic; otherwise it is confusing for people trying to understand the create table statement.

If the sequence column is generated during the Load command phase, we need to consider the case where the user specifies a sequence column in the first load but not in a later one, or does not specify one the first time and then specifies one later. This would make the analysis in the load phase more complicated.

I think it's a problem of stable sort. If we can ensure that data received later is flushed later, wouldn't that solve the problem?

Youngwb commented 4 years ago

I think it's better to handle this problem in load. If you want to preserve the ordering, you can specify the behavior in the load command. CreateTable should not carry this logic; otherwise it is confusing for people trying to understand the create table statement.

If the sequence column is generated during the Load command phase, we need to consider the case where the user specifies a sequence column in the first load but not in a later one, or does not specify one the first time and then specifies one later. This would make the analysis in the load phase more complicated.

I think it's a problem of stable sort. If we can ensure that data received later is flushed later, wouldn't that solve the problem?

Yes, it's a problem of stable sort, but I don't think "data received later is flushed later" can be guaranteed. The node responsible for flushing data does not receive data in order: for example, Broker Load scans on multiple BEs at the same time and sends rows to the corresponding node, so the sending order for rows with the same key columns cannot be guaranteed, and the results may differ even if the same source data is loaded multiple times.

chaoyli commented 4 years ago

@Youngwb Sorry, I was busy and forgot to reply to you. If you add a version to the data, how do you prevent data read later from having a smaller version?

Youngwb commented 4 years ago

how do you prevent data read later from having a smaller version?

A row with a smaller version cannot replace data with a bigger version: the version column (or sequence column) is compared whenever the key columns are the same.

chaoyli commented 4 years ago

My question is how the version is set in source data in HDFS. If two records have the same key but belong to different files, is the version assigned by the load, or assigned in the HDFS files before the load?

Youngwb commented 4 years ago

My question is how the version is set in source data in HDFS. If two records have the same key but belong to different files, is the version assigned by the load, or assigned in the HDFS files before the load?

The version column is a column in the source data and is specified by the user at load time.

chaoyli commented 4 years ago

I see. You want to replace records according to the version specified by the user's application. If we add a replace property to the load command, can we get the replace behavior without creating a special table?

Youngwb commented 4 years ago

I see. You want to replace records according to the version specified by the user's application. If we add a replace property to the load command, can we get the replace behavior without creating a special table?

The type of the sequence column needs to be specified in the create table statement. If the sequence column were created during the load phase, that column-adding schema change operation would complicate the load.

chaoyli commented 4 years ago

How about discussing it over WeChat? It may be more efficient. My WeChat: 15652918147

BabySid commented 4 years ago

There are two scenarios, please take a look:
1. Is the Aggregate table supported? That is, a table containing REPLACE (REPLACE_IF_NOT_NULL) aggregation as well as other aggregation types.
2. If a table has multiple REPLACE / REPLACE_IF_NOT_NULL columns, the update versions of different columns may differ. How would this be supported? Do we need per-column versions?

For example, for a table with columns key1, v1 (REPLACE), v2 (REPLACE_IF_NOT_NULL), and a few rows (by default, the larger version overrides the smaller one, e.g. v1.2 can replace v1.0 and v1.1):
k1, v1.1, (NULL) => table data is k1, v1.1, null
k1, v1.0, v2.1 => table data is k1, v1.1, v2.1
k1, v1.3, v2.0 => table data is k1, v1.3, v2.1
The background is that different columns have different version times (the business usually uses the data production time as the version; because different columns are computed in different data streams, their production times are inconsistent).
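The per-column versioning scenario described above can be sketched as follows (hypothetical, not implemented in this proposal: each REPLACE / REPLACE_IF_NOT_NULL column carries its own version, and NULL never overwrites under REPLACE_IF_NOT_NULL; `merge_row` is an illustrative helper):

```python
def merge_row(stored, incoming):
    """stored/incoming: {col: (value, version)}; NULL modeled as None.
    Each column is replaced independently, keyed on its own version."""
    out = dict(stored)
    for col, (val, ver) in incoming.items():
        if val is None:                  # replace_if_not_null: NULL never wins
            continue
        cur = out.get(col)
        if cur is None or ver > cur[1]:
            out[col] = (val, ver)
    return out

# Replaying the example rows for key k1:
row = {}
row = merge_row(row, {"v1": ("v1.1", 1.1), "v2": (None, 0.0)})
row = merge_row(row, {"v1": ("v1.0", 1.0), "v2": ("v2.1", 2.1)})
row = merge_row(row, {"v1": ("v1.3", 1.3), "v2": ("v2.0", 2.0)})
assert row["v1"][0] == "v1.3" and row["v2"][0] == "v2.1"
```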