GENZITSU / UsefulMaterials

34 stars 0 forks source link

almost weekly useful materials - 03/27 - #149

Open GENZITSU opened 1 month ago

GENZITSU commented 1 month ago

Spark未経験のチームが2年間模索して実感した、効果的なパフォーマンス改善6選

表題の通り、以下勉強になったことを抜粋

必要なデータのみを抽出する

- items: array
    - elements: struct
        - id: string
        - name: string
        - values: array<string>
        - text: string

上記のようなarray型のデータ形式の中から、特定の要素をとってきたい場合は以下のようドットアクセスとarray_zipsを用いて、必要な要素のみの抽出が可能

from pyspark.sql import functions as F

df = (
    create_spark_dataframe()
    .select(
        F.col("items.id").alias("item_ids")
    )
)
from pyspark.sql import functions as F

df = (
    create_spark_dataframe()
    .select(
        F.arrays_zip(
            "items.id",
            F.transform("items.text", F.length)
        ).alias("items")
    )
    # arrays_zipではカラム名が0, 1になってしまうため、castしつつ付与する
    .cast("array<struct<id: string, text_length: int>>"),
)

AWS Glueの場合

Glue自体のバージョンを上げることでパフォーマンスが向上する場合がある

ワーカー数の調整

チューニングの目安

メモリ使用率に余裕がある => ワーカー数を減らせる可能性がある メモリ使用率に余裕がない => ワーカー数を増やした方がいいかもしれない

また、ワーカー数を増やすことによって I/O の効率が良くなり、データの入出力の速度が上がる傾向があります。 肌感ですが、コスパが良いのは最大でも40~50台くらいまでで、それ以上になるとそれほどパフォーマンス向上は見られない印象があります。

ワーカータイプについての話題

IOがボトルネックであれば、G.1X メモリがボトルネックであれば、G.2X

特定のデータを取得、変換、出力するような処理であれば、G.1X 複数のデータを取得、joinして複雑な集計をし、出力するような処理であれば、G.2X

broadcast joinの閾値

Spark の設定には autoBroadcastJoinThreshold というものがあり、デフォルトが 10MB になっています。 10MB以下のデータであれば Spark 側で自動的に broadcast join をしてくれます。

処理順序の効率化

  • filter や join などで先にデータを小さくしてから集計できないだろうか
  • 大きなデータとの join はもっと後ろ倒しにできないだろうか
  • 無駄に join をしてないだろうか
  • もっとシンプルに集計できないだろうか
  • 使いまわせるデータはないだろうか、共通化できないだろうか

コメント

特にドットアクセス/array_zipsのところが勉強になった。 あまり複雑なデータ型に相対したことはないが、参考にしたい

出典