AmateurEvents / article

记录一些有趣的文章
136 stars 28 forks source link

Apache Pulsar事务设计草案 #3

Open tuteng opened 5 years ago

tuteng commented 5 years ago

动机

  1. 用例
    1. 事件处理
    2. 原子性
  2. 事务保证
    1. 隔离级别
  3. 总览
    1. 概念
      1. 事务协调器
      2. 事务缓冲区
      3. 事务确认
      4. 物化机制
    2. 事务流
      1. 事务开始
      2. 事务循环
        1. 增加分区到事务
        2. 发送消息到分区
        3. 增加订阅到事务
        4. 响应消息到订阅
      3. 结束事务
        1. 结束事务请求
        2. 最终确定过程
        3. 标记事务为COMMITTED或者ABORTED

设计选型

  1. 事务缓冲区
    1. Marker Approach
      1. 预览
      2. 挑战
        1. 物化
        2. 清理
        3. 保留
      3. 改变
      4. 讨论
      5. Sidecar Approach
        1. 预览
        2. 挑战
          1. 物化
          2. 清理
          3. 保留
        3. 改变
        4. 讨论
          1. 大事务

本文为我学习PIP 31: Transaction Support时做的一些笔记,中间有不少翻译的不准确的地方,可以结合原文来进行更详细的阅读

materialization:物化,这个概念需要和事务的隔离级别READ_COMMITTED一起来理解更好

动机

本文概述了Apache Pulsar支持事务性消息传递的草案。事务被用来加强Apache Pulsar的消息传递语义,以及Pulsar Functions的处理保证。

Apache Pulsar目前提供的最高级别的消息传递保证是通过幂等生成器在一个分区上“准确地”生成一次。保证用户通过幂等生成器生成到单个分区的每条消息都将被准确保存一次,不会丢失数据。当生产者试图向多个分区产生消息时,不存在“原子性”。例如,当broker崩溃时,发布失败可能会发生,如果生产者没有重试或者已经用尽了重试次数,消息可能不会被写入pulsar。在消费者方面,确认目前是一项best-effort的操作,这将导致消息重新传递,因此消费者会收到重复的消息。pulsar只保证消费者至少消费一次。

类似地,Pulsar Functions只保证对幂等函数上的单个事件进行准确处理一次。它不能保证多个事件或产生多个结果时能够准确处理一次。例如,如果一个函数接受多个事件并产生一个结果(例如,窗口函数),该函数可能会在产生结果和确认传入消息之间,甚至在确认单个事件之间失败。这将导致所有(或一些)传入消息被重新投递和重新处理,并产生新的结果。

Pulsar和Pulsar Functions的用户将从事务支持中获取较大的收益。写入或处理的每条消息都将准确地发生一次,不会重复,也不会丢失数据——即使在broker或者函数实例失败的情况下也是如此。事务性不仅让基于Pulsar和Pulsar Funcitons来开发应用更容易,也使Pulsar的应用范围更广阔。

用例

事件处理

事务性保证事件处理(又名事件流、流处理)应用程序获得巨大好处。当事件流的被重复处理不可接受时,在事件处理的应用中典型的例子是“consume-transform-produce”类型的任务需要事务性保证。

Pulsar Functions就是这样的一个基于Apache Pulsar构建的事件处理框架。函数调用是典型的“consume-transform-produce”类型的任务——它从一个(或多个)主题中读取事件,通过调用函数来处理事件,并将结果产生到一个(或多个)主题中。

许多SPE (流处理引擎)或自定义的事件处理逻辑(使用Pulsar的生产者和消费者)属于“consume-transform-produce”的范畴。它需要能够在一次原子操作中向不同的主题分区生成并确认一批消息。原子性意味着要么提交所有消息(所有输出消息都被精确地保存一次,并且所有输入消息都被确认),要么没有任何消息被提交。

单个事务中生成的消息数量和确认的消息数量可能会有所不同,从几条消息到大量消息不等。主题分区(又名分区)的数量也会有所不同,从一个分区到多个分区。这取决于处理效率、窗口/缓冲逻辑以及许多其他与应用相关的因素。理解上述内容的典型例子是事件处理应用程序中的窗口操作。当读者阅读本文档时,我们鼓励大家记住这类用例,因为这将在本提案的剩余部分中推动许多设计选择。

原子性

