junhuihuang / webpages

1 stars 0 forks source link

讲一讲Delta Lake的原理 #23

Closed junhuihuang closed 1 year ago

junhuihuang commented 1 year ago

之前几篇文章,我们已经讲了 Hudi 和 Iceberg 的原理,以及它们的一些特色功能,今天就来讲一讲数据湖三剑客中的最后一家——Delta。相对于 Hudi 在国内的流行,国外则比较流行 Delta,而且这两个项目想要解决的问题是同一个问题,又都启动于 2017 年,颇有一时瑜亮的感觉。接下来还会讲到,这两个项目即使在原理层面,也是非常相似的。推荐没有读过前几篇文章的朋友,可以先把讲Hudi 原理的文章读完。

01

按照我们的惯例,要了解一个项目的首要问题是理解这个项目当初为了解决什么问题。

首先从 Delta 的历史说起。Delta 是 Databricks 于 2017 年在自家平台上推出的功能,主打的概念是Lakehouse(湖仓一体),最初只有 Databricks 的付费用户可以使用。2019 年的时候 Databricks 将 Delta 开源,进一步提高了 Delta 的影响力和市场份额。不过开源版的 Delta 并不是完整的 Delta,有部分功能 Databricks 没有放到开源版里面,在文章的最后我们会简单的讲一讲这些功能。

Lakehouse 是 Databricks 主推的一个概念

讲完 Delta 的历史,回到我们最初的问题,即:Delta 想解决什么问题。在 Databricks 撰写的关于 Delta 论文中,Databricks 解释了当初为什么要创建 Delta 这个项目

As a result, many organizations now use cloud object stores to manage large structured datasets in data warehouses and data lakes.
......
Unfortunately, although many systems support reading and writing to cloud object stores, achieving performant and mutable table storage over these systems is challenging, making it difficult to implement data warehousing capabilities over them.

从这段文字中我们可以看到,Delta 的目标是在云环境的对象存储之上,构建一个高性能,可修改的数据仓库 **。**

现代的数据湖大多构建于对象存储之上

那么对象存储究竟有哪些问题,使得实现高性能,可修改的数据仓库这件事变得比较困难呢?主要是以下几个点:

  1. 对象存储的文件写入后就不可修改,只能重写。
  2. 对象存储上的 list 操作性能较差。
  3. 对象存储只能满足最终一致性,即 A 写入成功的文件,B 不一定能够马上看到这个文件。

其实这几个问题,在介绍 Hudi 和 Iceberg 的文章中也有提及,相信读过的朋友并不陌生。而 Delta 解决这些问题的思路也和前两者比较类似,就是

在数据更新时重写受影响的文件,把新写的文件和未受影响的老文件一起,产生一个新的 snapshot,并记录到提交历史里面。

可以看到这个思想贯穿了三个数据湖系统:Hudi,Iceberg 和 Delta。 在接下来讲解实现原理的一节中也会提到,从本质上来说 **,Delta 的实现逻辑和 Hudi 的 COW 表是基本相同的。**

02

接下来还是按照惯例,我会以 Delta 的一个标志性功能作为主线,讲解下 Delta 的实现原理。

首先罗列下 Delta 的功能,注意这里展示的只是开源版 Delta 的功能。

  1. ACID 事务
  2. UPDATE,DELETE 和 MERGE
  3. Time Travel(时间旅行)
  4. Streaming IO(流式读写)
  5. Schema Evolution(Schema 演化)

从功能层面来看,Delta 和 Hudi,Iceberg 区别不大。上一节有提到,Delta 希望实现的是” 在对象存储之上的可修改数据仓库 “,所以我们就以 UPDATE 功能作为本次原理讲解的主线

UPDATE 功能有两种用户 API,分别是通过 DeltaTable 类和 SQL 的方式,相对来说 SQL 应该大家更为熟悉,就以 SQL 举例

UPDATE foobar SET val = 10 WHERE id = 5;

更新 id=5 的 record,把 val 更新为 10

Delta 背后处理这条 SQL 的过程整体来说分为以下 4 步:

  1. 改写 Spark 的 LogicalPlan,替换为 Delta 的更新逻辑。
  2. 执行更新逻辑,确定本次更新影响到的文件列表。
  3. 重写受影响的文件,对数据进行更新。
  4. 将本次操作新增了和删除了哪些文件等信息 commit 到提交历史中。

在讲解每一步的具体逻辑之前,首先展示一张 Delta 表的目录结构

可以看到除了_delta_log 目录以外,其余的目录和普通的 parquet 表没有区别。 _delta_log 目录就是 Delta 的元数据目录(和 Hudi,Iceberg 类似),里面保存的是每次 commit 的变更历史,这是时间旅行和多版本隔离等功能的基础。

Step 1. 改写 LogicalPlan

Delta 由于是 Databricks 开发的项目,因此充分利用了 Spark 的能力。Delta 并不会自己解析 SQL,SQL 的解析全部由 Spark 完成,Delta 在 Spark 里注册了一条自己的 Rule,会将 Spark 解析后的 LogicalPlan(逻辑执行计划)中的节点替换成 Delta 的节点,然后再交由 Spark 来执行这些执行计划的节点

