Open andrew0928 opened 1 year ago
https://columns.chicken-house.net/2023/10/01/reorder/
好久沒有來練習解題的技巧了,這次來聊個有趣的題目: 如果我透過 API 短時間收到大量的 Request, 我如何保證訊息必須按照順序處理?順序問題,我常碰到的是都不分青紅皂白的就丟到 Message Queue, 結果塞進去的順序就已經不對了,才來問我順序該怎麼辦, 我只能雙手一攤說我也沒轍啊 XDD! 如果你沒辦法一開始就用對的順序放進 Message Queue,你就必須自己面對這問題了。這篇我就是想來聊聊,當 “必要” 時,你會如何處理這種問題?當年在學網路通訊時,TCP 跟 UPD 的差異就是會處理封包順序 & 重送問題,也剛好讀過背後的作法,你懂原理後就會有機會自己解決。類似例子其實很多,QoS 的應用也是,先前研究過的 微服務基礎建設: 斷路器 #1, 服務負載的控制 那篇,你懂 QoS 怎麼做 Rate Limit,必要時你就能自己解決商業需求。練習前的思考: 我需要了解這些機制嗎?所以,別再糾結 “是否有必要重新發明輪子” 的問題了,你不需要重新發明每個輪子,但是要不要讓自己有能力重新建立 “必要” 的輪子,就是你的選擇了。我認為最佳的平衡點,就是做好必要的練習就夠了,一方面投入不算多,另一方面可以得到保障,確保你有能力建構複雜且需要高度整合的系統。我自己擔任架構師的角色,這投資是很值得的,因為我有不得不面對這些難題的理由。對這問題我下個結論: 你需要有能力了解原理 (但不用真的去開發),然後才能判斷要不要自己開發 (重新發明輪子)。如果必要,你才有能力執行。因此,有沒有必要? 你需要自行判斷。一般小規模的案子應該都不需要用上這樣的技巧,但是當你負責特定系統的關鍵設計時,你可能就需要具備這樣的能力,確保系統的開發不會被技術限制所影響。你需要做的,至少先了解背後的原理或是演算法,並且照著這篇的練習,練過你就知道真正要自己做的難度,在心裡記得這經驗即可。這些練習不大花時間,但是將來你在判斷技術選擇時,就能幫助你立刻做出技術決策。1. 訊息排序的基本觀念回到主題: re-order 重新排列。由於你沒辦法像 “排序” (sort) 一般,等到資料收齊了再一次性排序;這邊面對的狀況是你是 “一個一個” 收到不一定正確的順序排列的訊息,而你又必須盡快地照順序處理掉,因此你需要的流程大致上是這樣: 如果收到的訊息是對的,就馬上處理;如果不對,那就放在手上 (時間越短越好),看是要等缺的訊息補上,或是放棄某個訊息繼續往下。其中你要抉擇的是,到底是要等待的時間越短越好? 還是放棄的訊息數量越少越好? 兩者通常必須取捨..其實除了 TCP 通訊機制之外,也有不少類似問題的討論。我貼幾篇我看過的參考:Event Ordering in Distributed System - GeeksforGeeksCausal Ordering of Messages in Distributed System - GeeksforGeeks另外,我也貼一下 Wiki 上面提到 TCP 處理封包沒有依序傳輸的處理方式:傳輸控制協定 (TCP, Transmission Control Protocol)其中有一張圖,說明了傳輸過程中接收端怎麼靠 buffer + ack 達成這件事:平常我都會看這類介紹看一下人家怎麼解題的,但是大部分都看完有個印象就好了。這年代你不需要用腦袋 “背” 下所有的解決方式,腦袋很貴的,你只要當作索引,需要時有正確關鍵字 (或是現在要改成: 有正確的 Prompt) 等到真的需要實做時,你能馬上查出正確解法就夠了。我自己消化過這些問題,結論有幾個: 你的訊息必須要能標示順序當你拿到任意兩個 message, 你能否從訊息本身知道誰先誰後? 能否知道中間有無漏掉第三個 (或更多) 訊息? 舉例來說,有來源標記的 timestamp, 就能判斷先後;有來源標記的 sequence number, 你就能判斷中間有無掉號碼.. 你必須界定處理範圍 (緩衝時間)如果你先收到後面的訊息,你有多少時間 (緩衝) 能等待前面的訊息送過來? 例如 3 sec 內前面的訊息沒收到你就要放棄了 (有些訊息有時效性,你無法抓著他等太久) 你必須界定處理範圍 (緩衝空間)類似 (2),如果你先收到後面的訊息,你有多少空間 (緩衝) 能等待前面的訊息送過來? 例如中間掉的訊息超過 10 個你就要放棄了 (有些場合你的暫存空間有限,你無法保留過多訊息) 其中,(1) 談的其實是訊息結構的設計與定義,(2) (3) 則是處理能力與容錯能力 (我把他歸類在 SLO) 的要求。基本概念就是這樣,從來源不斷收到的訊息,會擺在 buffer 內,如果你判定順序都對,而且是連續的不需要處理,就直接處理掉了。如果不連續,你就必須先收著 (留在 buffer 內),並且等待下一個應該收到的訊息到來,直到 buffer 內暫時留著的訊息也能拿出來為止。而例外處理則是 buffer 空間不夠,或是保留的時間太久,你不能再等下去了,你就要明確地告訴後面一關,有多少訊息被放棄了。這大概就是約略的處理方式。想清楚作法,我想建立一個模型,來表達上面的結構。同時這個模型,我會把關鍵的部分 (溝通介面 & 資料定義) 直接用 code 來描述規格。我想像的處理模型:對應到程式碼,主要就這三部分,以及我自己追加的監控體系: command source:訊息來源, 會按照流水號標號送出, 並且模擬網路傳輸會有不固定的延遲, 導致接收時可能會有順序的變化 ( ‘GetCommands` ) command buffer:需要緩衝 (buffer) 來存放未按順序收到的 command, 重新排列後往後送的機制 (IReOrderBuffer) command handler:執行接收到的 command, 這邊預期一定要是正確的順序 (ExecuteCommand) monitor:Metrics, (後面說明)接下來,我就分別用程式碼來描述上面這四項應該長成什麼樣子,先把整個解題的框架建立起來,而你要練習的,就是把 (2) 用你的想法做出來,讓整個體系能夠運作。最後 (4) 則是我要模擬觀察特定的指標 (Metrics) 來了解整個系統的運作狀況。相關程式碼,如果你忍不住的話可以先看我的 GitRepo: Andrew.ReOrderDemo, 或是看完我的說明再去挖 code 出來看也可以。上面列的四點,都分別在 Program.cs 內有應用的方式,我按照這邏輯個別介紹:1-1, 訊息來源 ( GetCommands )直接來看 code, 我也不多寫什麼框架或是類別了,我直接在 Program.cs 就放上我的主程式架構:// 模擬實際接收到的 Command 順序 (會按照亂數,隨機的局部前後位移)static IEnumerable GetCommands(int period = 100, int noise = 500) { ... } // 接收到的 Command 結構public class OrderedCommand{ // 流水號,從 0 開始,按照約定必須是連續編號 public int Position = 0; // command 建立的時間 public DateTime Origin = DateTime.MinValue; // command 收到時間 public DateTime OccurAt = DateTime.MinValue; // command 內容說明 public string Message;}我先定義每個 Command 該長什麼樣子的模型,我用了 OrderedCommand 這類別來代表。前面提到一定要能分辨明確的順序,並且判定中間是否有漏接 Command, 我用的是 Position 流水號欄位來標示。而 IEnumerable GetCommands(...) 則是用 yield return 連續產生一連串會隨機局部錯開順序的 Command 產生器。每個 Command 要標示順序 (Position), 預設從 0 開始編號, 一定會是連續流水號。除了標號之外也要標示來源端發出時間 (Origin), 以及模擬網路傳輸,可能造成的隨機延遲 ( 0 ~ {command_noise} msec ) 標示實際收到的時間 (OccurAt)。最終 GetCommands() 傳回的順序,會按照 OccurAt 來排列的,而不是 Origin (意思是會考慮傳輸的隨機延遲後真正收到的順序), 因此你要在後面的階段 (Command Buffer) 想辦法把錯亂的順序修正回來。修正過程中可能會有些狀況,因此你必須盡快的辨識出哪些 Command 能正常送出 (SEND),那些已收到的 Command 必須被丟棄 (DROP),那些應該收到但是遲遲未收到的應該被略過 (SKIP)。1-2, 重整訊息順序 ( IReOrderBuffer )public interface IReOrderBuffer{ public bool Push(OrderedCommand data); public bool Flush(); public event CommandProcessEventHandler CommandIsReadyToSend; public event CommandProcessEventHandler CommandWasDroped; public event CommandSkipEventHandler CommandWasSkipped;}我的構想是: 從 1-1 收到的訊息,都立刻放進 (Push) 能串流重新排序的 IReOrderBuffer,後續的動作都由 IReOrderBuffer 的實做 (就是要練習的部分) 來決定。你要決定哪些訊息可以被處理,決定好了就透過事件來通知 Command Handler 該做什麼事。這邊定義了三個事件: CommandIsReadyToSend:這事件就可以通知 Command Handler, 某個 Command CommandWasDroped:Command 已經收到, 但是因為各種原因 (例如收到過去流水號的指令,或是 Buffer 滿了被迫丟棄) 必須放棄處理的通知 CommandWasSkipped:跟 CommandWasDropped 類似,但是差別在於 Skip 的指令還沒收到 (因此我也不知道內容是啥),只是從流水號判定中間應該還有存在某些 Command 掉在半路上。同樣,因為這種原因,告知必須略過指定的 Command 事件通知1-3, 處理訊息 ( ExecuteCommand )這部分就沒什麼了,單純的把 IReOrderBuffer 判定結果後通知的 Command 接過來處理而已,處理前做一點基本的檢查 (順序):static object _sync_command = new object();static int _last_command_position = 0;static bool ExecuteCommand(OrderedCommand cmd){ if (cmd == null) return false; if (cmd.Position <= _last_command_position) { Console.WriteLine(
https://columns.chicken-house.net/2023/10/01/reorder/
好久沒有來練習解題的技巧了,這次來聊個有趣的題目: 如果我透過 API 短時間收到大量的 Request, 我如何保證訊息必須按照順序處理?順序問題,我常碰到的是都不分青紅皂白的就丟到 Message Queue, 結果塞進去的順序就已經不對了,才來問我順序該怎麼辦, 我只能雙手一攤說我也沒轍啊 XDD! 如果你沒辦法一開始就用對的順序放進 Message Queue,你就必須自己面對這問題了。這篇我就是想來聊聊,當 “必要” 時,你會如何處理這種問題?當年在學網路通訊時,TCP 跟 UPD 的差異就是會處理封包順序 & 重送問題,也剛好讀過背後的作法,你懂原理後就會有機會自己解決。類似例子其實很多,QoS 的應用也是,先前研究過的 微服務基礎建設: 斷路器 #1, 服務負載的控制 那篇,你懂 QoS 怎麼做 Rate Limit,必要時你就能自己解決商業需求。練習前的思考: 我需要了解這些機制嗎?所以,別再糾結 “是否有必要重新發明輪子” 的問題了,你不需要重新發明每個輪子,但是要不要讓自己有能力重新建立 “必要” 的輪子,就是你的選擇了。我認為最佳的平衡點,就是做好必要的練習就夠了,一方面投入不算多,另一方面可以得到保障,確保你有能力建構複雜且需要高度整合的系統。我自己擔任架構師的角色,這投資是很值得的,因為我有不得不面對這些難題的理由。對這問題我下個結論: 你需要有能力了解原理 (但不用真的去開發),然後才能判斷要不要自己開發 (重新發明輪子)。如果必要,你才有能力執行。因此,有沒有必要? 你需要自行判斷。一般小規模的案子應該都不需要用上這樣的技巧,但是當你負責特定系統的關鍵設計時,你可能就需要具備這樣的能力,確保系統的開發不會被技術限制所影響。你需要做的,至少先了解背後的原理或是演算法,並且照著這篇的練習,練過你就知道真正要自己做的難度,在心裡記得這經驗即可。這些練習不大花時間,但是將來你在判斷技術選擇時,就能幫助你立刻做出技術決策。1. 訊息排序的基本觀念回到主題: re-order 重新排列。由於你沒辦法像 “排序” (sort) 一般,等到資料收齊了再一次性排序;這邊面對的狀況是你是 “一個一個” 收到不一定正確的順序排列的訊息,而你又必須盡快地照順序處理掉,因此你需要的流程大致上是這樣: 如果收到的訊息是對的,就馬上處理;如果不對,那就放在手上 (時間越短越好),看是要等缺的訊息補上,或是放棄某個訊息繼續往下。其中你要抉擇的是,到底是要等待的時間越短越好? 還是放棄的訊息數量越少越好? 兩者通常必須取捨..其實除了 TCP 通訊機制之外,也有不少類似問題的討論。我貼幾篇我看過的參考:Event Ordering in Distributed System - GeeksforGeeksCausal Ordering of Messages in Distributed System - GeeksforGeeks另外,我也貼一下 Wiki 上面提到 TCP 處理封包沒有依序傳輸的處理方式:傳輸控制協定 (TCP, Transmission Control Protocol)其中有一張圖,說明了傳輸過程中接收端怎麼靠 buffer + ack 達成這件事:平常我都會看這類介紹看一下人家怎麼解題的,但是大部分都看完有個印象就好了。這年代你不需要用腦袋 “背” 下所有的解決方式,腦袋很貴的,你只要當作索引,需要時有正確關鍵字 (或是現在要改成: 有正確的 Prompt) 等到真的需要實做時,你能馬上查出正確解法就夠了。我自己消化過這些問題,結論有幾個: 你的訊息必須要能標示順序當你拿到任意兩個 message, 你能否從訊息本身知道誰先誰後? 能否知道中間有無漏掉第三個 (或更多) 訊息? 舉例來說,有來源標記的 timestamp, 就能判斷先後;有來源標記的 sequence number, 你就能判斷中間有無掉號碼.. 你必須界定處理範圍 (緩衝時間)如果你先收到後面的訊息,你有多少時間 (緩衝) 能等待前面的訊息送過來? 例如 3 sec 內前面的訊息沒收到你就要放棄了 (有些訊息有時效性,你無法抓著他等太久) 你必須界定處理範圍 (緩衝空間)類似 (2),如果你先收到後面的訊息,你有多少空間 (緩衝) 能等待前面的訊息送過來? 例如中間掉的訊息超過 10 個你就要放棄了 (有些場合你的暫存空間有限,你無法保留過多訊息) 其中,(1) 談的其實是訊息結構的設計與定義,(2) (3) 則是處理能力與容錯能力 (我把他歸類在 SLO) 的要求。基本概念就是這樣,從來源不斷收到的訊息,會擺在 buffer 內,如果你判定順序都對,而且是連續的不需要處理,就直接處理掉了。如果不連續,你就必須先收著 (留在 buffer 內),並且等待下一個應該收到的訊息到來,直到 buffer 內暫時留著的訊息也能拿出來為止。而例外處理則是 buffer 空間不夠,或是保留的時間太久,你不能再等下去了,你就要明確地告訴後面一關,有多少訊息被放棄了。這大概就是約略的處理方式。想清楚作法,我想建立一個模型,來表達上面的結構。同時這個模型,我會把關鍵的部分 (溝通介面 & 資料定義) 直接用 code 來描述規格。我想像的處理模型:對應到程式碼,主要就這三部分,以及我自己追加的監控體系: command source:訊息來源, 會按照流水號標號送出, 並且模擬網路傳輸會有不固定的延遲, 導致接收時可能會有順序的變化 ( ‘GetCommands` ) command buffer:需要緩衝 (buffer) 來存放未按順序收到的 command, 重新排列後往後送的機制 (IReOrderBuffer) command handler:執行接收到的 command, 這邊預期一定要是正確的順序 (ExecuteCommand) monitor:Metrics, (後面說明)接下來,我就分別用程式碼來描述上面這四項應該長成什麼樣子,先把整個解題的框架建立起來,而你要練習的,就是把 (2) 用你的想法做出來,讓整個體系能夠運作。最後 (4) 則是我要模擬觀察特定的指標 (Metrics) 來了解整個系統的運作狀況。相關程式碼,如果你忍不住的話可以先看我的 GitRepo: Andrew.ReOrderDemo, 或是看完我的說明再去挖 code 出來看也可以。上面列的四點,都分別在 Program.cs 內有應用的方式,我按照這邏輯個別介紹:1-1, 訊息來源 ( GetCommands )直接來看 code, 我也不多寫什麼框架或是類別了,我直接在 Program.cs 就放上我的主程式架構:// 模擬實際接收到的 Command 順序 (會按照亂數,隨機的局部前後位移)static IEnumerable GetCommands(int period = 100, int noise = 500) { ... } // 接收到的 Command 結構public class OrderedCommand{ // 流水號,從 0 開始,按照約定必須是連續編號 public int Position = 0; // command 建立的時間 public DateTime Origin = DateTime.MinValue; // command 收到時間 public DateTime OccurAt = DateTime.MinValue; // command 內容說明 public string Message;}我先定義每個 Command 該長什麼樣子的模型,我用了 OrderedCommand 這類別來代表。前面提到一定要能分辨明確的順序,並且判定中間是否有漏接 Command, 我用的是 Position 流水號欄位來標示。而 IEnumerable GetCommands(...) 則是用 yield return 連續產生一連串會隨機局部錯開順序的 Command 產生器。每個 Command 要標示順序 (Position), 預設從 0 開始編號, 一定會是連續流水號。除了標號之外也要標示來源端發出時間 (Origin), 以及模擬網路傳輸,可能造成的隨機延遲 ( 0 ~ {command_noise} msec ) 標示實際收到的時間 (OccurAt)。最終 GetCommands() 傳回的順序,會按照 OccurAt 來排列的,而不是 Origin (意思是會考慮傳輸的隨機延遲後真正收到的順序), 因此你要在後面的階段 (Command Buffer) 想辦法把錯亂的順序修正回來。修正過程中可能會有些狀況,因此你必須盡快的辨識出哪些 Command 能正常送出 (SEND),那些已收到的 Command 必須被丟棄 (DROP),那些應該收到但是遲遲未收到的應該被略過 (SKIP)。1-2, 重整訊息順序 ( IReOrderBuffer )public interface IReOrderBuffer{ public bool Push(OrderedCommand data); public bool Flush(); public event CommandProcessEventHandler CommandIsReadyToSend; public event CommandProcessEventHandler CommandWasDroped; public event CommandSkipEventHandler CommandWasSkipped;}我的構想是: 從 1-1 收到的訊息,都立刻放進 (Push) 能串流重新排序的 IReOrderBuffer,後續的動作都由 IReOrderBuffer 的實做 (就是要練習的部分) 來決定。你要決定哪些訊息可以被處理,決定好了就透過事件來通知 Command Handler 該做什麼事。這邊定義了三個事件: CommandIsReadyToSend:這事件就可以通知 Command Handler, 某個 Command CommandWasDroped:Command 已經收到, 但是因為各種原因 (例如收到過去流水號的指令,或是 Buffer 滿了被迫丟棄) 必須放棄處理的通知 CommandWasSkipped:跟 CommandWasDropped 類似,但是差別在於 Skip 的指令還沒收到 (因此我也不知道內容是啥),只是從流水號判定中間應該還有存在某些 Command 掉在半路上。同樣,因為這種原因,告知必須略過指定的 Command 事件通知1-3, 處理訊息 ( ExecuteCommand )這部分就沒什麼了,單純的把 IReOrderBuffer 判定結果後通知的 Command 接過來處理而已,處理前做一點基本的檢查 (順序):static object _sync_command = new object();static int _last_command_position = 0;static bool ExecuteCommand(OrderedCommand cmd){ if (cmd == null) return false; if (cmd.Position <= _last_command_position) { Console.WriteLine(