opensource4you / astraea

釋放kafka的無限潛能
Apache License 2.0
125 stars 45 forks source link

[METRICS] New thread for waiting list checking #1836

Closed chinghongfang closed 10 months ago

chinghongfang commented 11 months ago

實驗時發現,MetricStore 檢查 waiting list 會影響指標拉取速率,這隻 PR 改成獨立使用一條執行緒來檢查。

在使用 Cost Aware Assignor 時,該程式會等待 MetricStore 有"完整"效能指標時才會做計算,其判斷"完整"的方式是實際代入 cost-function 計算,會在 MetricStore 有更新指標的時候檢查。但這些計算不應該由佔用 "MetricStore 的拉取執行緒" ,MetricStore 應該繼續拉取最新指標。

所以這裡的作法是建立一個新的執行緒檢查,且同時只會有一個執行緒在執行檢查(透過 isChecking 確保一次只會有一個執行緒)。 另外,先前的作法是看本次迴圈指標有無更新,再來執行檢查,但使用多執行緒執行的話,有可能會在前一個迴圈(執行檢查後)更新,為避免漏檢查,所以使用另一個參數 needChecking 確保每次更新都有檢查到。

chia7712 commented 11 months ago

在使用 Cost Aware Assignor 時,該程式會等待 MetricStore 有"完整"效能指標時才會做計算,其判斷"完整"的方式是實際代入 cost-function 計算,會在 MetricStore 有更新指標的時候檢查。但這些計算不應該由佔用 "MetricStore 的拉取執行緒" ,MetricStore 應該繼續拉取最新指標。

不好意思,可否分享這段程式碼的連結給我?

另外我試著釐清是說, MetricStore 的 receiveJob (head) 會呼叫 receive 拿到 metrics 後會餵食給 sensor,由於 receive and sensor 是允許外部自己定義,所以外部使用者可能在過程中埋入了 wait 的邏輯,然後就導致 receiveJob (thread) 卡住?這樣正確嗎

chinghongfang commented 11 months ago

不好意思,可否分享這段程式碼的連結給我?

執行的外部定義在這裡: https://github.com/skiptests/astraea/blob/4140b68ca6950206d65c6cc71be0203f1fa1ce74/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java#L389


receive and sensor 是允許外部自己定義,所以外部使用者可能在過程中埋入了 wait 的邏輯,然後就導致 receiveJob (thread) 卡住?

不大一樣,主要是 checker.test() 是外部自行定義的,而這個可能會牽扯到 partitionCost 計算。 這個 checker.test() 裡面也沒有 wait 的邏輯,看來是 partitionCost 計算很花時間,每次更新指標都計算一次的話會 receiveJob 效率會很低 (每次 receive 搭配一個 cost 計算)。

chia7712 commented 11 months ago

不好意思,我想回頭討論一下 waiting 機制,目前看起來這個機制是讓外部可以傳入ㄧ個 predicator 來針對目前已經收集的 ClusterBean 做判斷 (timeout or pass),所以我不太確定為何需要在receiverJob裡面也要做判斷?因為在 wait這個方法中預期的結果有兩種,一種是 timeout (IllegalStateException)、一種是通過,所以應該不會有需要維護一個waitingList?

chinghongfang commented 11 months ago

這個機制是讓外部可以傳入ㄧ個 predicator 來針對目前已經收集的 ClusterBean 做判斷 (timeout or pass),所以我不太確定為何需要在receiverJob裡面也要做判斷?

是的,外部可以傳入一個 predicator 然後等待 latch 歸零,或是 timeout。 然後 MetricStore 會在 clusterBean 有更新的時候檢查每個 latch。

所以這邊外部並沒有在檢查(除了剛進來會做唯一一次檢查),而是掛著等待 "別人檢查"。而這個"別人"就是 MetricStore 的 receiverJob。

當初會讓 MetricStore 來檢查,而不是外部自己檢查,其實是為了不讓檢查不斷空轉,而是有 clusterBean 更新的時候才做檢查。當初的討論可參考 #1660 。

chia7712 commented 11 months ago

當初會讓 MetricStore 來檢查,而不是外部自己檢查,其實是為了不讓檢查不斷空轉,而是有 clusterBean 更新的時候才做檢查。當初的討論可參考 https://github.com/skiptests/astraea/issues/1660

我在想是否就讓外部 user thread 去檢查就好?檢查如果沒有的話,就等一個內部的 lock,至於 receiveJob 就是在更新 bean 後去喚醒該 lock 就好,這樣實作應該不難,同時也避免讓 receiveJob (thread) 跟 user thread 綁再一起

chinghongfang commented 11 months ago

我在想是否就讓外部 user thread 去檢查就好?

贊同,這樣也就不用再建立新的 thread 了。

chia7712 commented 10 months ago

@chinghongfang 請問這個還有更新嗎?把等待的邏輯再簡化一點

chinghongfang commented 10 months ago

那可否用簡單用wait and notifyall就好?

  1. 進來的時候立刻檢查一次
  2. 檢查失敗的話,sleep for 指定的時間
  3. 醒來後 (不管是被中斷 or 單純醒來) 再檢查一次
  4. 成功的話返回
  5. 反之則拋出錯誤

我已經照上面的建議更新了,還是請問還有什麼不夠精簡的嗎?再麻煩指教了。