opensource4you / astraea

釋放kafka的無限潛能
Apache License 2.0
130 stars 46 forks source link

spark etl 驗收內容 #1296

Open wycccccc opened 1 year ago

wycccccc commented 1 year ago

在spark standalone mode 下對etl進行如下測試

  1. 測試10GB資料需要多少時間跑完
  2. 檢查input ouput資料的一致性,例如資料筆數,抽檢資料是否一致
  3. 替換spark中的kafka partitioner再次測試效能,看有沒有變好
  4. 講一下架構與做法

========================================================= @chia7712 有漏掉的部分我再補充

chia7712 commented 1 year ago

@wycccccc 感謝幫忙建立議題,那四項大致上沒錯,另外要請你針對那四項寫一個報告放到此專案,私下的報告格式則是要投影片。不過我們可以先完成要推到此PR的測試報告,然後在剪貼成投影片就好

wycccccc commented 1 year ago

有一個問題想要問一下學長,我嘗試將file path通過ftp的方式,類似於ftp://user:password@host:port/home/warren/ImportcsvTest/source來將地址餵給它, 但似乎是不太行。而且網路上好像根本找不到structure streaming與ftp配合使用的案例。 真的有這種用法嗎,還是我去學一下hdfs串spark。。。

chia7712 commented 1 year ago

我嘗試將file path通過ftp的方式,類似於ftp://user:password@host:port/home/warren/ImportcsvTest/source來將地址餵給它

測試報告的部分別用 ftp,那會成為效能瓶頸。將資料複製到所有節點,並且將資料掛載到容器內,從“本地”檔案來創建 spark streaming,這樣可以避免掉“來源的效率問題”,也就是讓我們把報告的重點放在“資料處理”和“資料傳輸到kafka”

wycccccc commented 1 year ago

我嘗試將資料文件夾按如下方式掛載到spark_worker中:

  docker run -d --init \
    -e SPARK_WORKER_WEBUI_PORT=$SPARK_UI_PORT \
    -e SPARK_WORKER_PORT=$SPARK_PORT \
    -e SPARK_NO_DAEMONIZE=true \
    -v /home/kafka/spark2kafkaTest/ImportcsvTest:/home/kafka/spark2kafkaTest/ImportcsvTest \
    --name "$WORKER_NAME" \
    --network host \
    "$IMAGE_NAME" ./sbin/start-worker.sh "$master_url"

然後sourcePath配置爲/home/kafka/spark2kafkaTest/ImportcsvTest/source

但它似乎還是沒找到sourcePath裏的資料,是我哪裡沒處理好嘛

chia7712 commented 1 year ago

-v /home/kafka/spark2kafkaTest/ImportcsvTest:/home/kafka/spark2kafkaTest/ImportcsvTest \

可否先把這個功能放到腳本上然後發一隻PR(要用通用一點的參數名稱),我們先確定各個 spark worker 有看到路徑

wycccccc commented 1 year ago

學長我算了一下,目前兩個worker,10GB資料會花費時間是2分55秒。replica 1 的情況下產生到叢集的資料爲36GB 平均每個worker處理57MB/s,實際csv大小資料。 每秒會向Broker發送205MB/s的資料量。平均每個Broker只能收到68MB/s。

這就有了一個問題,在做實驗的過程中,我製造一個負載全力往一個Broker打資料,對結果根本沒有影響。。。哪怕我再加一個worker也只能對一個Broker的負載增加到102MB/s。我擔心對於1GB的頻寬基本也沒影響。

或許我可以把網路空改到2.5G再做實驗可能會有效果。這樣可以嗎。

wycccccc commented 1 year ago

學長沒救了,發現一個叢集的新坑,以前可以通過換網路口將頻寬從10G降到2.5G,現在這招不行了,換到2.5G的口從外面都ping不到這臺機器。(可能因爲這幾天的搬遷動到了什麼東西)。1跟2的資料已經收集完成,我之後研究下能不能連回來做3的實驗吧。

chia7712 commented 1 year ago

