Nike-Inc / koheesio

Python framework for building efficient data pipelines. It promotes modularity and collaboration, enabling the creation of complex pipelines from simple, reusable components.
https://engineering.nike.com/koheesio/
Apache License 2.0

[FEATURE] Spark - update SparkStep to be able to manage the SparkSession more effectively #32

Closed dannymeijer closed 3 weeks ago

dannymeijer commented 4 months ago

Is your feature request related to a problem? Please describe.

A feature that was previously requested (aligned to this) was:

Having the ability to clean checkpoints via spark.cleaner.referenceTracking.cleanCheckpoints=true to opt-in to spark internal checkpoint reference tracking and cleaning

Currently, Koheesio has no specific interaction with or management of the underlying SparkSession (it simply expects the session to just 'be there').
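Today a user who wants checkpoint cleaning has to set the config themselves when building the session, outside of Koheesio entirely. A minimal sketch of what that looks like (`spark.cleaner.referenceTracking.cleanCheckpoints` is the Spark conf quoted above; the builder pattern is standard PySpark):

```python
from pyspark.sql import SparkSession

# Build a session with checkpoint reference tracking and cleaning enabled;
# Koheesio currently has no hook for this, so it must happen before any Step runs.
spark = (
    SparkSession.builder
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    .getOrCreate()
)
```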

Describe the solution you'd like

I would like to see us extend SparkStep to be able to manage the Spark session, and also to accept the SparkSession as a field on the Koheesio Step. At the moment, spark is only set up as a property. I propose we keep the property but add a Field with a default. The property would then return either whatever value is assigned to self._spark, or getActiveSession() (as it does currently).

By doing this we can introduce functionality like this (inspired by Delta library):

def clean_up_spark_streaming_checkpoint(spark: SparkSession) -> SparkSession:
    # add the desired config to the given Spark session and return it
    spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    return spark

Suggestion on new SparkStep code:

class SparkStep(Step):
    ...
    # default=None rather than SparkSession.getActiveSession(), so the active
    # session is not captured once at class-definition time
    _spark: Optional[SparkSession] = Field(default=None, alias="spark")

    # TODO: add a validator that checks the SparkSession and falls back to
    #  SparkSession.getActiveSession() if it is still None

    def spark_conf_clean_up_streaming_checkpoint(self) -> SparkSession:
        # as mentioned above
        ...

This opens the door to future optimizations also.
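The field-with-fallback logic from the TODO above can be sketched in plain Python. This is a minimal sketch only: `FakeSession` is a hypothetical stand-in for `pyspark.sql.SparkSession` so the snippet runs without Spark, `SparkStepSketch` is not the real SparkStep, and the real implementation would use a pydantic validator rather than `__post_init__`:

```python
from dataclasses import dataclass
from typing import Optional


class FakeSession:
    """Hypothetical stand-in for pyspark.sql.SparkSession."""
    _active: Optional["FakeSession"] = None

    @classmethod
    def getActiveSession(cls) -> Optional["FakeSession"]:
        return cls._active


@dataclass
class SparkStepSketch:
    # Users may pass a session explicitly; otherwise we fall back to the
    # active session, which matches Koheesio's current property behavior.
    spark: Optional[FakeSession] = None

    def __post_init__(self) -> None:
        if self.spark is None:
            self.spark = FakeSession.getActiveSession()
```

With this shape, `SparkStepSketch()` picks up the active session, while `SparkStepSketch(spark=my_session)` uses the explicitly supplied one, i.e. the existing behavior is unchanged unless the user opts in.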

Describe alternatives you've considered

Open for suggestions :)

Additional context

For reference, a user would call the functionality like this:

# using an incomplete DeltaStreamReader, purely as an example
reader = DeltaStreamReader(
    spark=SparkSession.builder.getOrCreate(),   # optional !
    ...
)
reader.spark_conf_clean_up_streaming_checkpoint()

mikita-sakalouski commented 3 weeks ago

@dannymeijer Can you provide more context: why do we need to provide a session that is different from the active one? Based on the current implementation, Koheesio will take the current active session. Building the session is the responsibility of the user (they can create a local/remote session).

dannymeijer commented 3 weeks ago

I don't intend to change the behavior; I just want users to be able to explicitly pass the SparkSession if they have configured one. This is for compatibility with the Delta API, for example.

mikita-sakalouski commented 3 weeks ago

> I don't intend to change the behavior; I just want users to be able to explicitly pass the SparkSession if they have configured one. This is for compatibility with the Delta API, for example.

As for me, I would prefer not to have it as a separate field and to only allow the active Spark session, without the possibility to pass anything.

mikita-sakalouski commented 3 weeks ago

If this is a method of a class:

def clean_up_spark_streaming_checkpoint(spark: SparkSession) -> SparkSession:
    # add desired config to the given spark session and return it
    return spark

then it should be as:

def clean_up_spark_streaming_checkpoint(self) -> SparkSession:
    # add the desired config to the session held by the step and return it
    # e.g. self.spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    return self.spark