apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0
12.53k stars 3.24k forks source link

[Proposal] Doris supports batch delete On UNIQUE_KEYS table #4051

Closed yangzhg closed 4 years ago

yangzhg commented 4 years ago

Doris supports batch delete

BACKGROUND

At present, Doris supports multiple import methods such as broker load, routine load, stream load, etc. For the deletion of data, it can only be deleted through the delete statement. When the delete statement is used, a new version of the data will be generated every time the delete is executed. If frequent deletions will seriously affect query performance, and when using delete mode to delete, it is achieved by generating an empty rowset to record the deletion conditions. Each read must filter the deletion jump condition, also when there are many conditions Impact on performance. Compared with other systems, the implementation of greenplum is more like a traditional database product, and snowflake is implemented through merge syntax.

For a scenario similar to the import of cdc data, insert and delete are generally interspersed in the data data. In the face of this scenario, our current import method cannot be satisfied, even if we can separate insert and delete. Problem, but still cannot solve the problem of deletion.

Design goals

Functional level:

Enhance the import function so that it can support the following scenarios:

Ease of use:

Minimize the modification of import syntax, and be compatible with the current import syntax

Performance

Import and read performance should be basically the same as the current import method, and there should not be too much performance loss

detailed design

The import syntax is to add a column to indicate whether the current row is imported or deleted. If there is no default behavior to insert rows, the function of this level of upgrade is only implemented on segmentV2, v1 is not considered for the time being, in the index file of the segment file IndexRegion A bitmap index similar to null bitmap is added to mark the rows to be deleted.

Data structure design

A bitmap index (delete_index_page) needs to be added to the segment structure to indicate which row is marked for deletion. The PagePointerPB structure is the same as previously defined, using bitmap as the index.

message SegmentFooterPB {
    optional uint32 version = 1 [default = 1]; // file version
    repeated ColumnMetaPB columns = 2; // tablet schema
    optional uint32 num_rows = 3; // number of values
    optional uint64 index_footprint = 4; // total idnex footprint of all columns
    optional uint64 data_footprint = 5; // total data footprint of all columns
    optional uint64 raw_data_footprint = 6; // raw data footprint

    optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns
    repeated MetadataPairPB file_meta_datas = 8; // meta data of file

    // Short key index's page
    optional PagePointerPB short_key_index_page = 9;,
    // Use bitmap index to indicate which row is marked for deleting
    optional PagePointerPB delete_index_page = 10;
}

Import syntax

The syntax design of the import is mainly to add a column map that specifies the field to delete the marked column, and this column needs to be added to the imported data. The method of setting each import method is as follows

stream load

The writing of stream load adds a field to set the deletion mark column in the column field of the header, example -H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"

broker load

Set the field to delete the marked column at PROPERTIES

LOAD LABEL db1.label1
(
    [MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1,tmp_c2, label_c3)
    SET
    (
        id=tmp_c2,
        name=tmp_c1,
    )
    [DELETE ON label=true]

)
WITH BROKER'broker'
(
    "username"="user",
    "password"="pass"
)
PROPERTIES
(
    "timeout" = "3600"
);

reoutine load

Routine load adds mapping in the columns field. The mapping method is the same as above, the example is as follows

   CREATE ROUTINE LOAD example_db.test1 ON example_tbl
   [WITH MERGE|APPEND|DELETE]
    COLUMNS(k1, k2, k3, v1, v2, label),
    WHERE k1> 100 and k2 like "%doris%"
    [DELETE ON label=true]
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",

    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2,3",
        "kafka_offsets" = "101,0,0,200"
    );

Import

The process of data import is as follows:

When the imported data contains a delete mark and the delete mark is true, write the data and record the line number of the secondary line in the segment, and record it in the delete index, otherwise write the data directly, there can be an optimization here when the mark is deleted The value column of this row can be set to the value of the most space occupied by the corresponding type. For example, for the varchar type, we can set the value to an empty string to save space.

st=>start: Start Load
flag_cond=>condition: delete flag is true
write_rowdata=>operation: write data
write_rowdata_opt=>operation: write Data with minimum values
write_delete_index=>operation: write delete index
e=>end

