Nike-Inc / koheesio

Python framework for building efficient data pipelines. It promotes modularity and collaboration, enabling the creation of complex pipelines from simple, reusable components.
https://engineering.nike.com/koheesio/
Apache License 2.0

[BUG] Spark Connect not detected on Databricks #102

Open YevIgn opened 1 week ago

YevIgn commented 1 week ago

Describe the bug

When running on Databricks in Spark Connect mode (for example, a Shared isolation mode or Job cluster on DBR 15.4), Spark Connect is not detected by koheesio, which leads to exceptions in code that can only be executed against a regular SparkSession.

Steps to Reproduce

Run code that uses, for example, DeltaTableWriter with a DeltaMergeBuilder passed as merge_builder; it fails on the 0.9.0rc versions.
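
A minimal repro sketch of the scenario described above. The DeltaMergeBuilder construction uses the standard delta-spark API; the DeltaTableWriter import path, constructor arguments, and invocation are assumptions for illustration and may not match koheesio's exact API.

# Hypothetical repro sketch - koheesio-specific names are assumed, not verified
from delta.tables import DeltaTable
from koheesio.spark.writers.delta import DeltaTableWriter  # import path assumed

# `spark` is the session provided by the Databricks runtime
source_df = spark.table("source_updates")

# Standard delta-spark API: build a DeltaMergeBuilder
merge_builder = (
    DeltaTable.forName(spark, "target_table")
    .alias("t")
    .merge(source_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)

# On a Spark Connect cluster this fails on 0.9.0rc, because the Connect session
# is not detected and the regular-SparkSession code path is taken
DeltaTableWriter(table="target_table", df=source_df, merge_builder=merge_builder).execute()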

Expected behavior

Spark Connect should be detected when using Koheesio 0.9.0 (the future release branch).

Environment

Additional context

The issue is caused by relying on the non-existent configuration parameter spark.remote - https://github.com/Nike-Inc/koheesio/blob/abb435b409b620ae71dd868a5647256eeb4194b4/src/koheesio/spark/utils/connect.py#L17
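
For illustration, a config-based probe along these lines (not necessarily koheesio's exact code) reports a regular session on Databricks even when running under Spark Connect, because spark.remote is simply never set there:

from pyspark.sql import SparkSession


def looks_remote(spark: SparkSession) -> bool:
    # Illustrative only: "spark.remote" is not set on Databricks shared or job
    # clusters, so this probe returns False even for a Connect session
    return spark.conf.get("spark.remote", None) is not None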

dannymeijer commented 1 week ago

we could take inspiration from the Spark 4.0 pre-release - it's unfortunate that this util is not available in earlier Spark versions: https://spark.apache.org/docs/4.0.0-preview1/api/python/_modules/pyspark/sql/utils.html#is_remote

def is_remote() -> bool:
    """
    Returns if the current running environment is for Spark Connect.

    .. versionadded:: 4.0.0

    Notes
    -----
    This will only return ``True`` if there is a remote session running.
    Otherwise, it returns ``False``.

    This API is unstable, and for developers.

    Returns
    -------
    bool

    Examples
    --------
    >>> from pyspark.sql import is_remote
    >>> is_remote()
    False
    """
    return ("SPARK_CONNECT_MODE_ENABLED" in os.environ) or is_remote_only()
dannymeijer commented 1 week ago

Here is another Spark 4.0 nugget, which shows how this is going to change in the next release:

def is_remote_only() -> bool:
    """
    Returns if the current running environment is only for Spark Connect.
    If users install pyspark-connect alone, RDD API does not exist.

    .. versionadded:: 4.0.0

    Notes
    -----
    This will only return ``True`` if installed PySpark is only for Spark Connect.
    Otherwise, it returns ``False``.

    This API is unstable, and for developers.

    Returns
    -------
    bool

    Examples
    --------
    >>> from pyspark.sql import is_remote
    >>> is_remote()
    False
    """
    global _is_remote_only

    if "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ:
        return True

    if _is_remote_only is not None:
        return _is_remote_only
    try:
        from pyspark import core  # noqa: F401

        _is_remote_only = False
        return _is_remote_only
    except ImportError:
        _is_remote_only = True
        return _is_remote_only

source: https://spark.apache.org/docs/4.0.0-preview1/api/python/_modules/pyspark/util#is_remote_only

Namely, you can see that pyspark-connect is a separate package with no immediate relationship to 'core'. This is an aside, yet related.

My point: we should do what works for Spark 3.4 and 3.5, which might mean relying on there being an active SparkSession and determining this from the class of that SparkSession instance; specifically because of the changes we can see coming in Spark 4 (when it comes out).
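
A minimal sketch of that instance-type approach for Spark 3.4/3.5. The function name and the environment-variable fallbacks are assumptions for illustration, not koheesio's implementation:

import os


def is_connect_session(spark) -> bool:
    """Best-effort check, given an active session, whether it is a Spark Connect session."""
    # Environment hints used by Spark Connect clients, checked first (cheap and safe)
    if "SPARK_CONNECT_MODE_ENABLED" in os.environ or os.environ.get("SPARK_REMOTE"):
        return True
    # On pyspark 3.4/3.5 a Connect session is an instance of
    # pyspark.sql.connect.session.SparkSession rather than pyspark.sql.SparkSession,
    # so the module of the session's class tells the two apart
    return type(spark).__module__.startswith("pyspark.sql.connect")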

YevIgn commented 1 week ago

Thank you for your reply. I have just checked; this check is present in the 3.5 branch: https://github.com/apache/spark/blob/d39f5ab99f67ce959b4379ecc3d6e262c10146cf/python/pyspark/sql/utils.py#L156

YevIgn commented 1 week ago

Neither of these options is set on Databricks Runtime :/

There is also another bug to be fixed: after manually setting spark.remote, the code continues to execute but then fails further on due to an isinstance(writer, DeltaMergeBuilder) check that was not replaced.

There are actually a couple of them:

Given the proliferation of this check in several places, it makes sense to perform it only once at instance initialisation, or to spin off a separate class.
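
For example, the result could be computed once when the object is created and reused everywhere the check is currently duplicated. A sketch of that idea; the class and attribute names are hypothetical, not koheesio's API:

class SparkConnectAware:
    """Hypothetical mixin: determine the session type once at initialisation."""

    def __init__(self, spark) -> None:
        self.spark = spark
        # Evaluate the detection once, instead of repeating isinstance or
        # config checks in every method that needs to branch on Connect mode
        self.is_connect = type(spark).__module__.startswith("pyspark.sql.connect")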