delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Proposal to create logical plans for operations #2006

Open Blajda opened 11 months ago

Blajda commented 11 months ago

Description

This proposes further work that I'd like to perform on creating reusable logical relations. It also helps with identifying the relations we would need for Substrait.

Delta Find Files Purpose: Identify files that contain records that satisfy a predicate.

This relation will generate a record batch stream with a single column called path. Each path maps to an Add action in the Delta table. This relation will also maintain a list of files that satisfy the predicate, which can be passed sideways to relations downstream.
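A minimal std-only sketch of this relation's contract (the name `find_files` and the `(path, stat)` tuples are hypothetical stand-ins, not the delta-rs API): emit the matching paths as the relation's output, while the same list is available to pass sideways to downstream relations such as Delta Delete.

```rust
/// Hypothetical stand-in for an Add action's path plus a single file
/// statistic; the real relation would carry full Add metadata and emit
/// Arrow RecordBatches with a `path` column.
type FileStat = (String, i64); // (path, e.g. max value of some column)

/// Sketch of Delta Find Files: return the paths of files whose stats
/// satisfy the predicate. The same list doubles as the sideways
/// "files matched" information for downstream relations.
fn find_files(files: &[FileStat], pred: impl Fn(i64) -> bool) -> Vec<String> {
    files
        .iter()
        .filter(|(_, stat)| pred(*stat))
        .map(|(path, _)| path.clone())
        .collect()
}

fn main() {
    let files = vec![
        ("part-0.parquet".to_string(), 5),
        ("part-1.parquet".to_string(), 50),
    ];
    // Predicate: files that may contain values greater than 10.
    let matched = find_files(&files, |max| max > 10);
    println!("{:?}", matched); // ["part-1.parquet"]
}
```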

Delta Scan Purpose: Scan the Delta Table

Update DeltaScan to take an optional input stream that contains the paths of files to be scanned. This will enable DeltaScan to consume the output of DeltaFindFiles. Currently, when using find files, we must wait for the entire operation to complete and only then build the scan. This change enables Delta Scan to start as soon as the first candidate file is identified. I think this will require significant work, since it will involve refactoring the current DeltaScan implementation.
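The streaming handoff can be sketched with a plain std channel (names and types here are hypothetical; the real plans would exchange Arrow record batches, not strings): the find side sends each candidate path the moment it is identified, and the scan side begins consuming on the first arrival rather than waiting for the complete list.

```rust
use std::sync::mpsc;
use std::thread;

/// Sketch of the proposed handoff: "DeltaFindFiles" runs on its own thread
/// and streams candidate paths; "DeltaScan" consumes them incrementally.
fn streaming_scan(candidates: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let finder = thread::spawn(move || {
        for path in candidates {
            // In the real plan this is the predicate evaluation in find files.
            tx.send(path).unwrap();
        }
        // Dropping tx closes the channel, ending the scan's input stream.
    });
    // The scan starts processing as soon as the first path arrives.
    let scanned: Vec<String> = rx.iter().collect();
    finder.join().unwrap();
    scanned
}

fn main() {
    let paths = vec!["part-0.parquet".into(), "part-1.parquet".into()];
    println!("{:?}", streaming_scan(paths));
}
```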

Delta Write Purpose: Write records to storage, conflict resolution, and commit creation

Takes a single input stream of data that matches the table's schema and creates an Add action for each new file. Information can be passed sideways to include additional Delta actions in the commit, e.g. DeltaDelete can provide a stream of Remove actions.
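A sketch of that merge step, with a hypothetical, stripped-down `Action` enum (the real delta-rs actions carry far more metadata): writes become Add actions, and sideways Remove actions are folded into the same commit.

```rust
/// Hypothetical, simplified commit action; real Add/Remove actions carry
/// stats, partition values, timestamps, etc.
#[derive(Debug, PartialEq)]
enum Action {
    Add(String),
    Remove(String),
}

/// Sketch of Delta Write: each newly written file yields an Add action,
/// and Remove actions passed sideways (e.g. from Delta Delete) are merged
/// in so a single commit carries both.
fn delta_write(written_files: Vec<String>, sideways_removes: Vec<String>) -> Vec<Action> {
    written_files
        .into_iter()
        .map(Action::Add)
        .chain(sideways_removes.into_iter().map(Action::Remove))
        .collect()
}

fn main() {
    let commit = delta_write(vec!["part-2.parquet".into()], vec!["part-0.parquet".into()]);
    println!("{:?}", commit);
}
```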

Delta Delete Purpose: Delete Records from the table.

Given a predicate, delete records from the Delta table. Delta Delete can take an optional stream of records and will output the records that do NOT satisfy the predicate. It will maintain a stream of Remove actions that can be passed sideways to other operations downstream.

The input stream is optional since there are cases where delete can determine which files to remove without a scan. An optimization phase can help determine when this is the case.
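The delete contract can be sketched like this (hypothetical names and per-file record vectors standing in for record batch streams): surviving records flow downstream to be rewritten, while a Remove action accumulates for every rewritten file and is passed sideways.

```rust
/// Sketch of Delta Delete: given per-file record streams and a delete
/// predicate, output the records that do NOT satisfy the predicate (they
/// will be rewritten by Delta Write) and accumulate a Remove action for
/// each file that is rewritten.
fn delta_delete(
    files: Vec<(String, Vec<i64>)>,
    pred: impl Fn(i64) -> bool,
) -> (Vec<i64>, Vec<String>) {
    let mut survivors = Vec::new();
    let mut removes = Vec::new(); // passed sideways toward the commit
    for (path, records) in files {
        // Every scanned candidate file is rewritten, so it gets a Remove.
        removes.push(path);
        survivors.extend(records.into_iter().filter(|&r| !pred(r)));
    }
    (survivors, removes)
}

fn main() {
    let files = vec![("part-0.parquet".to_string(), vec![1, 15, 3])];
    // Delete every record greater than 10.
    let (survivors, removes) = delta_delete(files, |r| r > 10);
    println!("{:?} {:?}", survivors, removes); // [1, 3] ["part-0.parquet"]
}
```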

Diagram

A high-level diagram of how these relations will connect.

               ┌───────────────────────┐
               │   Delta Find Files    │
               │                       │
               │  Predicate:           │
           ┌───┤    Version:           │
           │   │                       │
           │   └──────────┬────────────┘
           │              │
           │              ▼
   Files   │   ┌───────────────────────┐
  Matched  │   │     Delta Scan        │
   List    │   │                       │
           │   │   Version:            │
           │   │                       │
           │   │                       │
           │   └──────────┬────────────┘
           │              │
           │              ▼
           │   ┌───────────────────────┐
           └──►│     Delta Delete      │
               │                       │
               │  Predicate:           │
           ┌───┤                       │
           │   └──────────┬────────────┘
  Remove   │              │
  Actions  │              ▼
           │   ┌───────────────────────┐
           │   │     Delta Write       │
           └──►│                       │
               │                       │
               │                       │
               └───────────────────────┘

Converting the ReplaceWhere operation to a logical plan could look something like this:


               ┌───────────────────────┐
               │   Delta Find Files    │
               │                       │
               │  Predicate:           │
           ┌───┤    Version:           │
           │   │                       │
           │   └──────────┬────────────┘
           │              │
           │              ▼                     ┌────────────────────────────┐
   Files   │   ┌───────────────────────┐        │        Data  Source        │
  Matched  │   │     Delta Scan        │        │                            │
   List    │   │                       │        │                            │
           │   │   Version:            │        └────────────┬───────────────┘
           │   │                       │                     │
           │   │                       │                     ▼
           │   └──────────┬────────────┘        ┌────────────────────────────┐
           │              │                     │    Delta Constraint Check  │
           │              ▼                     │                            │
           │   ┌───────────────────────┐        └────────────┬───────────────┘
           └──►│     Delta Delete      │                     │
               │                       │                     │
               │  Predicate:           │                     │
           ┌───┤                       │                     │
           │   └──────────┬────────────┘                     │
  Remove   │              │                                  │
  Actions  │              └────────────────┐   ┌─────────────┘
           │                               ▼   ▼
           │       ┌──────────────────────────────────────────────────────────┐
           │       │                      Union                               │
           │       │                                                          │
           │       └─────────────────────────┬────────────────────────────────┘
           │                                 │
           │                                 ▼
           │                      ┌───────────────────────┐
           │                      │     Delta Write       │
           └──────────────────────┤                       │
                                  │                       │
                                  │                       │
                                  └───────────────────────┘

Use Case

Once we have logical plans for Update and Delete, we can expose new DataFusion SQL statements for them. This may also help with reusing the Delete and Update relations in other logical plans.

Related Issue(s)

roeap commented 11 months ago

Great writeup @Blajda - I am a great fan of going the logical plan route for all our operations.

In another repo, I have been experimenting a bit with the end-to-end flow, starting with parsing Delta-specific SQL, and will upstream some of these changes soon. Reading through your proposals, it seems the "find files" plan would essentially contain the logic from kernel and the concept of a Snapshot. Some of the work I have been doing around moving the state management to Arrow RecordBatches touches on some of that, I think.

Essentially the plan is to make our state less eager and hopefully improve processing performance along the way.

I think this would pay into the proposed DeltaFindFiles and DeltaScan operations and also allow the described stream processing to start.

As a "corollary" of this work, we also have some progress on a much-improved Parquet reader that supports selective reading of leaf columns (also for nested structs) as well as more fine-grained casting of the schema - i.e. support for schema evolution.

Good news is, recently we have been consolidating the APIs used from the current DeltaTableState to become a snapshot. With the removal of parquet2 support in #1995 we will also greatly simplify log parsing by using the existing Arrow integration in the parquet crate, eliminating the need to maintain manual parsing.

Implicitly, your proposal also moves us to keep data much longer as RecordBatches and avoid creating actual Add, Remove structs etc., which I strongly support as well :).

As you said, this will be a lot of work, but also generate a lot of impact!

Blajda commented 11 months ago

> As a "corollary" of this work, we also have some progress on a much-improved Parquet reader that supports selective reading of leaf columns (also for nested structs) as well as more fine-grained casting of the schema - i.e. support for schema evolution.

Is this work available anywhere? To start, I want to update the DeltaScan implementation to manage Parquet reading itself instead of depending entirely on DataFusion's Parquet reader. We have multiple bugs caused by non-compliant writers (Spark) that write timestamp info using Int96. This can be resolved by having an 'adapter' to conform data to the correct type, which can also help with 'big' datatypes used elsewhere in the ecosystem. The 'adapter' will also align with schema evolution, since not every Parquet file will have the same physical schema.
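The Int96 part of such an adapter can be sketched self-contained (the function name here is hypothetical). Parquet's legacy Int96 timestamp, as written by Spark and Impala, is 12 bytes: a little-endian count of nanoseconds within the day in the first 8 bytes, and a little-endian Julian day number in the last 4. Converting to Unix nanoseconds is then arithmetic against the Unix epoch's Julian day (2,440,588):

```rust
const UNIX_EPOCH_JULIAN_DAY: i64 = 2_440_588;
const NANOS_PER_DAY: i64 = 86_400_000_000_000;

/// Convert a raw Parquet Int96 timestamp into Unix nanoseconds.
/// Layout: bytes 0..8 = little-endian nanoseconds within the day,
/// bytes 8..12 = little-endian Julian day number.
fn int96_to_unix_nanos(raw: [u8; 12]) -> i64 {
    let nanos_of_day = i64::from_le_bytes(raw[0..8].try_into().unwrap());
    let julian_day = u32::from_le_bytes(raw[8..12].try_into().unwrap()) as i64;
    (julian_day - UNIX_EPOCH_JULIAN_DAY) * NANOS_PER_DAY + nanos_of_day
}

fn main() {
    // Build an Int96 for one nanosecond past midnight, one day after epoch.
    let mut raw = [0u8; 12];
    raw[0..8].copy_from_slice(&1i64.to_le_bytes());
    raw[8..12].copy_from_slice(&(UNIX_EPOCH_JULIAN_DAY as u32 + 1).to_le_bytes());
    println!("{}", int96_to_unix_nanos(raw)); // 86400000000001
}
```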

I'm aligned with the changes being made for LogSegment and Snapshot. Something that might be beneficial is some sort of local 'registry'/'cache' that, given a physical name / 'table handle' and version, returns a shared structure in memory to reduce our overall memory footprint and network calls. When building operators, there are some cases where we clone Snapshot to satisfy the borrow checker.

Blajda commented 11 months ago

To clarify about operations needing to clone Snapshot: operations are pinned to a particular version of the table, and this data is read in multiple places; however, when the operation ends we return a table with a new version. The snapshot of the new table is cloned from the previous one. Suppose each snapshot takes 10 units of memory; we may now consume 20 units. It would be neat if we could build some sort of 'linked list' where snapshots share data and each version just maintains a logical view of which segments it needs.
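The sharing idea can be sketched with `Arc` (the `LogSegment` and `Snapshot` shapes here are hypothetical simplifications of the real delta-rs types): a snapshot holds reference-counted pointers to its log segments, so advancing to a new version clones only the pointer vector and appends the new segment, while the underlying segment data stays shared between versions.

```rust
use std::sync::Arc;

/// Hypothetical, simplified log segment; the real one holds parsed
/// commit/checkpoint data.
struct LogSegment {
    version: u64,
}

/// Sketch of a snapshot as a logical view over shared segments.
struct Snapshot {
    segments: Vec<Arc<LogSegment>>,
}

impl Snapshot {
    /// Create the next version's snapshot without duplicating segment data.
    fn advance(&self, new_segment: LogSegment) -> Snapshot {
        let mut segments = self.segments.clone(); // clones Arcs, not data
        segments.push(Arc::new(new_segment));
        Snapshot { segments }
    }
}

fn main() {
    let v0 = Snapshot { segments: vec![Arc::new(LogSegment { version: 0 })] };
    let v1 = v0.advance(LogSegment { version: 1 });
    // Both snapshots point at the same version-0 segment in memory.
    println!("{}", Arc::ptr_eq(&v0.segments[0], &v1.segments[0])); // true
    println!("{}", v1.segments[1].version); // 1
}
```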

hntd187 commented 10 months ago

So, part of what I was suggesting in #2095 was refactoring the builders to pass back their actions instead of doing everything in their own internal way. I'm in agreement that moving to this method would help a lot for generalizing execution, but at a slightly higher level, how do we think this would alter the operation builders? Their public-facing API obviously would not change, but internally they would pass back actions and an execution plan? Or does the execution plan also take care of actions and commit log updates?

Blajda commented 10 months ago

@hntd187 The execution plan would also take care of the actions and commit log updates at this point. In the above diagrams this would have been done in the Delta Write relation; however, we can consider splitting write and commit into two different relations if that makes things easier.

hntd187 commented 10 months ago

Well, I suppose the logical plans don't need to care about the details of what has to happen with actions, but the physical ones will need to be able to pass that context along. I don't have a strong opinion about where the write and commit happen, just that you can compose the actions of various logical operators into a single planned write and commit. Hopefully that makes sense.