以原子方式产生多个消息是“consume-transfer-produce”类型的一种特殊情况,它只需要以原子方式“produce”消息。

Atomic produce有一个默认5M大小的消息限制在Apache Pulsar中。你可以将消息的限制调整到非常大,但这意味着你必须使用非常大的网络和磁盘IO,这对内存和磁盘不是很友好。如果大家将一条大消息分解成多条小消息,并将每个小消息作为一条pulsar消息发送,我们需要确保以原子方式发送这些消息。

数据库CDC或使用Apache Pulsar搜集日志是atomic producing的典型用例。

事务保证

如“动机”部分所述,提供事务将使事件流应用程序能够在一个原子操作中消费、处理和生成消息。

这意味着,事务中的一批消息可以从许多分区接收、产生并向其确认。事务中涉及的所有操作都将作为一个单元成功或失败。

然而,我们不能保证在提交的事务中产生的消息会被其下游消费者一起消费。这基于以下几个原因:

  1. 消费者可能不会从参与提交事务的所有分区中消费。因此,他们永远无法读取事务中包含的所有消息。
  2. 消费者可能有不同的接收者队列大小或缓冲/窗口大小,他们可能只对消费一定数量的消息感兴趣。该数量可以是任意数字。

然而,我们可能能够支持一起消费提交给一个分区的消息。但这取决于我们在下面选择的设计。由于对这一特性没有强烈的要求,我们暂时将这一点排除在保证范围之外。

隔离级别

与数据库事务类似,事件流系统中的事务也将具有隔离级别。隔离级别为:

Pulsar事务必须支持的隔离级别是READ_COMMITTED。是否支持READ_UNCOMMITTED或SERIALIZABILITY需要依赖Pulsar用户的输入。

总览

Pulsar中有许多设计事务支持的方法。所有这些建议都可以形成一个共同的框架,我们将在本节中讨论。之后的章节将详细描述基于这个框架的细节。

概念

事务协调器

为了实现消息的事务性,我们必须引入一个名为事务协调器(又名TC )的服务器端模块。TC管理生产者发送的消息和消费者发送的确认的事务,并作为一个整体进行提交或中止操作。

事务协调器会将事务状态持久保存在一些持久存储中(例如,由单独的主题或表服务中的表支持的事务日志)以进行恢复。我们将在下一节中讨论如何实现TC,以及TC如何管理事务状态。

事务缓冲区

事务中产生的消息将存储在事务缓冲区(又称TB )中。除非消费者提交事务,否则不会将TB中的消息持久化(可见)。当事务中止时,TB中的消息将被丢弃。根据TB的实现方式,可能需要一个清理过程(例如压缩)来清理中止事务的消息。

TB的实现要求:

  1. 无论生产者如何重试生成消息,生成到TB的消息都不会重复。
  2. 在broker崩溃期间不会丢失消息。 还有其他因素需要考虑,例如清理中止事务的消息、写扩增、排序等。我们将讨论TB的解决方案以及里面的一些权衡。

事务确认

事件流应用程序(例如Pulsar Functions)可能包括消费者和生产者,其中应用程序消费来自输入pulsar主题的消息,并产生新消息以输出到pulsar主题。为了实现精确的一次(exactly one),我们需要确保输入消息上的确认作为事务的一部分发生,以便实现原子性。否则,如果确认输入主题和生产输出主题的消息之间出现故障,将根据两个操作的顺序发生数据重复或数据丢失:如果首先生产者提交了消息,然后发生了故障,则输入消息将在恢复时重新投递,因为它们未被确认,因此数据重复;如果首先确认输入消息,则提交失败的输出消息将不会重新生成,因为输入消息已被确认,因此会丢失数据。

因此,我们需要在事务中包含确认来保证原子性。为了实现这一点,我们必须改变事务中确认的行为。因为目前pulsar中的所有确认都只是best-effort的操作。ack可能在网络断开或broker崩溃期间丢失,这将导致数据重复。

我们还需要考虑单个确认和累积确认之间的提交冲突。在接下来的章节中,我们将讨论如何增强消费者协议和游标管理,以支持事务中的确认。

物化机制

对于附加到TB的消息,事务实现还应该提供物化机制来物化未提交的消息,以使它们在事务提交时对消费者可见。这种物化机制因TB的实现而异。

物化机制还应该考虑隔离级别(如果我们想要支持比READ_COMMITTED更高的隔离级别)。

