opensource4you / astraea

釋放kafka的無限潛能
Apache License 2.0
130 stars 46 forks source link

[Metric] 討論 MetricStore 支援 retry 機制 #1660

Open harryteng9527 opened 1 year ago

harryteng9527 commented 1 year ago

related https://github.com/skiptests/astraea/pull/1524#discussion_r1167557440 此 issue 討論要如何新增retry機制在 MetricStore 中

Consumer 剛開始啟動並使用 CostAwareAssignor 分配 partitions 時,會因為 metrics 還沒完整拉取完畢(導致有些 partitions 會被當作沒有 cost),進而產生不平衡的分配

目前解決方案

在 assignor 中實作 retry 機制,預防沒有拉取到完整的 metric,避免影響到後續的分配以及效能

chinghongfang commented 1 year ago

我們想要等到有 "足夠" metric ,但其實 MetricStore 不知道 "應該要有哪些 metric" 這件事, MetricStore 要做的事只有不斷的呼叫 Reciever 然後把 metric 存起來。

如果要在 MetricStore 知道 "拉取完畢",blocking call 可能會長這樣:

/**
* Block until all given broker, topic-partition, type of metrics has value.
* @param idAndProperties the broker id and corresponding partition, metric type that we want
*/
ClusterBean blockUntilExist(Map<Integer, Collection<Map<String, String>>> idAndProperties);

這個方法會不會有點偏向特定用途?

或者,不斷重試會比較通用,但是檢查的程式碼仍然會放在 caller 。

ClusterBean blockUntilExist(Function<ClusterBean, Boolean> allValueExist);
chia7712 commented 1 year ago

加入新的方法會複雜化這個物件,我們是否有機會沿用 metrics sensor 這個方法來達到 「監聽」的用途?

chinghongfang commented 1 year ago

我們是否有機會沿用 metrics sensor 這個方法來達到 「監聽」的用途?

原來如此,請問是指可以在 MetricStore 內檢查 "是否每一個 MertricSensor 都有拉到資料" 嘛? 這樣就可以知道什麼 metric 還沒有拉到資料。

目前想到需要處理以下問題:

  1. MetricSensor 變動時的問題
  2. broker 數量
  3. broker 與 metric 的對應

MetricStore 內的 MetricSensor 是可以變動的,也許我們要以最新一次的 recieve() 來決定要檢查哪些 MetricSensor

每個 broker 都有共通 or 各自要拉的 metric,所以在判斷的時候也要知道哪個 broker 沒有拉到 metric,而MetricStore 內並不會知道我們有多少 broker 。

並不是每個 "broker" 都會有 MetricSensor 要的資料,如 broker 端就不會有 producer metric。原本這件事情是在 cost function 判斷。

不知道我理解有沒有錯?

chia7712 commented 1 year ago

原來如此,請問是指可以在 MetricStore 內檢查 "是否每一個 MertricSensor 都有拉到資料" 嘛? 這樣就可以知道什麼 metric 還沒有拉到資料。

我的想法是要做一個介面讓使用者可以輸入特定的量,然後實作其實是用 sensor 來做,例如 sensor 可以檢查現在已經存在 store 的 metrics 有沒有滿足

每個 broker 都有共通 or 各自要拉的 metric,所以在判斷的時候也要知道哪個 broker 沒有拉到 metric,而MetricStore 內並不會知道我們有多少 broker 。

「有多少broker」是條件之一嗎?

chinghongfang commented 1 year ago

我的想法是要做一個介面讓使用者可以輸入特定的量,然後實作其實是用 sensor 來做,例如 sensor 可以檢查現在已經存在 store 的 metrics 有沒有滿足

請問是讓 MetricSensor 除了做過濾,也讓它可以檢查 metrics 有沒有滿足嘛?

「有多少broker」是條件之一嗎?

我認為是需要的,原先也是因為某些 broker 還沒拉回來 metric 所以資料不足,無法判斷。這裡 broker 還沒拉回來不一定是 broker 有問題,可能只是拉取順序的問題。

chia7712 commented 1 year ago

@chinghongfang 既然規格和需求已經明確,可否麻煩你設計看看API?一種是全新的實作用來監控內部的狀態並回報,另一種則是使用/擴充 metrics sensor 來完成看看

chinghongfang commented 1 year ago

好的,我來試著設計看看。

chinghongfang commented 1 year ago

因為一直想不出來要如何設計,所以這邊先整理思緒。

目的

這次修改目的是:減少重複的程式碼 (Assignor, Balancer) Assignor, Balancer 都有等待 metric 蒐集完畢的需求,所以我們也許可以把 metric 蒐集完畢的邏輯放入 MetricStore 裡面。 Partitioner 需要的 metric 卻是即時的,有蒐集到多少 metric 就回傳多少。


限制

這邊不希望複雜化 MetricStore 的實做,所以希望擴展 MetricSensor 的功能,讓 MetricSensor 辨別什麼樣的 clusterBean 是"完整的"。

另外,是要擴展 MetricSensor 的功能不是重構,所以希望維持 interface 構造,不紀錄狀態

因為 "完整的" 定義會隨使用端有所不同 (Assignor, Balancer 要蒐集全部 partition,Partitioner 要盡快),所以 "完整的" 定義將要放在使用端。


推論

