delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs
https://delta.io
Apache License 2.0

[Feature Request] Spark Connect support for the Python API #3231

Open sebastiandaberdaku opened 5 months ago

sebastiandaberdaku commented 5 months ago

Feature request

Which Delta project/connector is this regarding?

Spark Connect support for the Python API

Overview

Currently, the DeltaTable Python API does not fully support Spark Connect. This is because, when using Spark Connect, lower-level APIs such as the SparkContext are not available. However, the SparkContext object is used in the DeltaTable.forPath and DeltaTable.forName methods, which means these two methods cannot be used with Spark Connect.
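
For illustration, here is a minimal sketch of the failing pattern, assuming a Spark Connect server reachable at sc://localhost:15002 and a hypothetical Delta table at /tmp/delta/events (the exact error message depends on the PySpark and Delta versions):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Connect to a remote Spark Connect server instead of starting a local driver
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# DataFrame and SQL APIs work over Spark Connect
spark.read.format("delta").load("/tmp/delta/events").show()

# DeltaTable.forPath internally accesses the SparkContext, which a Spark
# Connect session does not expose, so this call fails
delta_table = DeltaTable.forPath(spark, "/tmp/delta/events")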

Motivation

This feature will benefit all PySpark users who want to use the DeltaTable Python API in their code when using Spark Connect.

Further details

The Delta SQL API is supported by Spark Connect. For the moment I have been converting PySpark DataFrames to temporary views and then using Spark SQL to perform the merges.

For others who might need this in the meantime, the following code provides a context manager that registers a PySpark DataFrame as a temporary view.

import logging
import random
import string
from contextlib import contextmanager
from typing import Generator

from pyspark.errors import TempTableAlreadyExistsException
from pyspark.sql import DataFrame

logger = logging.getLogger(__name__)

class MultipleFailuresException(Exception):
    """Custom exception raised when an operation has failed after several retries."""
    pass

def generate_random_temp_view_name(length: int) -> str:
    """
    Generates a random temp view name of the specified length, consisting of lowercase letters
    drawn from 'string.ascii_lowercase' with 'random.choice'.

    :param length: Length of the random temp view name to be generated.
    :return: Random temp view name as a string.
    """
    # Use string.ascii_lowercase to get all lowercase letters
    lowercase_letters = string.ascii_lowercase
    # Use random.choice to randomly select characters from the set of lowercase letters
    random_temp_view_name = ''.join(random.choice(lowercase_letters) for _ in range(length))
    return random_temp_view_name

@contextmanager
def create_temp_view(df: DataFrame, max_attempts: int = 5) -> Generator[str, None, None]:
    """
    Create a temporary view for a given DataFrame.

    :param df: The DataFrame to be used for creating the temporary view.
    :param max_attempts: The maximum number of attempts to create the temporary view if it already exists. Default is 5.
    :return: The name of the created temporary view.

    :raises MultipleFailuresException: If the temporary view creation fails after the maximum number of attempts.
    """
    temp_view_name = None
    left_attempts = max_attempts
    try:
        while temp_view_name is None and left_attempts > 0:
            try:
                temp_view_name = generate_random_temp_view_name(length=12)
                df.createTempView(temp_view_name)
                logger.info(f"Created temp view named '{temp_view_name}'!")
            except TempTableAlreadyExistsException as e:
                logger.warning(
                    msg=f"Temp view named '{temp_view_name}' already exists! Attempts left: {left_attempts}.",
                    exc_info=e
                )
                # Reset the name so the next iteration retries with a freshly generated one
                temp_view_name = None
                left_attempts -= 1
        if temp_view_name is None:
            raise MultipleFailuresException(f"Temp view creation failed after {max_attempts} attempts!")
        yield temp_view_name
    finally:
        if temp_view_name is None:
            logger.warning("No temp view created so nothing to drop!")
        else:
            if df.sparkSession.catalog.dropTempView(temp_view_name):
                logger.info(f"Dropped temp view named '{temp_view_name}'!")
            else:
                logger.warning(f"Failed to drop temp view named '{temp_view_name}'!")

Now, say to_upsert is the PySpark DataFrame you want to upsert into the Delta table located at path. You can use the provided context manager like so:

with create_temp_view(to_upsert) as temp_view:
    merge_condition = f"target.some_column = {temp_view}.some_column"
    logger.info(f"Merge condition is: {merge_condition}.")

    merge_query = f"""
        MERGE INTO delta.`{path}` AS target
        USING {temp_view}
          ON {merge_condition}
        WHEN MATCHED THEN
          UPDATE SET *
        WHEN NOT MATCHED THEN
          INSERT *
    """
    logger.info(f"Merge query is: {merge_query}.")
    spark.sql(merge_query)
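
For reference, here is a minimal sketch of the equivalent merge written with the DeltaTable Python API, which is the builder-style call this feature request would make available under Spark Connect as well (to_upsert and path are the same placeholders as above; this works today only with a classic, non-Connect Spark session):

from delta.tables import DeltaTable

# Works with a regular SparkSession, but not over Spark Connect,
# because DeltaTable.forPath relies on the SparkContext
delta_table = DeltaTable.forPath(spark, path)
(
    delta_table.alias("target")
    .merge(to_upsert.alias("source"), "target.some_column = source.some_column")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)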

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

allisonport-db commented 5 months ago

Hey @sebastiandaberdaku thanks for opening this! I just created a parent issue https://github.com/delta-io/delta/issues/3240 which addresses this for both the Scala and Python DeltaTable APIs.

This is definitely on our roadmap for future releases (and the Delta 4.0.0 Preview release will include some partial support!)