例如对于 UPDATE 类的操作,Delta 会把 Spark 的 UpdateTable 节点替换成 UpdateCommand 节点,所有的 Delta 更新逻辑便是在 UpdateCommand 里完成的。

Step 2. 确定本次更新影响到的文件列表

具体执行更新逻辑(也就是 UpdateCommand)后,第一步就是需要确定影响到的文件范围,也就是定位到需要更新的那些记录所在的文件。本质其实和 Hudi 的 tagging 过程所做的事情相同。

Delta 是如何实现 tagging 的?和上一步类似的,由于 Delta 和 Spark 深度绑定的关系,Delta 在实现时经常会使用 Spark 的能力。tagging 就是通过 Spark 的条件过滤来找到需要修改哪些文件的。

val filesToRewrite =
  withStatusCode("DELTA", s"Finding files to rewrite for UPDATE operation") {
    data.filter(new Column(updateCondition))
      .filter(updatedRowUdf())
      .select(input_file_name())
      .distinct().as[String].collect()
  }

查找需要重写的文件列表的源代码

Step 3. 重写受影响的文件

重写文件的逻辑很简单,就是读取原文件的数据,更新需要更新的部分,然后写入成为新的文件。这些操作也是通过调用 Spark 来实现的。Delta 实现这部分逻辑的源代码如下

val targetDf = Dataset.ofRows(spark, newTarget)
val updatedDataFrame = {
  val updatedColumns = buildUpdatedColumns(condition)
  targetDf.select(updatedColumns: _*)
}
txn.writeFiles(updatedDataFrame)

Step 4. commit 到历史记录

更新工作完成后,Delta 的最后一步工作就是把本次更新的信息写入成为 json 文件,保存在_delta_log 目录下。

保存的信息主要包括:

  1. 删除了哪些文件
  2. 新增了哪些文件
  3. 本次 commit 的元数据

下面是一个典型的 json 文件的内容

{
  "remove": {
    "path": "part-00000-0d86b235-fd75-4702-ad2c-685c098b2754-c000.snappy.parquet",
    ...
  }
}
{
  "add": {
    "path": "part-00000-afcf4345-91fa-4712-b89e-fddd0c88e5bb-c000.snappy.parquet",
    ....
  }
}
{
  "commitInfo": {
    "timestamp": 1650112268645,
    "operation": "UPDATE",
    ...
    "operationMetrics": {
      "numRemovedFiles": "1",
      "numAddedFiles": "1",
      ...
    },
    "txnId": "468d127f-3a3c-4702-b408-a4d344bd532b"
  }
}

类似的逻辑在 Hudi 和 Iceberg 中都有,相信读过以前文章的朋友都比较熟悉了。但 Delta 有一点和其他两者不同,就是 Delta 在 commit 文件里只保存了差分而不是快照,所以当需要获得某个 commit 的完整文件列表就需要把之前的差分都重放一遍,才能计算出当前的状态。或许这就是 Delta 这个名字的由来吧。

可以想见,当变更历史非常长时,从头开始重放一遍历史是比较耗时的。因此 Delta 引入了 checkpoint 机制,当 commit 积累到一定数量时,会生成一个 checkpoint。后续的读取就可以从这个 checkpoint 开始重放。

03

以上是完整的 Delta 进行 UPDATE 的流程。如同第一节末尾所说的,Delta 的思想和 Hudi 的 COW 表是基本相同的,实现上也有很多相似之处。 不过借助于 Spark 的能力,Delta 能够实现 Hudi 不容易做到的功能,例如 “非主键条件更新”,“MergeInto 语法” 等。反过来说,正是因为 Delta 和 Spark 的深度绑定关系,又使得 Delta 很难支持 Spark 以外的计算引擎,例如 Flink。我想 Delta 在国内不是太火,应该和它不支持 Flink 关系比较大。

最后提一下 Delta 企业版相比于开源版所额外具备的功能:

1. 小文件优化功能

提供 OPTIMIZE 命令,可以把小文件批量合并成大小约 1GB 左右的大文件。减少查询的文件总量,提高查询的整体性能。

这个功能算是数据湖的标配了,在 Hudi 和 Iceberg 里都有支持

2. Z-Ordering

通过对数据使用 Z-Order 进行排序保存,对于有多个条件字段的表可以提高数据的 locality(局部性),减少需要读取的数据量,从而提高查询性能。

Z-Ordering 是个比较高级的优化功能,目前 Hudi 和 Iceberg 都还不支持。Z-Ordering 是一种相对复杂的优化存储布局的办法,本身也是个有意思的话题(需要一些数学知识),考虑以后专门开一篇文章来讲讲。

关于 Delta 的原理差不多就讲到这里了。总结下这篇文章的知识点:

  1. Delta 的设计目标是为了在对象存储之上实现可更新的数据仓库。
  2. Delta 的实现原理和 Hudi 的 COW 表基本相同。
  3. Delta 的实现和 Spark 深度绑定,目前只支持 Spark 计算引擎。 https://mp.weixin.qq.com/s/4YbwgqWdvi8rsiPUmABNmw
github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 30 days with no activity.

github-actions[bot] commented 1 year ago

This issue was closed because it has been inactive for 14 days since being marked as stale.