Qbeast-io / qbeast-spark

Qbeast-spark: DataSource enabling multi-dimensional indexing and efficient data sampling. Big Data, free from the unnecessary!
https://qbeast.io/qbeast-our-tech/
Apache License 2.0

Investigate streaming options txnApp and txnVersion for Qbeast [Delta] #243

Closed: osopardo1 closed this issue 6 months ago

osopardo1 commented 7 months ago

From the Delta Lake documentation, we notice that:

Delta tables support the following DataFrameWriter options to make writes to multiple tables within foreachBatch idempotent:

txnAppId: A unique string that you can pass on each DataFrame write. For example, you can use the StreamingQuery ID as txnAppId.

txnVersion: A monotonically increasing number that acts as a transaction version.

The user can pass these commit options when batch-processing a stream with foreachBatch:

```scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}
```
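With these options set, a micro-batch that is retried after a partial failure is recognized by its (txnAppId, txnVersion) pair: Delta detects that the pair has already been committed to a given table and skips the duplicate write, which is what makes the foreachBatch writes idempotent. Recent Delta releases also appear to accept these values as Spark session configurations, which is the session-level fallback discussed below.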

Overall, I think we should:

alexeiakimov commented 7 months ago

The answer to the first question is negative: Qbeast does not use these parameters. The next question is what contract we should implement. Delta does the following:

  1. txnAppId and txnVersion can be specified in the write options or even in the Spark session; the session-scoped values are used as a fallback.
  2. If the log shows that the transaction has already been handled, no writes are performed; moreover, the transaction options are removed from the session.
  3. If the transaction is new, the option values are saved in the Delta log via a SetTransaction action (see the sketch after this list).
  4. The manual recommends using the checkpointLocation option for better results.
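A minimal sketch of this Delta-style check, assuming a simplified model of the log (the names SetTransaction, latestTxns and shouldCommit here are illustrative, not the actual Delta internals):

```scala
object IdempotencyCheckSketch {
  // A Delta-like log records, per appId, the latest committed version
  // via SetTransaction actions.
  case class SetTransaction(appId: String, version: Long)

  // latestTxns: the highest version already committed for each appId,
  // as recovered by replaying the log. Commit only if the incoming
  // version is strictly greater; otherwise skip as a duplicate.
  def shouldCommit(latestTxns: Map[String, Long], appId: String, version: Long): Boolean =
    latestTxns.get(appId).forall(_ < version)

  def main(args: Array[String]): Unit = {
    val latest = Map("query-42" -> 7L)
    println(shouldCommit(latest, "query-42", 7L)) // false: already committed, skip the write
    println(shouldCommit(latest, "query-42", 8L)) // true: new version, perform the write
  }
}
```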

The problem with the Delta approach is that reusing an old transaction identifier has side effects; moreover, it is not clear why one would need to specify it at the session level. The role of checkpoints should also be clarified. Overall, the Delta approach seems unclear and complicated.

I suggest implementing something simpler, along the following lines:

  1. The only way to specify the transaction identifier is through the write options; the Spark session is ignored (a sketch follows this list).
  2. The same as Delta.
  3. The same as Delta.
  4. Checkpoint support should be postponed until it is really needed.
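A sketch of the proposed, write-options-only contract; the option names and the TxnOptions helper are hypothetical, not the actual qbeast-spark API:

```scala
object TxnOptions {
  // Extract the transaction identifier exclusively from the
  // DataFrameWriter options; session-level configuration is
  // deliberately ignored, unlike in Delta.
  def fromWriteOptions(options: Map[String, String]): Option[(String, Long)] =
    for {
      appId   <- options.get("txnAppId")
      version <- options.get("txnVersion").map(_.toLong)
    } yield (appId, version)
}
```

Keeping the lookup to the write options makes each write's idempotency key explicit at the call site, avoiding the session-scoped state that point 2 of the Delta contract then has to clean up.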

If we agree on having a simpler contract, then I am not sure it is a good idea to use the same option names, because their semantics would be slightly different. @osopardo1 @cugni what do you think?

osopardo1 commented 6 months ago

I agree with the first three steps, although I am unfamiliar with the side effects of ignoring the SparkSession.

Checkpointing should be studied, because current users need it.

alexeiakimov commented 6 months ago

The feature has been implemented for main-1.0.0.