現在這招不行了,換到2.5G的口從外面都ping不到這臺機器。(可能因爲這幾天的搬遷動到了什麼東西

麻煩多試著測試一些狀況,例如反過來ping能不能通

chia7712 commented 1 year ago

@Haser0305 我把這個議題指定給你,請盡力在月底前完成~

wycccccc commented 1 year ago

我成功連回來了,學弟應該只需要幫忙處理硬體上的問題就好了。

發生了一些問題,strict partitioner應該沒在正常工作。 實驗環境為我向broker1001號機器發送額外的資料,影響其效能。 以下是不替換時候的數據 4min跑完 (testTopic) image

以下為替換後的數據5min跑完(testTopic) image

所以替換partitioner後反而變成了負優化,然後觀察了各Broker partition的offset。負載最重的1001Broker反而吃到了最多的資料。。。 image

我猜應該是邏輯反了或者其他問題,我去翻翻code看看是哪裡出錯了。

chia7712 commented 1 year ago

發生了一些問題,strict partitioner應該沒在正常工作。 我猜應該是邏輯反了或者其他問題,我去翻翻code看看是哪裡出錯了。

可否先試著不要用 spark,直接用 perf 打打看是不是也有一樣的狀況

wycccccc commented 1 year ago

可否先試著不要用 spark,直接用 perf 打打看是不是也有一樣的狀況

我又測試了幾次,並且換了不同的節點作為負載高的節點。 用performance就正常了,可是放在etl裡就不正常。不正常體現為,大多數情況下負載高的節點反而會吃到較多資料,小部分情況各節點的資料接近。 這會是spark 使用producer的方式導致的嘛,比如一個task創建一次producer,導致我們的partitioner沒法正常工作。如果可能是這方面導致的,我再去追一下那邊的程式。

chia7712 commented 1 year ago

我又測試了幾次,並且換了不同的節點作為負載高的節點。 用performance就正常了,可是放在etl裡就不正常。不正常體現為,大多數情況下負載高的節點反而會吃到較多資料,小部分情況各節點的資料接近。 這會是spark 使用producer的方式導致的嘛,比如一個task創建一次producer,導致我們的partitioner沒法正常工作。如果可能是這方面導致的,我再去追一下那邊的程式。

時間關係,這部分先緩著,我們先把報告做一個版本出來,依照描述中提到的部分,做一個頁面然後先發隻PR給我看一下

wycccccc commented 1 year ago

目前缺少抽檢的圖片在台式機中,我今日隔離結束後,回家再補上。有需要修改的地方我再儘快修正。 https://drive.google.com/file/d/1kIgwvdjb0HEIDWSpVSDD-e3YfoC492RZ/view?usp=sharing

chia7712 commented 1 year ago

Kafka 的叢集規模多大?另外可否試試看用 local lode 跑跑看 spark 看看效能如何

chia7712 commented 1 year ago

另外文件的格式要請參考 https://github.com/skiptests/astraea/blob/main/docs/dispatcher/experiments/StrictCostDispatcher_4.md

用發 PR 的方式

chia7712 commented 1 year ago

由於時辰的關係,這個功能會從 0.1.0 release 中移除

wycccccc commented 1 year ago

Kafka 的叢集規模多大?另外可否試試看用 local lode 跑跑看 spark 看看效能如何

三臺broker。local mode下花費5分15秒。 鑒於兩個worker花費2分45秒,local mode基本相當於開一個worker時的狀態

image

chia7712 commented 1 year ago

三臺broker。local mode下花費5分15秒。 鑒於兩個worker花費2分45秒,local mode基本相當於開一個worker時的狀態

感謝,接下來要去解析一下這段流程主要花費的時間在哪裡

wycccccc commented 1 year ago

感謝,接下來要去解析一下這段流程主要花費的時間在哪裡

好,我再進行測試看看是哪裡在吃時間

wycccccc commented 1 year ago

我用jmc進行觀測,跑了許多次實驗,它們的結果都是如下所示。 image 可以看到exception特別突出,其他沒有明顯問題。

查看是EOFException引起了該問題,推測EOFException應該是被spark用作判斷當前task處理的csv資料有沒有到末尾。 image

但這個EOF的方式應該就是效能的瓶頸處,可以看到EOFException在圖中有一個大平頂,是我們jsonconverter的幾倍之多。 螢幕快照 2023-01-03 04-11-21

推測的結論:瓶頸來自於spark讀取csv文件的效能。更換一種資料的讀取方式我想應該就可以解決該問題,不過目前暫時應該無法改善這一問題。

chia7712 commented 1 year ago

推測的結論:瓶頸來自於spark讀取csv文件的效能

所以應該是從硬碟撈資料這段太慢嗎?

wycccccc commented 1 year ago

所以應該是從硬碟撈資料這段太慢嗎?

結論上是的,從硬碟上撈csv會發生EOFException,spark處理該exception會用掉大量時間。講結論的話就是這段太慢。

chia7712 commented 1 year ago

將此議題涵蓋到 0.2.0,我們需要重新審視從“來源拉資料”這一段的效能,查看是硬碟提供的速度太慢還是我們做資料轉換 (binary to csv) 的部分有問題