MrPowers / quinn

pyspark methods to enhance developer productivity 📣 👯 🎉
https://mrpowers.github.io/quinn/
Apache License 2.0
602 stars 96 forks source link

Better SparkSession settings for localhost #143

Open MrPowers opened 9 months ago

MrPowers commented 9 months ago

Users need to configure their SparkSession for localhost development so computations run fast and so that they don't run out of memory.

Here are some examples I ran on my local machine that has 64GB of RAM on the 1e9 h2o groupby dataset (has 1 billion rows of data).

Here's the "better config":

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.executor.memory", "10G")
    .config("spark.driver.memory", "25G")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.shuffle.partitions", "2")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Here's the default config:


builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

groupby query

This query takes 104 seconds with the "better config":

delta_table = delta.DeltaTable.forPath(
    spark, f"{Path.home()}/data/deltalake/G1_1e9_1e2_0_0"
)

delta_table.toDF().groupby("id3").agg(F.sum("v1"), F.mean("v3")).limit(10).collect()

This same query errors out with the default config.

join query

This query takes 69 seconds with the "better config", but 111 seconds with the default config:

x = spark.read.format("delta").load(f"{Path.home()}/data/deltalake/J1_1e9_1e9_0_0")
small = spark.read.format("parquet").load(f"{Path.home()}/data/J1_1e9_1e3_0_0.parquet")

spark.sql('select x.id2, sum(small.v2) from x join small using (id1) group by x.id2').show()

Conclusion

SparkSession configurations significantly impact the localhost Spark runtime experience.

How can we make it easy for Spark users to get optimal configurations for localhost development?

lucazanna commented 9 months ago

Good job @MrPowers to get this conversation started

I think we have 3 ways to do this (going from less likely to have an impact to most likely to have an impact)

1- recommend the different configuration on the Spark docs, get started guide, etc This will help some users, but most users just use the standard config so this will likely have a small impact

2- change Spark so it automatically uses the ‘better’ configuration above However the ‘better’ configuration will depend on the machine and the dataset. a better configuration in one case does not mean it’s the best configuration always

3- change Spark so that it ‘guesses’ and uses a configuration based on the ram available, the number of CPU cores, etc I believe this is what Polars does. This config should changeable by the user

On top of improving Spark performance (which is important), I wonder what should be the Spark positioning : should Spark be the engine for big data or the engine for data of any size? if the positioning for Spark is the engine for big data, then I would not mind if it’s slower than duckdb on small data because that’s not what it’s built for.

Your thoughts?

MrPowers commented 9 months ago

@lucazanna - I think we should first figure out the true capabilities of Spark locally and then figure out the best messaging. Here are the results for one of the h2o queries:

Screenshot 2023-10-20 at 9 50 50 AM

I think the current benchmarks are really misleading...

jeffbrennan commented 9 months ago

I'm in favor of the automatic configuration (leaning towards higher memory consumption) with configurable parameters that the user can change if needed.

I think these are good configurable parameters:

For executor and driver memory we could do a percentage of available system memory. It doesn't look like there's a good way to do this with Python's standard library but psutil https://pypi.org/project/psutil/ looks like a popular way to get this info.

MrPowers commented 9 months ago

@jeffbrennan - figuring out how to programatically set the best settings is a great goal.

The first step is to get everyone with the same datasets on their local machines so we can tinker and find what settings work best. There are so many Spark configuration options and I'm not even sure which knobs need to be turned (let alone how to optimally turn them)!

MrPowers commented 9 months ago

Here are some other suggestions that might be useful: https://luminousmen.com/post/how-to-speed-up-spark-jobs-on-small-test-datasets

bjornjorgensen commented 9 months ago

I do use this one

# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random
import time

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74

def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.repl.eagerEval.enabled", "True"
    ).set("spark.sql.adaptive.enabled", "True").set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "10000"
    ).set(
        "sc.setLogLevel", "ERROR"
    )

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()

spark = get_spark_session("My app", SparkConf())
spark.sparkContext.setLogLevel("ERROR")

With this one users get max mem and cpu in local mode.

MrPowers commented 9 months ago

Here are some other settings that might be useful: https://www.linkedin.com/posts/dipanjan-s-5874b1a0_pyspark-unittesting-optimization-activity-7122896865762701312-oF-j?utm_source=share&utm_medium=member_desktop

geoHeil commented 8 months ago

Do you absolutely need spark? What about polars or duckdb in case you only target single node deployments?

SemyonSinchenko commented 8 months ago

Do you absolutely need spark? What about polars or duckdb in case you only target single node deployments?

The topic is about running unit tests of spark routines. These tests are running on a single node (locally). Maybe I'm missing something, but how polars/duckdb may help here?

geoHeil commented 8 months ago

No- for these purposes it will not help