delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0
7.62k stars 1.71k forks source link

[Question] Multiple writeStreams, Merge, and idempotent writes #1999

Open watfordkcf opened 1 year ago

watfordkcf commented 1 year ago

Question

Which Delta project/connector is this regarding?

Overview

We have some logic that looks like this:

def x_batch(x_df: DataFrame, batch_id: int) -> None:
    // do stuff including a merge

def y_batch(y_df: DataFrame, batch_id: int) -> None:
    // do stuff including a merge

x_kafka_topic.writeStream.forEachBatch(x_batch)
y_kafka_topic.writeStream.forEachBatch(y_batch)

We would like to use idempotent writes for each of these batches, however, because we're using a Delta Merge operation within the forEachBatch, we have to set the idempotency at the SparkSession conf level.

Is this safe? It isn't obvious that it would be:

def x_batch(x_df: DataFrame, batch_id: int) -> None:
    spark = x_df.sparkSession
    spark.conf.set("spark.databricks.delta.write.txnAppId", f"x-{SETTINGS.version.major}")
    spark.conf.set("spark.databricks.delta.write.txnVersion", batch_id)
    // do stuff including a merge

def y_batch(y_df: DataFrame, batch_id: int) -> None:
    spark = y_df.sparkSession
    spark.conf.set("spark.databricks.delta.write.txnAppId", f"y-{SETTINGS.version.major}")
    spark.conf.set("spark.databricks.delta.write.txnVersion", batch_id)
    // do stuff including a merge

x_kafka_topic.writeStream.forEachBatch(x_batch)
y_kafka_topic.writeStream.forEachBatch(y_batch)

Motivation

I noticed in the original feature request asking for this that .option(...) would be added to DeltaMergeBuilder, but it is clear from the implementation that it did not get added.

If this is not safe to do, then I would change this question to a feature request and allow options to be set on the DeltaMergeBuilder.

Further details

Spark 3.4.0 Delta 2.4.0

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

watfordkcf commented 1 year ago

After digging into Spark's source code, it appears that each structured stream receives an isolated SparkSession, meaning it would be safe to set idempotent details within a microbatch. This is not explicitly called out anywhere obvious in the Spark documentation...

  /** Isolated spark session to run the batches with. */
  private val sparkSessionForStream = sparkSession.cloneSession()

https://github.com/apache/spark/blob/87a5442f7ed96b11051d8a9333476d080054e5a0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194

Which is then passed to:

  /**
   * Processes any data available between `availableOffsets` and `committedOffsets`.
   * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
   */
  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {

https://github.com/apache/spark/blob/87a5442f7ed96b11051d8a9333476d080054e5a0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L573-L577

pgrandjean commented 8 months ago

+1. Thank you for providing a workaround.

nickefy commented 3 months ago

Hi, can I confirm if its safe to set the idempotency spark config in each of the foreachbatch itself?

def x_batch(x_df: DataFrame, batch_id: int) -> None:
    spark = x_df.sparkSession
    spark.conf.set("spark.databricks.delta.write.txnAppId", f"x-{SETTINGS.version.major}")
    spark.conf.set("spark.databricks.delta.write.txnVersion", batch_id)
    // do stuff including a merge

def y_batch(y_df: DataFrame, batch_id: int) -> None:
    spark = y_df.sparkSession
    spark.conf.set("spark.databricks.delta.write.txnAppId", f"y-{SETTINGS.version.major}")
    spark.conf.set("spark.databricks.delta.write.txnVersion", batch_id)
    // do stuff including a merge

x_kafka_topic.writeStream.forEachBatch(x_batch)
y_kafka_topic.writeStream.forEachBatch(y_batch)

Just like how you did it here ?