我们将在后面的章节中讨论事务如何实现未提交的消息。

事务流

所有事务实现都可以使用以上章节中描述的这些关键组件/概念来构造。

image

在图1中,事务流如下:

开始事务

在事务开始时,pulsar客户端会找到一个事务协调器(TC)。TC将为事务分配一个事务id(又名TxnID)。事务将在事务日志中记录其事务id和打开状态(表示事务是打开的) (如步骤1a所示)。这确保了无论TC崩溃,事务状态都保持不变。事务状态被记录到日志后,TC将事务id回复给pulsar客户端。

事务循环

在这个阶段,pulsar客户端将进入一个事务循环,重复consume-transform-produce由事务组成的消息的动作。这是一个漫长的阶段,可能包含多个生成和确认请求。

增加分区到事务

在pulsar客户端向新的主题分区生成消息之前,客户端向TC发送一个请求,将该分区添加到事务中。TC将事务的分区更改记录到其事务日志中,以确保持久性(如2.1a所示)。这一步确保TC知道事务接触的所有分区,因此TC可以在分区结束阶段提交或中止每个分区上的更改。

发送消息到分区

pulsar客户端开始向分区产生消息。该生产流程与正常消息生产流程相同。唯一的区别是由事务产生的一批消息将包含事务id。接收该批消息的broker检查该批消息是否属于事务。如果它不属于事务,broker将批处理直接写入分区的managed ledger(这是正常的生产流程)。如果它属于一个事务,broker将把它们写入事务的事务缓冲区。

事务缓冲区必须满足以下要求: a. 就算broker崩溃,附加到事务缓冲区的消息都应该持久保存。 b. 无论生产者在网络断开时如何重试产生相同的消息,消息都应该精确地追加一次。 c. 在提交事务之前,不应将消息物化呈现给消费者。

事务缓冲区可以以多种方式实现。它可以是managed ledger本身,一个独立的managed ledger,或一些其他实现。我们将在后面的章节中讨论关于事务缓冲区设计选择的更多细节。

增加订阅到事务

pulsar客户端在新的订阅首次被确认为事务的一部分时向TC发送请求。TC在步骤2.3a中记录事务的订阅添加。该步骤确保TC知道事务覆盖的所有订阅,因此TC可以在结束事务阶段提交或中止对每个订阅的更改。

响应消息到订阅

pulsar客户端开始确认订阅消息。该事务确认流程与正常确认流程相同。然而,确认请求将携带一个事务id。接收确认请求的broker检查确认是否属于该事务。如果它属于一个事务,broker将把消息标记为PENDING_ACK状态。PENDING_ACK状态意味着在确认被提交或中止之前,消息不能被其他消费者确认或否认。(参见“New Acknowledgement State”部分的详细信息)这使得如果一条消息上有两个事务试图确认,只有一个会成功,另一个会中止。

结束事务

在事务结束时,应用程序将决定提交或中止事务。当在确认消息上检测到冲突时,事务也可以中止。

结束事务请求

当pulsar客户端完成一个事务时,它可以向TC发出一个结束事务请求,其中一个字段指示事务是提交还是中止。

收到该请求后,TC将:

  1. 将提交或中止消息写入其事务日志(如3.1a所示)。
  2. 开始向该事务中涉及的所有分区提交或中止消息或确认的过程。它在3.2中展示并进行相应的描述。
  3. 在成功提交或中止该事务中涉及的所有分区后,TC将提交或中止消息写入其事务日志。它在3.3中展示并进行相应的描述。
最终确定过程

在此阶段,TC将通过提交确认、终止确认所有分区上的消息来完成事务。

提交生产的消息是将消息进行物化,并使它们对消费者可见(如图3.2a所示)。由于故障(例如恢复后的重试、网络断开等),提交操作可能会发生多次。TB实现必须确保在提交过程中不会引入重复。

中止生成的消息将丢弃TB中的消息。如果事务中止,TB必须确保清理这些消息并回收空间。 提交确认将消息从PENDING_ACK移动到ACK.。中止确认将不会确认消息,因此该消息将被重新传递给其他消费者。

标记事务为COMMITTED或者ABORTED

对于所有分区生产的消息被提交或终止确认之后,TC将最终COMMITTED或ABORTED 的事务状态消息写入其事务日志,指示事务已完成(如图3.3a所示)。此时,与其事务日志中的事务相关的所有消息都可以安全地删除。