st->flag_cond
flag_cond(yes)->write_rowdata_opt->write_delete_index->e
flag_cond(no)->write_rowdata->e

image

Suppose there is a table

+-------+-------------+------+-------+---------+---------+
| Field | Type        | Null | Key   | Default | Extra   |
+-------+-------------+------+-------+---------+---------+
| k1    | INT         | Yes  | true  | 0       |         |
| k2    | SMALLINT    | Yes  | true  | NULL    |         |
| k3    | VARCHAR(32) | Yes  | true  |         |         |
| v1    | BIGINT      | Yes  | false | 0       | REPLACE |
+-------+-------------+------+-------+---------+---------+

Import data is

0,1,foo,3,false
0,1,foo,2,true
1,2,bar,3,true
0,1,foo,5,false

If it is UNIQUE_KEYS, only the latest result can be recorded, so the first line and the second line are meaningless and will be ignored when importing the aggregation. Then the data recorded in the data area of the rowset is

1,2,bar,3
0,1,foo,5

delete_index is the record [1,0] in the bitmap, indicating that the first line in the batch of imported data is marked for deletion.

Read

UNIQUE_KEYS table

At present, when the UNIQUE_KEYS table is read, the data is merged from the high version to the floor version. Since unique_keys can ensure that there is only one record in each segment, only the first version needs to be read. If it is marked for deletion, skip all. Otherwise, return to the first line directly, and skip the rest.

Compaction

In compaction, the functions of cumulative compaction and base compaction are different. Cumulative compaction needs to save delete index, and delete index can be deleted after base compaction is over.

Cumulative Compaction

Cumulative compaction is to merge from a lower version to a higher version, similar to a read operation, but the rows that need to be deleted need to be read out and merged and need to be added to the new delete index. Since delete index is a bitmap index, it exists in each segment file,

Base Compaction

Base compaction is also an operation similar to read, but because it is a full read, there is no need to record the delete index. Just read the data directly and filter out the rows to be deleted according to the previous compaction logic.

yangzhg commented 4 years ago

Doris 支持批量删除设计文档

背景

​ 目前Doris 支持broker load, routine load, stream load 等多种导入方式,对于数据的删除目前只能通过delete 语句进行删除,使用delete 语句的方式删除时,每执行一次delete 都会生成一个新的数据版本,如果频繁删除会严重影响查询性能,并且在使用delete 方式删除时,是通过生成一个空的rowset来记录删除条件实现,每次读取都要对删除跳条件进行过滤,同样在条件较多时会对性能造成影响。对比其他的系统,greenplum 的实现方式更像是传统数据库产品,snowflake 通过merge 语法实现。

​ 对于类似于cdc 数据的导入的场景,数据数据中insert 和delete 一般是穿插出现的,面对这种场景我们目前的导入方式也无法满足,即使我们能够分离出insert 和delete 虽然可以解决导入的问题,但是仍然解决不了删除的问题。

设计目标

功能层面:

增强导入功能,使其能够支持如下场景:

易用性方面:

尽量减少对导入语法的修改,并且能够兼容目前的导入语法

性能方面

导入和读取的性能要和目前的导入方式基本持平,不能有太大的性能损失

详细设计

导入语法方面为在导入是增加一列标示当前行是导入还是删除,如果没有默认所有行为插入行,实现层面本次升级的功能只在segmentV2上实现,v1 暂不考虑,在segment 文件的IndexRegion 中增加一个类似于null bitmap 的bitmap 索引用于标示需要删除的行。

数据结构设计

需要在segment 结构中增加一个bitmap 索引(delete_index_page)来标示哪一行被标记为删除,PagePointerPB 结构和之前定义的相同,使用bitmap 作为索引。

message SegmentFooterPB {
    optional uint32 version = 1 [default = 1]; // file version
    repeated ColumnMetaPB columns = 2; // tablet schema
    optional uint32 num_rows = 3; // number of values
    optional uint64 index_footprint = 4; // total idnex footprint of all columns
    optional uint64 data_footprint = 5; // total data footprint of all columns
    optional uint64 raw_data_footprint = 6; // raw data footprint

    optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns
    repeated MetadataPairPB file_meta_datas = 8; // meta data of file

    // Short key index's page
    optional PagePointerPB short_key_index_page = 9;、
    // Use bitmap index to indicate which row is marked for deleting
    optional PagePointerPB delete_index_page = 10;
}