MetricSensor 是藉由 sensorSupplier 在使用端供給,也就是要在 sensorSupplier 內取得判斷所需的資訊 (什麼 metric、有多少 broker or partition ?)。

MetricSensor 介面可能會長這樣

  // MetricSensor.java
  default boolean complete(ClusterBean clusterBean){return true;}

Assignor 內判斷 clusterBean是否 "完整",需要判斷所需的資訊。

  // Assignor.java
  metricStore =
        MetricStore.builder()
            .localReceiver(clientSupplier)
            .sensorsSupplier(
                () ->
                    this.costFunction
                        .metricSensor()
                        .map(
                            s ->
                                Map.of(
                                    MetricSensor.of(
                                        List.of(s),
                                        clusterBean -> {
                                          return admin
                                              .brokers()
                                              .thenApply(
                                                  brokers ->
                                                      brokers.stream()
                                                          .map(NodeInfo::id)
                                                          .map(
                                                              id ->
                                                                  clusterBean
                                                                      .all()
                                                                      .getOrDefault(id, List.of()))
                                                          .map(
                                                              beans ->
                                                                  beans.stream()
                                                                      .map(
                                                                          HasBeanObject::beanObject)
                                                                      .collect(
                                                                          Collectors
                                                                              .toUnmodifiableList()))
                                                          .map(
                                                              beans ->
                                                                  s.fetch(
                                                                      MBeanClient.of(beans),
                                                                      clusterBean))
                                                          .noneMatch(Collection::isEmpty))
                                              .toCompletableFuture()
                                              .join();
                                        }),
                                    (BiConsumer<Integer, Exception>) (integer, e) -> {}))
                        .orElse(Map.of()))
            .build();

這樣的實做非常不容易使用,也許需要放掉某些限制,再來實做好用的介面,我會再試著找找看方法,若有其他想法再上來更新。

chia7712 commented 1 year ago

@chinghongfang 可能故事有點太複雜了,一個簡單的版本如下:

  default void wait(Predicate<ClusterBean> check, Duration timeout) {
    while (System.currentTimeMillis() < timeout.toMillis()) {
      try {
        if (check.test(clusterBean())) return;
      } catch (NoSufficientMetricsException e) {
        Utils.sleep(Duration.ofSeconds(1));
      }
    }
    throw new RuntimeException("Failed to fetch clusterBean due to timeout");
  }

這個版本有一個副作用,就是會過於頻繁測試,更好的方式是在 cluster bean 有更新的時候再測試,這也是為何我說可以考慮用 metric sensor 去做,因為 metric sensor 可以明確知道何時有新的資料(也就是新的 cluster bean)

chia7712 commented 1 year ago

1524 的例子藉由上述的範例就可以改寫成:

    wait(
        clusterBean ->
            costFunction.partitionCost(clusterInfo, clusterBean).value().values().stream()
                .noneMatch(v -> Double.isNaN(v)),
        Duration.ofSeconds(1));
chia7712 commented 1 year ago

@harryteng9527 麻煩你也看一下喔

chinghongfang commented 1 year ago
default void wait(Predicate<ClusterBean> check, Duration timeout) {

看起來這段是放在 MetricStore 放在 MetricStore 的話,也就是說取 ClusterBean 前可以先呼叫 wait 等待一段時間,再來抓 ClusterBean。將要新增 MetricStore 的介面方法。

這個版本有一個副作用,就是會過於頻繁測試

認同。

這也是為何我說可以考慮用 metric sensor 去做,因為 metric sensor 可以明確知道何時有新的資料(也就是新的 cluster bean)

"metric sensor 可以明確知道何時有新的資料",請問 @chia7712 這個意思是 MetricSensor#fetch(client, clusterBean) 這個方法嘛?也就是說我們可以在每一次的 fetch 中檢查好 "metric 夠不夠",檢查後把結果存下來 ,然後 metric store 就可以隨時詢問每個 metric sensor "metric 夠不夠",請問是這樣嘛? 可是這樣的話我們的介面就會有狀態要儲存,在實做介面時就會複雜許多。

chia7712 commented 1 year ago

"metric sensor 可以明確知道何時有新的資料",請問 @chia7712 這個意思是 MetricSensor#fetch(client, clusterBean) 這個方法嘛?也就是說我們可以在每一次的 fetch 中檢查好 "metric 夠不夠",檢查後把結果存下來 ,然後 metric store 就可以隨時詢問每個 metric sensor "metric 夠不夠",請問是這樣嘛? 可是這樣的話我們的介面就會有狀態要儲存,在實做介面時就會複雜許多。

MetricStore#wait 這個介面可以不用動,要改的實作的方式,也就是 MetricStore impl中會有一個特殊的sensor專門用來處理event,那個event就是每次呼叫wait的時候會建立一個event,等待 sensor 被呼叫時拿取並且觸發。所謂的被呼叫就是「真的有新metrics」時才會觸發,就可以避免過度觸發的問題

這個優化可以再討論,我建議你先把基本的wait完成

chinghongfang commented 1 year ago

我想確認一下想法,也就是說,MetricStore 會多一個方法,比如說 wait(Predicate, Duration)

那我再按照這個想法實做,感謝 @chia7712 學長幫忙!

chia7712 commented 1 year ago

@chinghongfang 沒錯喔,描述得很好。可以就實作難度評估一下,分成兩個階段來完成也可以