该图显示了涉及不同组件的整个事务流程。然而,这些组件的实现细节在这里没有很好地讨论。我们将在下面几节中更详细地讨论它们,并逐个组件地比较设计选择。

此外,在改进事务流程方面还可以进行许多优化。这些都被排除在这个提案之外,以确保我们从一个可靠和健壮的实现开始,先把事情做好。

设计选型

“Transaction Coordinator”和“Transactional Acknowledgement”很容易实现。详见“A Full Proposal”一节。最具挑战性的部分将是“Transaction Buffer”部分,因为将会有许多不同权衡的建议。这些提案将在下文讨论。

事务缓冲区

概括我们上面对事务流的描述,事务缓冲实现应该考虑以下几点:

  1. 事务循环期间: a. 即使broker崩溃,附加到事务缓冲区的消息都应该持久保存。 b. 无论生产者在网络断开时如何重试产生相同的消息,消息都应该精确地被追加一次。 c. 在提交事务之前,不应将消息物化呈现给其他消费者。
  2. 物化机制,用于物化事务缓冲区中的消息,使它们对其他消费者可见 a. 消息如何物化将影响我们如何向消费者发送消息
  3. 一种清除机制,用于清理事务缓冲区中的消息以回收磁盘空间。

Marker Approach

实现事务缓冲区的方法之一是:

预览

image

图2标记方法演示了标记方法的样子。灰色方框代表正常客户端(通过非事务流程)产生的消息;颜色框代表由事务产生的消息;不同的颜色表示不同的事务。事务产生的每个消息将被标记为“- ”(例如“txn2-m2”)。<txn>-commit< txn>-abort是提交或中止给定事务时附加的标记。

在这种方法下,所有事务性消息都直接附加到分区的managed ledger下。向broker发送消息时,需要添加额外的元数据(例如TxnID字段)。broker调度程序检查这些元数据以及事务状态,以决定是否应该调度它们。每个事务都将使用TxnID作为生产者ID,因此broker可以使用de-duplication来确保消息只被添加到分区一次。当事务协调器开始提交或中止事务时,它会向分区日志中写入'-commit'或者'-abort',以将事务标记为“COMMITTED”或“ABORTED”。此时,COMMITTED事务的消息可以安全地发送给消费者,ABORTED事务的消息可以通过后台的扫描进程来清理。 图2展示了3个事务,“txn1”、“txn2”和“txn3”。“txn1”和“txn2”被提交,而“txn3”被中止。

挑战

这种方法存在一些挑战。

物化

<txn>-commit是用于将事务标记为'COMMITTED'并将消息物化给消费者的提交标记。它也是事务的“fencing”点——在这个标记之后产生给同一个事务的任何消息都将被拒绝。

因为一个事务可以在多个消息上传播,所以我们需要一个为事务索引消息的机制。因此,当物化发生时,调度程序知道如何获取消息并正确地调度它们。

这可以通过MessageDeduplication游标来完成。当前,消息MessageDeduplication游标维护生产者标识与其序列id之间的映射。我们可以扩展它来维护txn id和它的消息的消息id列表之间的映射。当TC提交事务时:

  1. 将事务的消息id列表添加为提交标记的一部分。
  2. 将提交标记写入分区。
  3. 成功写入标记后,从MessageDeduplication游标中删除事务,因为事务已经实现。如果需要,可以将事务添加到事务缓存中进行快速查找(可选)。
清理

<txn>-abort是用于将事务标记为'ABORTED'的提交标记。一旦事务被标记为“ABORTED”,该事务的消息就可以安全地删除。但是,由于managed ledger仅支持追加,因此无法从分区中删除单个消息。所以这些信息不容易删除。消息必须等到保留过期,或者需要额外的“压缩”过程来压缩成段以删除中止事务的消息。这需要重写一个新的段。我们可以改进当前pulsar的压缩逻辑来实现它,或者作为将数据移动到分层存储的一部分来处理这个过程。

保留

在当前的方法中,由于事务性消息(提交和中止的消息)与普通消息交织在一起,broker应该小心确认。因为如果消息所属的事务尚未完成(提交或中止),游标不能向前移动。

改变

