
[Feature Request] Support for Spark Connect (aka Delta Connect) #3240


allisonport-db commented 2 weeks ago

Feature request

Which Delta project/connector is this regarding?

Spark

Overview

Spark Connect is a new initiative in Apache Spark that adds a decoupled client-server infrastructure, allowing Spark applications to connect remotely to a Spark server and run SQL / DataFrame operations. We want to develop what we're calling "Delta Connect" so that Delta operations can be performed from applications running in this client-server mode.
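For illustration, this is what the decoupled mode looks like from a plain Spark (non-Delta) client today; a minimal sketch, assuming a Spark Connect server is already reachable at sc://localhost:15002:

from pyspark.sql import SparkSession

# The client holds no local driver; the query runs on the remote
# Spark Connect server and only the results come back.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

spark.sql("SELECT 1 AS id").show()

Delta Connect would extend this same mechanism to the Delta APIs.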

Further details

These are the critical user journeys (CUJs) we would like to support:

Server

The server is packaged into the io.delta:delta-spark-connect-server_2.13 package; installing this package automatically installs the io.delta:delta-spark-connect-common_2.13 package.

sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.13:4.0.0,io.delta:delta-spark-connect-server_2.13:4.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Scala Client

The client is packaged into the io.delta:delta-spark-connect-client_2.13 package; installing this package automatically installs the io.delta:delta-spark-connect-common_2.13 package.

export SPARK_REMOTE="sc://localhost:15002"
spark-connect-repl --packages io.delta:delta-spark-connect-client_2.13:4.0.0

The delta-spark-connect-client_2.13 package uses the exact same class and package names as the delta-spark_2.13 package, so existing code can be used unchanged. For example, in the spark-connect-repl (where spark is the pre-created remote session):

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "my_table")

// Add 100 to the id of every row with an even id.
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))

Python Client

The Delta Connect Python client is included in the same PyPI package as Delta Spark.

pip install pyspark==4.0.0
pip install delta-spark==4.0.0

Usage is no different from the classic mode; we just pass a remote SparkSession (instead of a local one) to the DeltaTable API.

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Create a remote session instead of a local one.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

deltaTable = DeltaTable.forName(spark, "my_table")

# Add 100 to the id of every row with an even id.
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })