opensource4you / astraea

釋放kafka的無限潛能
Apache License 2.0
130 stars 46 forks source link

Discussion of Balancer #289

Closed garyparrot closed 2 years ago

garyparrot commented 2 years ago

索引

garyparrot commented 2 years ago

3/24 開會記錄

內容

現在要做的事情

討論的重點

今天的總結

chia7712 commented 2 years ago

這個討論和結論很棒,以下一些想法:

是不是只有 Leader Replica 會把資料 copy 給 Follower。會不會有 Follower copy 給 Follower 的情況?

Follower只會跟leader拉資料。這關係到資料一致性

網路的讀取,不一定會用到 Disk (有 Memory 當 hot data 的 buffer)

這個只會在不斷拉最新資料的狀況下比較容易發生。對於一些仰賴seek的應用來說,每次讀取是會碰到硬碟

Cost Function 可能要支援非 JMX 的 metric? 如果我們要走這部分?

yep,如果有需要的話

chia7712 commented 2 years ago

你們這些討論很棒,不過要注意能否聚焦到能動手做的地步,別試著要做一個全世界通用的解法。

從實務的角度上,我會建議你們反過來先弄實驗,這樣的好處是你們可以從「真實」的狀況去體驗什麼是負載不平衡。例如先模擬以下情景:

1)不斷有資料進出 2)資料會定期被刪除 3)隔一段時間會產生新的topic or partition 4)隔較久的時間會加入新節點

觀察上述實驗下叢集的「整體吞吐量」是變高還是變低?各節點的資源使用量有平均嗎?

上述這個弄完就代表你們已經做了一個「評測工具」了

garyparrot commented 2 years ago

3/26 討論結果

討論

負載情境

  1. Realtime 的應用, 持續性的讀, 持續性的寫
    • n * Producer 向 Topic 塞資料, 1 Consumer 讀取資料
    • 1 * Producer 向 Topic 塞資料, n Consumer 讀取資料
    • n * Producer 向 Topic 塞資料, n Consumer 讀取資料
    • 1 * Producer 向 Topic 塞資料, 1 Consumer 讀取資料
  2. 讀寫對象 Topic 會和時間點有關聯
    • 時間為 i 時,n * Producer 向 Topic_i 塞資料, n Consumer 讀取 Topic_i 資料
  3. Offline Log Processing, 只在特定時間點會有 Consumer 爬起來從頭讀到尾
    • n * Producer 向 Topic 塞資料, 平常沒有 Consumer 讀取,只有在特定時間點時 Consumer 起來,從頭讀到尾
  4. Producer/Consumer 的讀寫存在極端峰值
    • Producer 寫入的資料存在很顯著的 Burst Loading
    • Consumer 讀取的資料量存在很顯著的 Burst Loading
  5. Topic 的 partition key 存在熱門 key
    • 單位時間下,每個 Partition 會遇到不同的輸入流量
  6. 建立好的 Topic 偶爾會被刪除
  7. 偶爾會有新 Broker 加入
  8. 偶爾會有新 Topic 或應用加入

總結

chia7712 commented 2 years ago

@garyparrot 索引的日期寫錯了

garyparrot commented 2 years ago

實驗 1

實驗結論

  1. CPU 的使用在我們的情境中沒有特別顯著,不過我們沒有測 Compression 或是其他會依賴 CPU 的應用情境所以重不重要不好說。
  2. data retention的設定非常重要,因為搬移replica需要估計其成本,因次也需要考慮未來有多少資料會被retention移除。
  3. broker的負載無法用 replica個數來預估,因為每一個replica上的負載可能不一。
  4. 如果以 disk 儲存負載來說,每個 replica 經歷到的壓力是一樣的。但是以 network 來說 leader 會經歷到更大的 egress 壓力,因此也需要考量到leader平衡的問題。

實驗情境

所有設備直接連到那臺 10 Gbits Switch

數據

第一次實驗結果

https://snapshots-origin.grafana.net/dashboard/snapshot/NrjQN7IVzsF3catV8rfbRWXYlzydPl0Y?orgId=2

第二次實驗結果

https://snapshots-origin.grafana.net/dashboard/snapshot/AHpF4U00hYgj68Zm06zMvppetl37wYdb?orgId=2

觀察心得

image-20220329183535306

image-20220329183023686

資料一直堆積,總有一天會塞滿能夠使用的儲存空間,如果要平衡 Data Folder 儲存空間的使用量,未來會有多少資料會因為 retention 而被移除也必須要考量進去。

image-20220330150937689

注意到雖然現在每個 broker 服務的 replica 數量大致上一樣,但是每個 replica 帶來的壓力不一樣,所以每個 broker 經歷不一樣的負載

image-20220330151203251

最忙碌的 broker 上,負載最顯著的是 HotKey Application 中最繁忙的那個 Partition,其 30 min window 下的平均負載約 16.2 MiB,是第二名的一半

image-20220330151814766

如果 Balancer 對這個忙碌的 partition 進行移動,他要付出的單位搬移流量會需要超過 16.2 MiB,而且搬移是連過去累計的資料也要搬移,代價會比較高。

image-20220330152542858

這邊還有另外一個議題是 replica, 如果以 disk 儲存負載來說,每個 replica 經歷到的壓力是一樣的。但是以 network 來說 leader 會經歷到更大的 egress 壓力。我本來想畫每個 repliac 帶來的 network 壓力。但是如果要畫出這些資訊,需要 JMX 之外的資訊(誰是 leader 誰是 follower),而 Kafka 提供的 JMX 中得不到那些資訊,所以我沒辦法建立出這種圖表,可能後面我真的要做一個客製化的 exporter 來釋出這些訊息...(單純觀測用, 和 Balancer 無關)

image-20220330175446217

可以看到新加入的 broker 在 replica 分佈上沒有自動平衡。

image-20220330175722069

這是新增後,每個 log 在資料儲存上的負載。

chia7712 commented 2 years ago

而 Kafka 提供的 JMX 中得不到那些資訊,所以我沒辦法建立出這種圖表,可能後面我真的要做一個客製化的 exporter 來釋出這些訊息

使用Kafka APIs如何?傳遞bootstrap servers的資訊是可以接受的選項,因為是很輕盈的舉動

chia7712 commented 2 years ago

@garyparrot @qoo332001 感謝你們做的實驗(希望是你們兩個一起弄的)。上述實驗有觀察到一些現象了,可否在文章最前面先做一個總結?謝謝

garyparrot commented 2 years ago

使用Kafka APIs如何?傳遞bootstrap servers的資訊是可以接受的選項,因為是很輕盈的舉動

要怎麼把 Kafka APIs 的資訊吐到 Prometheus 裡面,中間應該還是要經過一個 exporter 轉換訊息格式?

https://github.com/danielqsj/kafka_exporter

這邊有一個現成的 Kafka Exporter,我看了一下裡面的東西可能可以直接拿來使用

garyparrot commented 2 years ago

下圖是我和 @qoo332001 確認過的

  1. 負載不平衡監控過程
  2. 負載平衡計劃產生過程(稱呼搬移計劃好像不太好,容易忽略透過更換 leader 也能負載平衡這件事)

的大略流程

https://user-images.githubusercontent.com/39105714/161544510-d848e550-17bc-48c9-a5df-4c1316ee507e.jpg

因為整體架構要能夠被使用者調整選用的 cost function,還有 fine-tune 很多參數,所以許多操作便得很抽象,到時候實作上的複雜程度應該會很高...

這是以完整的框架下去思考,內容比較抽象,後面不一定會這樣做,但只少他點出了一些後續會遇到我覺得很嚴重的難處。

收集 ClusterState

收集給 Cost Function 評分所需的資訊

另外,leader 對效能影響很大,但這些資訊透過 JMX 拿不到,必須要能夠客製化傳入的資訊。

餵 ClusterState 給 Cost Functions

假定到時候會有許多 Cost Functions,每個會依照自己關心的 aspect 給各個 broker 打一個分數,這階段會產生給每個 broker 根據不同項目產生出很多 0 ~ 1 的數字

Any Cost Violation?

這邊會有一個機制確定有沒有任何分數超過一個閥值,如果有就必須產生一個負載平衡計劃

向違反的 Cost 要 Suggestion

後續我們必須要產生一個 負載平衡計劃 來處理閥值越界的現象

在數學上我們要求一個函數 f 的最小值,我們可能可以看斜率或其他的資訊或演算法來找出,這一切大條件都建立在這個 f 是已知的這個條件。

但是由於專案有使用者能夠透過調整參數來變更 Cost Function 和參數的需求,所以我們的 f 在這邊是個未知。加上這個 f 的參數離散且內部邏輯由程式碼組成,基本上當程式碼面對一個使用者自定義的 Cost Function 回傳回來的數字 0 ~ 1 時,他基本上是沒有什麼有幫助的線索能夠進步這個數字的。

一個具體的例子是 RackAwarenessCostFunction ,在一個巨大的叢集,想象給一個 Broker 分配了 1000 個 partition, 然後其中有 m 個 partition 觸犯了 RackAwareness 的規則,假設這個 cost function 直接把值設定為 1 以表示他的不滿,為了修正這個問題我們需要挑出這 m 個 partition,但是以現成的 Cost Function 這件事情很困難,我們可能需要嘗試 1000! / (1000 - m)! 個組合才能找到具體是哪 m 個 partition 觸犯了這個規則,這還是建立在我們知道確實有 m 個違反了這個規則,實際上我們不諱會知道這個數字,所以應該是要嘗試 \sum^{1000}_{m=1} 1000! / (1000 - m)! 次才能找到滿足這個條件的方法。上述的這個手法還有一個問題是我們知道這個 CostFunction 在幹嘛所以可以這樣修,實際上在面對使用者自己寫的 CostFunction 時我們的 Balancer 可能完全沒有頭緒要怎麼滿足他的分數,最後可能只能每種排列組合都測試看看,或是向 BogoSort 那樣隨意打亂排序,希望下次可以刷出好一點的分佈,可是這些手法都不太可靠。

所以這邊提出的想法是 CostFunction 或另外一種和 CostFunction進行組合的物件,能夠主動的向上游提供進步的建議,然後上游會整合這些建議給出一個新的解決方案,當然具體要如何實作還有很多思考空間,實際上可能會很難,且由於要提供建議,所以這個提供建議的機制還需要知道上游負載平衡的機制是什麼(partitoiner for producer, balancer for cluster),所以這些建議機制可能還不能在不同的負載平衡手法之間共用。

把 Cost Suggestion 套用至 Cluster State, 給出更好的 partition, leader 分佈

能夠主動的向上游提供進步的建議,然後上游會整合這些建議給出一個新的解決方案

如同上述

餵 improve 後的 cluster state 給 cost function

同上

Any Cost Violation?

同上面的 cost violation 判斷,如果產生的分佈有問題那要再重新跑一次。

如果沒有任何的 cost violation 則負載平衡計劃拍板定案,送到下一個流程處理。

chia7712 commented 2 years ago

另外,leader 對效能影響很大,但這些資訊透過 JMX 拿不到,必須要能夠客製化傳入的資訊。

可以傳入bootstrap servers,然後用Kafka APIs去拿

但是由於專案有使用者能夠透過調整參數來變更 Cost Function 和參數的需求,所以我們的 f 在這邊是個未知。加上這個 f 的參數離散且內部邏輯由程式碼組成,基本上當程式碼面對一個使用者自定義的 Cost Function 回傳回來的數字 0 ~ 1 時,他基本上是沒有什麼有幫助的線索能夠進步這個數字的。

我再補充一次cost function的目的:它的功用是用來評分,目前可以用來說某個節點的“成本”很高(例如1)。至於要如何使用這些評分不是cost function要關心的事情

一個具體的例子是 RackAwarenessCostFunction ,在一個巨大的叢集,想象給一個 Broker 分配了 1000 個 partition

這個例子很好,我們就從這個再下去討論。

有 m 個 partition 觸犯了 RackAwareness 的規則,假設這個 cost function 直接把值設定為 1 以表示他的不滿,為了修正這個問題我們需要挑出這 m 個 partition,但是以現成的 Cost Function 這件事情很困難,我們可能需要嘗試 1000! / (1000 - m)! 個組合才能找到具體是哪 m 個 partition 觸犯了這個規則

目前cost function評分的角度是從節點來看,但如果有需要我們可以調整架構將其改成從partition來看(也就是回傳值要從Map<Integer, Double>改成Map<TopicPartition, Double>之類的)。如此第一版的balancer就可以設定一個閥值來處理分數大於0.5之類的partitions來作為“違反”規則的條件(當然這個有點粗糙)

實際上在面對使用者自己寫的 CostFunction 時我們的 Balancer 可能完全沒有頭緒要怎麼滿足他的分數,最後可能只能每種排列組合都測試看看,或是向 BogoSort 那樣隨意打亂排序,希望下次可以刷出好一點的分佈,可是這些手法都不太可靠。

同上,如果每個cost function都能針對每個partition來做評分,那麼balancer可以依照自己的演算法來針對這些分數做加總加權來找出“違反“的partitions

能夠主動的向上游提供進步的建議,然後上游會整合這些建議給出一個新的解決方案

承上,我這邊舉一個很常見的做法 - 那就是不斷讓cost function針對新的分佈算分,直到取得一個比例的進步或是總共算了幾次中取最好。也就是說,假設balancer用某種方式(第一版可以先用簡單的方式,例如random)算出了一個新的分佈,那就可以拿這個分佈再去餵給cost function,然後比較前後兩種分布是否有進步,如果沒有,那就再算一個新的分佈出來,再來算分

這個做法的精神是:在分散式系統裡面,要去算“最佳解“很難成本太高,因此我們改為在有限的資源下找出區域最佳解。因此當你已經找到需要處理的分區(如上面敘述),那麼就用random的方式去算一個新的分佈,然後重複使用cost function來評斷“這個新的分佈”是不是好棒棒?好棒棒,那就用,不好棒棒,那就再random一個分佈出來


我上面講的都是一些很常見的手法和概念,希望不會是你們最後的解法。

garyparrot commented 2 years ago

04/08 Balancer 第一版確定

目標

Apache Kafka 預設會將 Partition 平均地擺放,但是 Partitions are not equal,每個因爲上層業務邏輯不同可能經歷不同的吞吐量。

chia7712 commented 2 years ago

平衡 Consume 流量 (可行性還需要分析)

提醒一下,當balance開始進行時,partition的搬移會增加許多節點之間的橫向移動(就算同一個節點也是)。因此一個可能的極端狀況是為了平衡consume的量卻反而造成更高的consume的量,這也就是我之前有說,“成本考量”會是一個很重要的議題

另外只考慮network的前提是 - disk的頻寬遠大於network,也就是將disk的負擔當成0。有些叢集作為純ssd組合可能可以這樣假設,但之後我們也要關心那些用hdd的環境

garyparrot commented 2 years ago

04/17 目前進度

目前進度

https://github.com/garyparrot/astraea/tree/62c4e7145a027d7a7a73d5b6d8a7414e43424193/app/src/main/java/org/astraea/balancer/alpha

Balancer

這是 Balancer 目前主要的進入點:org.astraea.balancer.alpha.Balancer

// https://github.com/garyparrot/astraea/blob/62c4e7145a027d7a7a73d5b6d8a7414e43424193/app/src/main/java/org/astraea/balancer/alpha/Balancer.java#L56-L76

  public Balancer(Argument argument) {
    // initialize member variables
    this.argument = argument;
    this.jmxServiceURLMap = argument.jmxServiceURLMap;
    this.registeredBrokerCostFunction = Set.of(new LoadCost(), new MemoryWarningCost());
    this.registeredTopicPartitionCostFunction = Set.of();
    this.scheduledExecutorService = Executors.newScheduledThreadPool(8);

    // initialize main component
    this.balancerThread = new Thread(this);
    this.metricCollector =
        new MetricCollector(
            this.jmxServiceURLMap,
            this.registeredBrokerCostFunction.stream()
                .map(CostFunction::fetcher)
                .collect(Collectors.toUnmodifiableList()),
            this.scheduledExecutorService);
    this.topicAdmin = TopicAdmin.of(argument.props());
    this.rebalancePlanGenerator = new ShufflePlanGenerator(2, 5);
    this.rebalancePlanExecutor = new StraightPlanExecutor(argument.brokers, topicAdmin);
  }

Constructor 裡面記載一些運作是會用到的模組和物件,比較重要的有

  1. registeredBrokerCostFunction, registerTopicPartitionCostFunction,這些是 Balancer 要採用的 Cost Function,目前是寫死狀態,在 Broker 和 TopicPartition 的界面整合起來之前只能這樣用。
  2. MetricCollector, 類似 NodeCollector 的東西,在蒐集 metrics,我有試著用 NodeCollector 寫過但是最後看起來很奇怪,中間多了很多我覺得不必要的抽象和潛在的意外,最後選擇自己寫一個。
  3. TopicAdmin 用來建立假的 ClusterInfo 物件,還有下達 Reassignment 指令。
  4. RebalancePlanGenerator 定義一個計劃生成模組的界面,每當呼叫 RebalancerPlanGenerator#generate(ClusterInfo clusterNow) 必須要給出一個 RebalancePlanProposal,後面再談細節。
  5. RebalancePlanExecutor,這個模組接受一個 RebalancePlanProposa 且嘗試執行他。

整體流程

主要邏輯在這裡

用人話來說過程是:

  1. 透過 TopicAdminMetricCollector 建立目前叢集狀態的 ClusterInfo
  2. ClusterInfo 餵給每個 CostFunctions, 取得他們的回傳結果。
  3. 重複執行 RebalancePlanGenerator 幾次(目前固定 1000),產生出多種可能的 Proposal。
  4. 把每個 Proposal 給出的分佈和過去的 metrics (BeanObjects) 湊成一個虛假的 ClusterInfo。把這個虛假的 ClusterInfo 餵給 CostFunctions[^1],透過 Balancer#costSum,將給定的 CostFunction 分數們透過特定的邏輯計算成一個整體分數 [^2],這個分數越低越好。這個特定的邏輯目前是寫成所有數字的平均,這個手法沒有物理意義,等真正的 cost function 做好之後會在修正這部分。
  5. 比較當前叢集和 generator 提出的 cost function 結果分數,找出 costSum 最低最好的那一個,如果 generator 提出的 cost function 下的 costSum 分數有比目前的分佈的 costSum 好一點,那就呼叫 RebalancePlanExecutor 來執行這個 costSum 最佳的 RebalancePlanProposal

[^1]: 這部分可能有點爭議,因為舊的 Metrics 是源自目前的叢集狀態分佈,我們製造了一個 generate 出來的假分佈,然後將不屬於他的 metrics 和這個假分佈粘在一起打分好像不太對勁,因為這個 metrics 不是源自 generate 出來的這個分佈。但是這部分我目前沒想到解法,不知道有沒有其他更可靠的手法來衡量一個虛假的分佈好不好。 [^2]: 這個 costSum 計算會高度依賴採用的 Cost Function,如果 Cost Function 有換或是更新,這個 costSum 的計算也要被重新實作。

RebalancePlanGenerator

計劃生成的部分目前是使用 ShufflePlanGenerator

隨機移動給個 Replicas,然後當作一個 RebalancePlanProposal。可能會有人 argue 這個實作太無腦,但是在小規模的叢集和給定大量的時間下這個手法應該還行。

我們可以進步這個 Generator,可能讓他能夠取得更多資訊,以此來給出一些比較準確,比較沒有問題的計劃提案。但是這只會增加叢集 regress 到 stable state 的速度,最終這個方案多好還是由 Cost Function 說了算,受限專案時間壓力我目前不打算把時間壓在這裡,1 秒產生出可用的計劃提案和 60 秒產生出來對目前的我們來說還好,先有能動的東西再說,比較好的 Generator 我想先當 future work,現在投資這個沒太大的報酬率。

可能有人會說計劃生成為何會採取隨機的方法,為何不直接由 Cost Function 對症下藥下去產生,比如我們知道某個 Broker 比較忙,那就把那個 Broker 的 loading 移動到比較輕的 broker 去(就像絕大多數的負載平衡 Paper 那樣)。這是因為專案有能讓使用者客製化平衡定義的需求,所以這個對症下藥的部分在客製化平衡定義的要求下有衝突。實際上也不存在 100% 照料到所有情況的 Balance Solution(如果有我猜他可能表現分數只有到 60 分),我感覺最終都回歸到 Admin 對現在的叢集知道多少 workload pattern,知道的情報越多能夠 optimize 的空間越大(60 分繼續往上)。

RebalancePlanExecutor

這個模組管理如何執行給定的 Proposal,執行的部分必須要畫出來當做一個獨立的模組,因為搬移會給叢集帶來一定的開銷,如果全搬移的話可能會衝擊到其他服務流量。這裡要有個一定的邏輯來 guard 這個過程。

目前是使用 StraightPlanExecutor,給定的 reassignment 全部立刻執行。

雖然前面提到全搬移的話會衝擊流量,不過目前套用的 ShufflePlanGenerator 不會套用太多的變更(頂多 2 ~ 5 個),所以影響還好,如果影響太大的話也能再調成一次只搬移一個,最終結果只是 regress 到 stable state 的所需時間上升,這個部分也是 future work,等以後有空的時候在來思考怎樣的執行過程比較好,現在投資這裡沒什麼意思。

Movement Cost

搬移成本會是一個獨立的 Cost Function,他的參數可能會是 old ClusterInfo 和 new ClusterInfo。當做分數打,好處是我們的產生出來的計劃可以顧及到這部分,潛在的問題是可能又要去更動 Cost Function 的實作,不過考慮到 Partitioner 絕對不會用到這個 Cost Function 所以可能把它寫死成 Balancer Only 的 Cost Function 會比較好。

(這個還沒弄進去,我剛剛想到我沒弄這個)

幾個未來的重點

chia7712 commented 2 years ago

@garyparrot 能不能發個PR? 這樣方便針對特定程式碼展開討論,可以在PR上標註為draft

chia7712 commented 2 years ago

實際上也不存在 100% 照料到所有情況的 Balance Solution(如果有我猜他可能表現分數只有到 60 分)

這個在業界的討論是要走到建模型去猜“未來應該會怎麼用”,也就是比起隨機到60分,我們用大量過去資料找出未來的可能軌跡,然後就試著往那邊調整,不過這完完全全是另一種解法和議題了

chia7712 commented 2 years ago

這部分可能有點爭議,因為舊的 Metrics 是源自目前的叢集狀態分佈,我們製造了一個 generate 出來的假分佈,然後將不屬於他的 metrics 和這個假分佈粘在一起打分好像不太對勁

你講的很有道理,這的確可能是個問題,畢竟舊的metrics跟新(隨機產生)的分佈的匹配會失真。可否麻煩你開個議題來追蹤這件事情或是在描述上把它重點起來?

chia7712 commented 2 years ago

雖然前面提到全搬移的話會衝擊流量,不過目前套用的 ShufflePlanGenerator 不會套用太多的變更(頂多 2 ~ 5 個),所以影響還好,如果影響太大的話也能再調成一次只搬移一個,最終結果只是 regress 到 stable state 的所需時間上升,這個部分也是 future work,等以後有空的時候在來思考怎樣的執行過程比較好,現在投資這裡沒什麼意思。

這會是一個嚴重影響品質的關鍵,例如在成本考量只能移動一個partition和可以一次移動10個partitions,後者能改善的程度一定比較好(如果比較差那就代表...)。而在”固定成本“的考量下”搬動“哪些partitions能達到此成本下的最好效果(有點類似最佳化的過程還要額外考慮成本,而成本的量化也會是一個題目)就會是你們balancer一個很重大的特色。就如同我會議提到的故事,台灣很多使用者的叢集是沒有太大餘力來應付負載平衡,換言之,如果叢集的95%資源已經用在日常業務上,那就代表負載平衡能用到的資源最多只有5%,而我們的目標就是如何在這5%下擠出最佳的配置

garyparrot commented 2 years ago

畢竟舊的metrics跟新(隨機產生)的分佈的匹配會失真。可否麻煩你開個議題來追蹤這件事情或是在描述上把它重點起來?

其實 Balancer 的實作還有討論空間,要現在弄上去還是等 Merge 後再開?我本來是想說等到實際實驗的時候看看這個問題有沒有彈出來。其實這部分我們沒辦法預知未來,好像也沒什麼能做的,頂多警告 cost function 我們現在丟進去的是假分佈,不是一個 fact,你打分數的時候小心一點。

如果叢集的95%資源已經用在日常業務上,那就代表負載平衡能用到的資源最多只有5%,而我們的目標就是如何在這5%下擠出最佳的配置

最佳配置就是讓他們搬 100 年,巧婦難為無米之炊(joke :3。

而在”固定成本“的考量下”搬動“哪些partitions能達到此成本下的最好效果(有點類似最佳化的過程還要額外考慮成本,而成本的量化也會是一個題目

我忘記在上面寫到一件事,搬移成本有點複雜:我想到兩個可能的 approach

  1. 搬移成本會是一個獨立的 Cost Function,他的參數會是 old ClusterInfo 和 new ClusterInfo。當做分數打,好處是我們的產生出來的計劃可以顧及到這部分,潛在的問題是可能又要去更動 Cost Function 的實作,不過考慮到 Partitioner 絕對不會用到這個 Cost Function 所以可能把它寫死成 Balancer Only 的 Cost Function 會比較好。 (這個還沒弄進去,我剛剛想到我沒弄這個)
  2. RebalancePlanExecutor 可能會新增失敗的 API Design,如果叢集真的如同上面提到沒什麼能流量能用來搬移時,比起卡在那邊他可能要直接 abort。
chia7712 commented 2 years ago

我忘記在上面寫到一件事,搬移成本有點複雜:我想到兩個可能的 approach

第一個方法看起來比較有機會,可否也筆記一下

其實 Balancer 的實作還有討論空間,要現在弄上去還是等 Merge 後再開?

先記錄下來(另外開議題或是寫在描述中),如你提到的可以先跑實驗看結果有沒有更緊急的要處理(例如噴出一堆error)。只是過程中很多想法不記下來容易忘記,所以可以先寫一下

qoo332001 commented 2 years ago

7/13 討論紀錄

CostFunction輸入參數:

我們經過討論之後,認為ClusterInfo應該比較適合當作參數傳入,原因如下:

  1. ClusterLogAllocation只能表示叢集的分布,沒辦法像拿到 ClusterInfo#replicas, ClusterInfo#nodes這些資訊,而未來若是有CostFunction會用到這裡面才有的資訊,這邊舉個例子像是未來可能會有CostFunction會根據叢集的broker是否在線上來給出一個分數,而這些資訊是ClusterLogAllocation裡面無法取得的
  2. 如果使用 ClusterInfo來表示一個假的分布,可能會有一些假分布沒有的東西必須手動填入,像是ReplicaInfo中的isOfflineReplica接須設定為true,但這其實不會影響到CostFunction的計算

ClusterCost

我們這邊會想要寫一個新的介面來讓我們的CostFunction可以針對整個Cluster吐出一個分數,而不是每一個Broker或replcia都吐一個分數,因為如果在計畫產生端處理這些分數感覺不太合理

處理多個CostFunction吐出的分數

承上,每一個CostFunction會吐出一個分數,而這些分數應該要經過一個aggregate function,裡面會做一些數學的計算,過程可能有加priority,取平均,加總等操作,最後合成出一個分數,之後再讓計畫產生端依照這個分數來比較多個plan

成本估計

現階段的成本估計如果要套用在計畫產生比較像是在判斷以下幾點

  1. 搬移過程中的流量是否會超出broker頻寬可用上限
  2. 搬移過程中是否會有broker的任一folder使用超過自訂的可用空間百分比
  3. 搬移的replica的資料流入量是否有比搬移的速度還要快

只要有達成上述任何一點,則不採用這個plan

chia7712 commented 2 years ago

我們經過討論之後,認為ClusterInfo應該比較適合當作參數傳入

可否給一段範例程式碼?

ClusterLogAllocation只能表示叢集的分布,沒辦法像拿到 ClusterInfo#replicas, ClusterInfo#nodes這些資訊,而未來若是有CostFunction會用到這裡面才有的資訊,這邊舉個例子像是未來可能會有CostFunction會根據叢集的broker是否在線上來給出一個分數,而這些資訊是ClusterLogAllocation裡面無法取得的

所以你未來要弄的cost function還需要ClusterLogAllocation作為評估的參數嗎?也就是說cost function的介面是否需要調整?

如果使用 ClusterInfo來表示一個假的分布,可能會有一些假分布沒有的東西必須手動填入,像是ReplicaInfo中的isOfflineReplica接須設定為true,但這其實不會影響到CostFunction的計算

同上,需要一個例子

我們這邊會想要寫一個新的介面來讓我們的CostFunction可以針對整個Cluster吐出一個分數,而不是每一個Broker或replcia都吐一個分數,因為如果在計畫產生端處理這些分數感覺不太合理

我記得當時說要生成partition cost是你們的主意,所以是現在又不需要評估partition-level的分數了嗎?

搬移過程中的流量是否會超出broker頻寬可用上限 搬移過程中是否會有broker的任一folder使用超過自訂的可用空間百分比 搬移的replica的資料流入量是否有比搬移的速度還要快

這些有對應的metrics可以查詢嗎?

只要有達成上述任何一點,則不採用這個plan

這個7/13會議提到的第三點,如何使用plan的搬移成本回頭去優化plan的選擇,如果要提早執行,這邊也需要把介面先設計出來

qoo332001 commented 2 years ago

可否給一段範例程式碼?

我們balancer會用到的CostFunction可能會長這樣

public interface HasClusterCost extends CostFunction {
  ClusterCost clusterCost(ClusterInfo clusterInfo ,ClusterBean clusterBean);
}

成本估計用ClusterInfo來表示搬移前的partition分佈和mertics等資訊,以及使用ClusterLogAllocation表示搬移後的分佈:

public interface HasMoveCost extends CostFunction {
    ClusterCost clusterCost(ClusterInfo originClusterInfo, ClusterInfo newClusterInfo , ClusterBean clusterBean);
}

ClusterCost:

public interface ClusterCost {
  /** @return cost of cluster */
  Double value();
}

所以你未來要弄的cost function還需要ClusterLogAllocation作為評估的參數嗎?也就是說cost function的介面是否需要調整?

如上面,只有成本的估計會用到ClusterLogAllocation,因為假分佈使用ClusterLogAllocation來表示會比較簡潔,如果使用兩個ClusterInfo來表示搬移前和搬移後的分佈會有太多重複的資訊,像是ClusterInfo#topics, ClusterInfo#nodes, 裡面的資訊會是重複的

同上,需要一個例子

假設未來有一個CostFunction是用來依照現在在線的broker術來打分,若是使用ClusterLogAllocation當作參數傳入CostFunctiion的話,沒辦法從ClusterLogAllocation中取得哪些broker在線的資訊,而若是用ClusterInfo當參數的話 可以輕鬆的從ClusterInfo#nodes中取得

我記得當時說要生成partition cost是你們的主意,所以是現在又不需要評估partition-level的分數了嗎?

當初需設計出PartitionCost是想要依照PartitionCost給出的分數來去產生搬移計畫,但現在搬移計畫的產生是隨機生成,所以目前沒有用到PartitionCost,現在新增的ClusterCost是為了要針對隨機產生的這個計畫去給出一個分數來決定一個計畫的好壞,而未來可能會依照PartitionCost給的分數去生成一些計畫,最後在使用ClusterCost來選出一個較佳的進化去執行,所以ClusterCost與PartitionCost都有存在的意義

這些有對應的metrics可以查詢嗎?

搬移過程中的流量是否會超出broker頻寬可用上限:

目前是使用BytesInPerSec(client端輸入流量)+ReplicationBytesInPerSec(broker端Leader,Follower備份) + 要搬移進broker的replicaIn流量(透過replica log size成長速度估計)來判斷一個broker的流量會不會overflow

搬移過程中是否會有broker的任一folder使用超過自訂的可用空間百分比

加總要搬移到某個broker的replica log size加上該broker原有的replica log size(透過KafkaMetrics.TopicPartition.Size取得)是否overflow(總剩餘可用空間超過一個百分比)

搬移的replica的資料流入量是否有比搬移的速度還要快

這個目前只有估算出replica料流入速度,還沒想到要如何估計一個replica的搬移速度,會有這一項是因為想要避免replica會有搬移不完的情況發生

chia7712 commented 2 years ago
public interface HasMoveCost extends CostFunction {
    ClusterCost clusterCost(ClusterInfo originClusterInfo, ClusterInfo newClusterInfo , ClusterBean clusterBean);
}

請問一下,這個介面的設計是期待實作評價originClusterInfonewClusterInfo兩者的差異嗎?如果是的話,是否要有一個“統一”的方式來產生“差異”?又或者預期實作只是將originClusterInfonewClusterInfo兩者的分數相減?如果是的話,那需要一次傳入originClusterInfonewClusterInfo嗎?

public interface ClusterCost {
  /** @return cost of cluster */
  Double value();
}

同上,如果是算叢集之間的差異的分數,那叫做ClusterCost是不是怪怪的

現在新增的ClusterCost是為了要針對隨機產生的這個計畫去給出一個分數來決定一個計畫的好壞,而未來可能會依照PartitionCost給的分數去生成一些計畫,最後在使用ClusterCost來選出一個較佳的進化去執行,所以ClusterCost與PartitionCost都有存在的意義

OK

目前是使用BytesInPerSec(client端輸入流量)+ReplicationBytesInPerSec(broker端Leader,Follower備份) + 要搬移進broker的replicaIn流量(透過replica log size成長速度估計)來判斷一個broker的流量會不會overflow

這個數字好像有點危險,如果這個broker是閒置的狀態...

加總要搬移到某個broker的replica log size加上該broker原有的replica log size(透過KafkaMetrics.TopicPartition.Size取得)是否overflow(總剩餘可用空間超過一個百分比)

所以這有辦法看出該節點真實有的“空間大小"?

這個目前只有估算出replica料流入速度,還沒想到要如何估計一個replica的搬移速度,會有這一項是因為想要避免replica會有搬移不完的情況發生

這個出發點不錯

qoo332001 commented 2 years ago

請問一下,這個介面的設計是期待實作評價originClusterInfo和newClusterInfo兩者的差異嗎?如果是的話,是否要有一個“統一”的方式來產生“差異”?又或者預期實作只是將originClusterInfo和newClusterInfo兩者的分數相減?如果是的話,那需要一次傳入originClusterInfo和newClusterInfo嗎?

前者是對的,是要評價originClusterInfo和newClusterInfo兩者的差異,我後來想了一下,未來也只會有一個MoveCost不會有其他個,所以不需要有這個interface,MoveCost會長這樣:

class MoveCost implements CostFunction {
    ClusterCost clusterCost(ClusterInfo originClusterInfo, ClusterInfo newClusterInfo , ClusterBean clusterBean) {
    ...
    }
    ...
}

這個數字好像有點危險,如果這個broker是閒置的狀態...

如果broker是閒置的話流量可能會接近0,這時候如果有replica要搬移進這個broker那應該是不會造成overflow,請問會有什麼危險呢?

所以這有辦法看出該節點真實有的“空間大小"?

沒有辦法看出真實有的空間大小,所以是用Properties讓user自行輸入,格式會像下面這樣

broker.0./tmp/log-folder-0=800
broker.0./tmp/log-folder-1=800
broker.1./tmp/log-folder-0=800
broker.1./tmp/log-folder-1=800
broker.2./tmp/log-folder-0=800
broker.2./tmp/log-folder-1=800
broker.3./tmp/log-folder-0=800
broker.3./tmp/log-folder-1=800
chia7712 commented 2 years ago

未來也只會有一個MoveCost不會有其他個,所以不需要有這個interface,MoveCost會長這樣:

我會建議還是要把介面訂好當作plug-in來處理,這樣後面要更換或組合比較容易。

前者是對的,是要評價originClusterInfo和newClusterInfo兩者的差異

可否描述一下可能會如何評價兩者的差異?例如我前面猜是個別算出評分後想減嗎?還是會先算出結構上的差異再從該差異算分?或是其他方式?

如果broker是閒置的話流量可能會接近0,這時候如果有replica要搬移進這個broker那應該是不會造成overflow,請問會有什麼危險呢?

0可能代表很閒也可能代表死了,可能需要搭配ClusterInfo來協助排除

沒有辦法看出真實有的空間大小,所以是用Properties讓user自行輸入,格式會像下面這樣

先避免要求使用者輸入資訊,想想看有沒有值得參考的項目?如果沒有,或許能先放在未來展望

qoo332001 commented 2 years ago

我會建議還是要把介面訂好當作plug-in來處理,這樣後面要更換或組合比較容易。

ok

可否描述一下可能會如何評價兩者的差異?例如我前面猜是個別算出評分後想減嗎?還是會先算出結構上的差異再從該差異算分?或是其他方式?

先算出兩個分佈的差異,計算出要移動replcia的流量, log size等,在這之後判斷這樣的搬移會不會超過資源可用上限,若會就給1分,目的是要過濾成本太高的班計畫

0可能代表很閒也可能代表死了,可能需要搭配ClusterInfo來協助排除

沒錯,這也是我們最後決定使用ClusterInfo而不用ClusterLogAlloction的原因

先避免要求使用者輸入資訊,想想看有沒有值得參考的項目?如果沒有,或許能先放在未來展望

目前還沒有想到有值得參考的指標,所以先暫時這樣做

chia7712 commented 2 years ago

沒錯,這也是我們最後決定使用ClusterInfo而不用ClusterLogAlloction的原因

那就麻煩開一隻PR來展示一介面,以及範例如何使用

garyparrot commented 2 years ago

COSCUP 報後疑問整理

  1. 當有出現 retention 的情況時,搬移的情況會如何進行
  2. 如果叢集狀態不太好,是不是應該暫停或是終止搬移
  3. 如何衡量當前網路狀況不適合進行負載平衡
  4. 如果 Balancer 負載平衡到一半有大量流量湧進來(網路情況改變),該如何處理
chia7712 commented 2 years ago

@garyparrot 可否試著回答看看這幾個問題?

garyparrot commented 2 years ago

當有出現 retention 的情況時,搬移的情況會如何進行

我們先前的測試結果,負載平衡的過程中,資料的複製都是從 leader 那邊最後一個 offset 撈到最新的 offset。如果過程中有觸發 retention,那麼過去已經複製的資料會被丟棄

如果叢集狀態不太好,是不是應該暫停或是終止搬移

executor 需要做到這件事情,能否暫停搬移我還沒試過,或許 replication throttle 設為 0 KB 能夠做到,但我還沒試過,先前使用 throttle 給我很不好的印象,我覺得這其中可能會有很多地雷。

如何衡量當前網路狀況不適合進行負載平衡

這個問題很困難,因為外部網路的狀況可能沒那麼好監控,如果 switch 有支援那樣的監控功能,或許這部分能做一些文章,不過那種設備應該價格很高,不太可能每個環境都買這種網路設備。這部分如何處理我還在想有沒有可靠的辦法做到,但目前沒有頭緒。現在我們是在一個相對簡單的網路環境假設下實作,所以這部分在我們的實驗中還不是個問題。未來如果有機會會推到比較複雜的網路模型。

如果 Balancer 負載平衡到一半有大量流量湧進來(網路情況改變),該如何處理

executor 要監控系統狀態,如果不適合負載平衡要應對處理

chia7712 commented 2 years ago

我們先前的測試結果,負載平衡的過程中,資料的複製都是從 leader 那邊最後一個 offset 撈到最新的 offset。如果過程中有觸發 retention,那麼過去已經複製的資料會被丟棄

這個回答有點粗糙,可以思考一下細節,例如retention要丟掉offset=100以下的值,但replica 正複製到 offset=50而已,那此時會發生什麼事情?

executor 需要做到這件事情,能否暫停搬移我還沒試過,或許 replication throttle 設為 0 KB 能夠做到,但我還沒試過,先前使用 throttle 給我很不好的印象,我覺得這其中可能會有很多地雷。

這應該可以延伸出一個議題:我們是否要能"監控"搬移狀態,並且“動態“的去調整搬移時付出的成本(也就是你說的throttle),但這個算是未來展望,但也是很有趣的題目,可否獨立開個議題記錄一下

這個問題很困難,因為外部網路的狀況可能沒那麼好監控,如果 switch 有支援那樣的監控功能,或許這部分能做一些文章,不過那種設備應該價格很高,不太可能每個環境都買這種網路設備。這部分如何處理我還在想有沒有可靠的辦法做到,但目前沒有頭緒。現在我們是在一個相對簡單的網路環境假設下實作,所以這部分在我們的實驗中還不是個問題。未來如果有機會會推到比較複雜的網路模型。

同意,或許可以透過調閱JMX來觀察吞吐量變化,跟上面的題目蠻類似的,可以放在同一個議題裡面

executor 要監控系統狀態,如果不適合負載平衡要應對處理

同上,這三個問題都是在問“搬移過程時遇到意外“該怎麼辦,而這邊討論的意外指得都是效能相關的議題

garyparrot commented 2 years ago

這個回答有點粗糙,可以思考一下細節,例如retention要丟掉offset=100以下的值,但replica 正複製到 offset=50而已,那此時會發生什麼事情?

已經複製的資料會被全部丟棄?

這應該可以延伸出一個議題:我們是否要能"監控"搬移狀態,並且“動態“的去調整搬移時付出的成本(也就是你說的throttle),但這個算是未來展望,但也是很有趣的題目,可否獨立開個議題記錄一下

已開 #551

chia7712 commented 2 years ago

已經複製的資料會被全部丟棄?

可否再更細緻一點,例如是「馬上」會被丟棄嗎?還是「過一會」才會被丟棄?如果是前者,那是目前正在寫資料的segment馬上刪除嗎?如果是後者,那個「過一會」是怎麼運行?

chia7712 commented 2 years ago

這個議題已經有點年代,因此關閉