delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for multiple languages
https://delta.io
Apache License 2.0

[Feature Request] add DeltaTable.insert(df) operation to Delta Lake API #1345

Open keen85 opened 2 years ago

keen85 commented 2 years ago

Feature request

Overview

When inserting rows into a Delta Lake table via the Spark df.write operation, it would be helpful if the number of inserted rows (Long) were returned.

Motivation

We are currently building a data lakehouse that uses Delta Lake as the storage format. Ingested raw data is processed and curated through multiple layers (bronze/silver/gold). In order to maintain some form of data lineage and processing audit, as well as to ensure data quality and completeness, it is important to log how many rows were inserted/deleted/updated when new data is processed from one layer to another.

The Delta Lake API's DELETE operation already returns metrics, and metrics for the UPDATE and MERGE operations are in development (see https://github.com/delta-io/delta/issues/1321 and https://github.com/delta-io/delta/issues/1322). Metrics for plain INSERTs/writes/appends are basically the last missing puzzle piece.

Currently, df.write doesn't return any metrics. Our workaround is to analyze the commit info from the Delta log in a separate post-processing step and extract the metric from it.

Further details

A Delta log commit entry looks like this; we are interested in the attribute commitInfo.operationMetrics.numOutputRows:

{
  "commitInfo": {
    "timestamp": 1658227579417,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append",
      "partitionBy": "[]"
    },
    "readVersion": 317,
    "isBlindAppend": true,
    "operationMetrics": {
      "numFiles": "1",
      "numOutputBytes": "2147041",
      "numOutputRows": "2986"
    }
  }
}

Our workaround for retrieving the number of written rows looks like this:

written_rows_count = int(deltaTable.history(1).select(F.col('operationMetrics')).collect()[0]['operationMetrics']['numOutputRows'])
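
For completeness, the snippet above assumes roughly the following setup (a sketch; spark and the table path are placeholders, not part of the original example):

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# deltaTable is a handle to the Delta table that was just written to;
# "/path/to/table" is a placeholder for its actual location.
deltaTable = DeltaTable.forPath(spark, "/path/to/table")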

Disclaimer

I'm not sure whether my request is technically in scope for Delta Lake OSS, since it is closely related to Apache Spark. If it is not, I'd like to open a discussion about whether it would make sense to add an INSERT operation to the Delta Lake API that can be used instead of df.write and that returns the desired metric.
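
To make the idea more concrete, a user-side helper approximating such an operation could look roughly like the following. This is only a sketch: insert_append is an illustrative name, not an existing Delta Lake API, and it simply combines df.write with the history-based workaround shown above.

from delta.tables import DeltaTable

def insert_append(spark, df, table_path):
    # Hypothetical helper: append the DataFrame to the Delta table at table_path
    # and return the row count reported by the resulting commit's operationMetrics.
    # Note: a concurrent commit between the write and the history lookup could
    # cause the metrics of a different commit to be returned.
    df.write.format("delta").mode("append").save(table_path)
    last_commit = DeltaTable.forPath(spark, table_path).history(1).collect()[0]
    return int(last_commit["operationMetrics"]["numOutputRows"])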

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

TarekSalha commented 2 years ago

I certainly like the idea of also having an insert statement available in Delta Lake. This would make my pipelines much more convenient, as it would reduce boilerplate code.

nkarpov commented 2 years ago

@TarekSalha Delta Lake does support INSERT INTO using Spark SQL as documented here.
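
For reference, that SQL path can also be used from PySpark; a rough sketch with placeholder table names (events, events_staging):

# Append rows into an existing Delta table via Spark SQL's INSERT INTO
spark.sql("""
    INSERT INTO events
    SELECT event_id, event_time, payload
    FROM events_staging
""")

Note that, as far as I can tell, this also does not surface row-count metrics to the caller, so the history-based workaround described above would still be needed.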

@keen85 thanks for posting this issue. A change to the return value of the df.write API would have to happen in Spark, not Delta, so we'll close this issue.

keen85 commented 2 years ago

Hi @nkarpov, what about adding a DeltaTable.insert(df) operation to the Delta Lake API?

zsxwing commented 2 years ago

That's a good point. Re-opening this in case someone wants to work on this.