导入语法

导入的语法设计方面主要是增加一个指定删除标记列的字段的column 映射,并且需要在导入数据中增加这一列,各个导入方式设置的方法如下

stream load

stream load 的写法在在header 中的 columns 字段增加一个设置删除标记列的字段, 示例 -H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"

broker load

PROPERTIES 处设置删除标记列的字段

LOAD LABEL db1.label1
(
    [MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1,tmp_c2, label_c3)
    SET
    (
        id=tmp_c2,
        name=tmp_c1,
    )
    [DELETE ON label=true]

)
WITH BROKER 'broker'
(
    "username"="user",
    "password"="pass"
)
PROPERTIES
(
    "timeout" = "3600"

);

reoutine load

routine load 在columns 字段增加映射 映射方式同上,示例如下

   CREATE ROUTINE LOAD example_db.test1 ON example_tbl 
    [WITH MERGE|APPEND|DELETE]
    COLUMNS(k1, k2, k3, v1, v2, label),
    WHERE k1 > 100 and k2 like "%doris%"
    [DELETE ON label=true]
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2,3",
        "kafka_offsets" = "101,0,0,200"
    );

导入

数据导入时流程如下:

当导入数据包含 删除标记 并且删除标记是true 时写入数据的同时记录次行在segment 中的行号,并记录在delete index 中,否则直接写数据,这里可以有一个优化就是当标记为删除时这一行的value 列可以都设置成相应类型的最空间占用最小的值, 比如varchar 类型我们可以把值都设置成空字符串以节省空间。

st=>start: Start Load
flag_cond=>condition: delete flag is true
write_rowdata=>operation: write data 
write_rowdata_opt=>operation: write Data with minimum values
write_delete_index=>operation: write delete index
e=>end

st->flag_cond
flag_cond(yes)->write_rowdata_opt->write_delete_index->e
flag_cond(no)->write_rowdata->e

image

假设有表

+-------+-------------+------+-------+---------+---------+
| Field | Type        | Null | Key   | Default | Extra   |
+-------+-------------+------+-------+---------+---------+
| k1    | INT         | Yes  | true  | 0       |         |
| k2    | SMALLINT    | Yes  | true  | NULL    |         |
| k3    | VARCHAR(32) | Yes  | true  |         |         |
| v1    | BIGINT      | Yes  | false | 0       | REPLACE |
+-------+-------------+------+-------+---------+---------+

导入数据为

0,1,foo,3,false
0,1,foo,2,true
1,2,bar,3,true
0,1,foo,5,false

如果为UNIQUE_KEYS ,只记录最新的结果即可,因此第一行和第二行都没有意义,在导入聚合时会被忽略。则记录到rowset 中数据区的数据为

1,2,bar,3
0,1,foo,5

delete_index 为bitmap 中记录[1,0], 标示这批导入数据中第1行被标记为删除。

读取

UNIQUE_KEYS表

目前UNIQUE_KEYS 表读取时已经是从高版本向地板版本合并数据,由于unique_keys能保证每个segement中只有一条记录,只需读取第一个版本即可,如果是标记删除的则全部跳过,否则直接返回第一行,剩余跳过。

Compaction

compaction时 cumulative compaction和 base compaction 的功能方式有所不同cumulative compaction需要保存delete index,base compaction结束后可以删除delete index。

Cumulative Compaction

cumulative compaction 是从低版本向高版本合并,类似于读取的操作,但是需要遇到删除的行需要读取出来合并并且还需要添加到新的delete index 中。由于delete index 是一个bitmap索引,存在于每个segment 文件中,

Base Compaction

base compaction 也是和读取类似的操作,但是因为是全量读取因此不需要记录delete index,直接把数据读取出来过滤掉要删除的行按之前compaction 逻辑处理即可。

详细流程

方案一

FE端

在导入是通过fe 在导入的数据生成tuple 中增加了一列__DORIS_BATCH_DELETE_COL__,这一列的表达式就是删除条件,这一列的值就是delete 条件的结果,这样这列就会在broker scannode的中 自动计算出来并返回