总之,这种方法需要更改以下组件: 在消息元数据中引入新字段,让broker判断消息是否属于事务。 在消息元数据中引入新字段,以判断消息是否是事务标记。 更改MessageDeduplication以维护事务id及其消息id列表之间的映射。 更改broker调度程序,跳过未物化的消息调度 更改压缩或卸载程序逻辑以丢弃属于中止事务的消息

在这种方法中,我们最终可能会触及broker的几乎每一个部分。

讨论

有几个性能相关的讨论点: 由于附加的事务消息和事务提交可以在不同的时间发生,所以同一事务的消息不会连续存储(逻辑上和物理上都在bookie上)。因此条目缓存行为可以非常随机。例如,在图2中,当读取txn2的消息时,它必须跳回txn2-m1,然后读取“txn2-m1”和“txn2-m2”;broker读取txn1的消息,它必须跳回读取txn1-m1。

不仅如此,在这个提案中,我们将普通消息与事务性消息混合在一起,这将显著改变普通消息的缓存行为,这可能会导致代理有更多的网络I/O。

Sidecar Approach

与标记方法相反,其他方法可以描述为sidercar approach,基本要点如下:

预览

image

图3 Sidecar Approach展示了Sidecar Approach的样子。灰色方框代表正常客户端(通过非事务流程)产生的消息;颜色框代表由事务产生的消息;不同的颜色表示不同的事务。事务产生的每个消息将被标记为“txn- ”(例如“txn2-m2”)。<txn>-commit<txn>-abort是提交或中止给定事务时附加的标记。

在这种方法中,所有事务消息都直接追加到分区的事务日志中。每个事务都将使用TxnID作为生产者ID发送到分区的事务日志中,因此broker可以使用de-duplication逻辑来确保消息准确地附加到事务日志中一次。

  1. 当事务协调器开始提交事务时,它会向分区写入一个“-commit”标记,以指示事务标记为“COMMITTED”。此操作会密封事务。当broker调度程序看到提交标记时,它将作为事务消息的索引。必须改进deduplication游标,使其包含事务日志和分区,以确保提交标记准确写入一次。
  2. 当事务协调器开始中止一个事务时,它会向事务日志中写入一个“-abort”标记,以指示该事务被标记为“ABORTED”。此操作会密封事务。之后,在事务日志中的事务消息能被后台进程清理

与标记方法相比,只有“commit”标记被写入分区,因此调度器和保留策略几乎不会改变。“commit”标记只是一个指向一批消息的指针。事务数据和普通数据的分离将确保: 事务性用例不会影响正常的用例 它在事务用例和正常用例之间建立了隔离。尤其是在缓存方面。正常用例的缓存行为将保持不变。我们可以创建一个增强的条目缓存来优化事务消息访问。

图3展示了3个事务,“txn1”、“txn2”和“txn3”。“txn1”和“txn2”被提交,而“txn3”被中止。

挑战

在这个方法中有几个挑战

物化

与标记方法类似,我们使用提交标记“-commit”将事务标记为“COMMITTED”,以将消息物化后给消费者。

清理

<txn>-abort是用于将事务标记为'ABORTED'的提交标记。一旦事务被标记为“ABORTED”,该事务的消息就可以安全地删除。但是,由于事务日志是仅追加的,因此无法从分区中删除单个消息。需要在后台运行一个额外的“压缩”过程来压缩事务日志,以删除中止事务的消息。

保留

与标记方法相比,保留变得容易得多。当确认发生在提交标记上时,它会将提交标记加载到内存中,并找到要确认的事务的消息id。然后,它会将这些消息标记为事务日志中已确认的消息。

改变

总之,这种方法需要更改以下组件:

  1. 引入一个新的元消息——元消息有一个指向其他消息的指针列表。调度程序将通过跟踪指针来解析元消息以读取实际消息。
  2. 更改MessageDeduplication以维护事务id及其消息id列表之间的映射,并处理事务日志和分区的游标。
  3. 引入新的压缩逻辑来压缩一个段,以丢弃中止事务的消息

讨论

大事务

如果我们将支持无限大小的消息建模为一系列消息块的事务,我们可以引入一个设置来告诉broker使用单独的ledger在分区上存储给定事务的消息。在这种方法中,我们可以让提交标记直接指向ledger。删除ledger类似于提交标记被删除时的删除消息。

Sidecar Approach的详细实现在“Broker - Transaction Buffer”一节中描述。

完整的设计草案

接下来是一份更详细完整的的设计草案,阅读起来要比上面简单,暂时没有翻译