PeterSH6 / paper-notes

My paper reading notes
1 stars 0 forks source link

Flink: Lightweight Asynchronous Snapshots for Distributed Dataflows #7

Closed PeterSH6 closed 3 years ago

PeterSH6 commented 3 years ago

PDF: https://arxiv.org/abs/1506.08603 不鸽了 开始把暑假读的论文都总结一遍 以后读完一篇要马上总结,不然要点都忘了

PeterSH6 commented 3 years ago

分布式快照算法使用场景:在缺乏全局时钟或者全局时钟不可靠的分布式系统中,确定一个全局的可恢复的有意义的状态。最经典的分布式快照算法是 #8 Chandy-Lambert算法,Flink是基于此算法的一个改进,两个算法都是异步的。

Target: Distributed Snapshot but low impact on performance; low space costs that contains only operator states in acyclic execution topologies.

整个算法分成两个部分,一个是acyclic,另一个是cyclic。对于acycilc部分,只需要保存每个节点的状态,不需要保存节点所发出的信息。对于cyclic,首先需要识别构成环的节点,就是说有back-edge的节点。识别出这个back-edge之后,需要back-edge所在节点中收到的信息。算法细节后面分析。

对于acyclic单向图的部分 这篇文章解释的很清楚,https://zhuanlan.zhihu.com/p/43536305

  1. 所有的通信channel需要是先进先出(FIFO)按顺序的
  2. 中心coordinator:需要有一个中心coordinator来不断广播持续增长的stage barrier到所有的src数据流里。(比如先给所有src发1,然后5秒后发2,10秒发3..... 如此增长)
  3. 数据源src,当数据源收到第n个barrier的时候: -保存状态,保证当需要replay从任意n开始的消息时,可以replay在自己收到barrier-n之后的所有消息。 -广播barrier给下游。
  4. 中间处理节点或最终叶子节点:假设一个中间处理节点或最终叶子节点需要m个input流,当在某个input流收到barrier-n的时候, -block这个input流保证不再收取和处理。 -当收到所有m个input的barrier-n的时候, --Pi-LocalSnapshot-n: 保存本地状态(take local snapshot n), 保证可以从这个状态恢复(比如存到云端, 在另外x台机器做replica),假设我们每个logic processor都有一个id为Pi,那么每个logic processor的在收到所有inputs的barrier-n之后所保存的本地状态快照则设为Pi-LocalSnapshot-n --向自己的下游广播barrier-n (如果是叶子节点没有下游,那么不需要广播) --广播给下游之后需要unblock自己的input流,以便计算可以继续进行下去
  5. CompleteGlobalSnapshot-n: 当所有的节点(源,中间处理,叶子节点)都处理完barrier-n且完成取快照(take snapshot)的任务之后,我们说我们有了一个完整的全局快照。这意味着我们的deterministic的进度,进步到了barrier-n

当Fail的时候:当需要任意节点挂掉,我们从最近的Complete Global Snapshot-n,来重启整个系统;即,健康的节点rollback自己的状态到“接收到barrier-n时候所取的状态快照。fail掉的节点的逻辑Processor Pi被jobManager之类的东西,用自己在的Pi-LocalSnapshot-n重启设置本地状态之后,才开始接收上游的消息。

Why Consistent: 可以看到当failover的时候,全部节点的状态都回退到了barrier-n之前的数据源message所导致的全网状态,就好像数据源在barrier-n之后根本没有发过消息一样。不断发出的barrier就好像逻辑时钟一样,然而“时间”流动到不同地方的速度不同,只有当一个时间“点”全部流动到了全网,且全网把这个时间“点”的状态全部取了快照(注意当网络很大,最后一个节点取完快照,初始节点可能已经前进到n+5,n+10了,但是由于最后一个节点才刚取完快照,CompleteGlobalSnapshot-n只到n,n是全局consistent的记录点) 这里应该用到了一个Chandy-Lambert算法中提到的概念,pre-barrier message和post-barrier message。Flink的算法保证所有pre-barrier message都被take in到了当前节点的状态中。对于深度学习训练任务来说,是不是可以理解为这些message(gradient)已经update到了当前的model state。

一些可能的优化:在原来的恢复过程中,所有健康的节点似乎都要rollback。但如果一些健康的节点的路径和fail节点的路径完全独立呢?可以不进行rollback。但是似乎仍然要等待?所以不rollback可能也不一定会有什么效果。

截屏2021-09-08 下午2 50 54

PeterSH6 commented 3 years ago

感觉训练任务的拓扑结构主要是无环图。所以有环图的部分简略总结。

截屏2021-09-08 下午2 53 57
  1. 首先通过DFS识别back-edge,识别到一个节点被访问两次,最后访问那次经过的路径就是back-edge
  2. 大部分与acyclic类似,但是对于back-edge返回的节点,当其余inpu流收到barrier-n之后,就进行snapshot-n,完成之后unblock,同时开始保存从back-edge返回的message,这些message属于pre-barrier message因为此时back-edge还未把barrier-n传回给该节点。此时虽然保存了back-edge的pre-barrier信息,但这些pre-barrier信息仍然改变当前节点的状态
  3. 当back-edge发送barrier-n给回节点时,将之前snapshot-n和pre-barrier message保存到reliable storage。
PeterSH6 commented 3 years ago

最后梳理一下本文用到的Theory Part

  1. Exactly One Message: snapshot-n到storage中的状态保证是由pre-barrier-n message所改变的。因为是正常运行,所以Exactly Once。恢复之后,只需要replay post-barrier-n message,此时也是正常运行,Exactly Once。如果snapshot-n途中出现fail,则失败,下次需要roll-back到snapshot-n-1
  2. Termination: Every node will eventually receive barriers from all its inputs.
  3. Feasibility: FIFO & Blocking of the input channel upon barriers ensuring that no post-shot records of a stage (records succeeding a barrier) are processed before a snapshot is taken
PeterSH6 commented 3 years ago

Further Reading:

  1. 9 (极大拓展了Flink的distributed snapshot的应用场景。)