BE 端

  1. 在be 的 tablet writer 中通过fe发过来的参数判断是否存在delete列来初始化 memtable

  2. 在tuple 被插入memtable 时将删除属性带入,在memtable merge 时对这个属性处理类似于 replace聚合, 这里修改 ContiguousRow增加一个属性_is_delete

  3. 在memtable 将内容刷入segment 文件时 将delete 信息写入segment footer 中的一个类似索引结构中,这里只记录需要删除的行的行号

  4. 在读取是将这个索引信息由segemt iterator 带入读出 传入 RowBlock 最终写入RowCursor中(RowCursor 也需要增加相应的delete 属性),之后就是在reader.cpp处理删除逻辑

方案二

方案二主要工作在fe端, 通过增加一个隐藏列__DELETE_SIGN__实现,因为我们只是在unique 模型上做批量删除,因此只需要增加一个 类型为bool 聚合函数为replace 的隐藏列即可。在be 各种聚合写入流程都和正常列一样,读取方案有两个:

  1. 在fe遇到 * 等扩展时去去掉__DELETE_SIGN__,并且默认加上 __DELETE_SIGN__ != true 的条件
  2. be 读取时都会加上一列进行判断,通过条件确定是否删除

base compaction 时将标记删除的行去掉

折衷考虑

在上面的实现在FE端给broker 读出的数据生成了一个隐藏列,而在对应实际数据存储时没有在最终数据上也是用隐藏列 主要考虑如下:

  1. 如果在doris 里增加隐藏列比如需要更改表的结构,这样就会出现需要新建表才能使用,旧表需要schem change 后在能用,这样对用户不是很友好,对于用户数据的修改处于谨慎考虑不宜自动做schema change;
  2. 增加隐藏列后对于查询、删除设计列名的操作都需要过滤隐藏列的情况,在be 层除了存储层在segment 层面不感知这一列的存在其他层面都需要感知,改动处仍然比较多。
  3. 如果我们不在fe 建表时增加隐藏列 仅在be 增加,则会在构建 memtable 时需要修改biao 的tablet_schema 中的列信息,这里会涉及许多tupleid 和slot ID, 这些id 均由FE 生成,在be 修改增加因为没有全局信息有较大的风险可能会重复或者冲突,而且这些结构的代码大多没有提供修改和增加列的接口,修改成本也比较大。

两种实现方案的却别就在于,隐藏列的方式除了segment 层不感知以外 FE, 计算, 除了segment以外的存储层其他部分都需要感知,而通过删除标记 只有存储层感知,其他层均不感知这种变化。从开发时间和复杂度角度来看目前采用删除标记的方案比较好,而且就只功能而言使用隐藏列和删除标记并没有性能上的区别。

但是就更广泛的层面考虑doris 也应该支持隐藏列功能,个人觉得这一功能可以单独作为一个功能点单独开发(如果咋doris 已经支持隐藏列的情况下,批量删除功能则应使用隐藏列实现)这样也有利于以后的功能扩展(比如事物的支持)。

morningman commented 4 years ago

The syntax and api need further discuss And I think it is corresponding to #3930 .

yangzhg commented 4 years ago

Task List:

imay commented 4 years ago

I think we should discuss about this feature befor developing it.

imay commented 4 years ago

I have some question about this proposal.

  1. Doris is a OLAP system which should be read optimized, however this batch delete proposal is write optimized.
  2. When user want to delete one row, primary key is enough to delete one row. It is unreasonable for user to give all columns for rows to be deleted.
  3. When this is supported, how about materialized view's udpating.

I think delete is a key feature in a database system, we should discuss enough before doing it.

morningman commented 4 years ago

I have some question about this proposal.

  1. Doris is a OLAP system which should be read optimized, however this batch delete proposal is write optimized.
  2. When user want to delete one row, primary key is enough to delete one row. It is unreasonable for user to give all columns for rows to be deleted.
  3. When this is supported, how about materialized view's udpating.

I think delete is a key feature in a database system, we should discuss enough before doing it.

  1. Currently Doris is implemented based on the Merge on Read method. This implementation method is the same as the current implementation strategy.

  2. In principle, users do not need to provide a value column, this function can support users not to specify a value column. One purpose of this function is to receive the change data sent from the TP database, which is usually the entire line of content.

  3. This function is only for the table of the unique key model, and has nothing to do with the materialized view

imay commented 4 years ago
  1. Currently Doris is implemented based on the Merge on Read method. This implementation method is the same as the current implementation strategy.

Why not change it to 'Merge on Write'?

  1. In principle, users do not need to provide a value column, this function can support users not to specify a value column. One purpose of this function is to receive the change data sent from the TP database, which is usually the entire line of content.

I see that this function support all the load interface. And insert data and delete data in one batch, how can user delete data without value column?

  1. This function is only for the table of the unique key model, and has nothing to do with the materialized view

How about "AGG_KEYS 表 目前AGG_KEYS 表读取时是从低版本向高版本合并数据,当发现有当前行有delete标记时清除之前的数据,剩下的数据继续合并,如果没有数据跳过读取下一个key。没有发现delete 标记则按现有流程调用agg_update。"

Even this function is only for unique model, however this is a "user-interface" level function. We should discuss from a more macroscopic and creative perspective, not the immediate function. If this function is only for unique model, what's the funtion about duplicate and aggregate model?

morningman commented 4 years ago

Why not change it to 'Merge on Write'?

It can be done in some other new data model in future. Changing the current data model to "Copy-on-Write" is a very costly thing. Having you any idea about it?

I see that this function support all the load interface. And insert data and delete data in one batch, how can user delete data without value column?

Actually, I think the value column can be filled by some dummy placeholder values. And user can also specify the column mappings in columns properties to specified the key columns, and the missing value columns can either will filled by default value automatically, or filled by user manually.

Even this function is only for unique model, however this is a "user-interface" level function. We should discuss from a more macroscopic and creative perspective, not the immediate function. If this function is only for unique model, what's the function about duplicate and aggregate model?

Oh, I forgot, uniq key is also a kind of aggr key model, so this can be performed on aggr key model in some scenarios. It is just like DELETE FROM method that system will check if all key columns exists in rollups or materialized views.

Duplicate key model is not within our consideration. Cause we do not found a good way to handle this model. First of all, the main purpose of this delete function is to receive the data changes of the TP system captured by the CDC system. The data in the TP system usually has the concept of a primary key. And many CDC systems or data transmission systems themselves also require data to have a primary key, otherwise effective incremental data capture cannot be performed. In my opinion, duplicate key is not a data model for strict CRUD scenarios.

imay commented 4 years ago

It can be done in some other new data model in future. Changing the current data model to "Copy-on-Write" is a very costly thing. Having you any idea about it?

We can do it by add delete flag for old rows when doing delete operation.

Actually, I think the value column can be filled by some dummy placeholder values. And user can also specify the column mappings in columns properties to specified the key columns, and the missing value columns can either will filled by default value automatically, or filled by user manually.

Now, in one Load job, there are inserted rows and deleted rows, columns mapping works for all of them. Which means loaded rows and delete rows should have the same number of columns.

Oh, I forgot, uniq key is also a kind of aggr key model, so this can be performed on aggr key model in some scenarios. It is just like DELETE FROM method that system will check if all key columns exists in rollups or materialized views.

Because all columns should be given in Load, actually all the delete will be disabled for aggregate model.

Duplicate key model is not within our consideration. Cause we do not found a good way to handle this model. First of all, the main purpose of this delete function is to receive the data changes of the TP system captured by the CDC system. The data in the TP system usually has the concept of a primary key. And many CDC systems or data transmission systems themselves also require data to have a primary key, otherwise effective incremental data capture cannot be performed. In my opinion, duplicate key is not a data model for strict CRUD scenarios.

Actually, when you create a MySQL table, there is no primary key. And primary key is just a constraint, it is OK to delete/update rows for a table without primary key.

And what's more, if we support this scheme for deletion, how can we support standard SQL delete syntax.

yangzhg commented 4 years ago

@imay

And what's more, if we support this scheme for deletion, how can we support standard SQL delete syntax.

I will modify the proposal, only support the UNIQE_KEYS. The standard SQL delete syntax is still support, this is only provide a way to delete a batch data and not generate so many versions, just like add filter to the original code