duckdb / dbt-duckdb

dbt (http://getdbt.com) adapter for DuckDB (http://duckdb.org)
Apache License 2.0

feat: support reading delta tables with delta plugin #263

Closed: milicevica23 closed this 8 months ago

milicevica23 commented 9 months ago

Add delta table plugin (#241).

An example project can be found here.

milicevica23 commented 9 months ago

Hi @jwills, I have a question regarding the execution lifecycle.

As I understand it, we load/create the source materialization at compile time, from the first node that uses the source. Therefore, if I register the df at that point, it is not visible in the context at node creation time.

####### Here starts compilation
21:03:24  Began running node model.dbt_duckdb_delta.customer_raw
21:03:24  1 of 1 START sql table model main.customer_raw ................................. [RUN]
21:03:24  Acquiring new duckdb connection 'model.dbt_duckdb_delta.customer_raw'
21:03:24  Began compiling node model.dbt_duckdb_delta.customer_raw
##### We register the df here
SourceConfig(name='customer', identifier='customer', schema='main', database='dbt', meta={'delta_table_path': '/home/aleks/git/my-projects/banchmark-utils/utils/sf1/delta/customer', 'plugin': 'delta'}, tags=[])
21:03:24  Writing injected SQL for node "model.dbt_duckdb_delta.customer_raw"
21:03:24  Timing info for model.dbt_duckdb_delta.customer_raw (compile): 23:03:24.439860 => 23:03:24.506147
21:03:24  Began executing node model.dbt_duckdb_delta.customer_raw
21:03:24  Writing runtime sql for node "model.dbt_duckdb_delta.customer_raw"
21:03:24  Using duckdb connection "model.dbt_duckdb_delta.customer_raw"
###### Here node creation starts and compilation ends
21:03:24  On model.dbt_duckdb_delta.customer_raw: BEGIN
21:03:24  Opening a new connection, currently in state init
21:03:24  SQL status: OK in 0.0 seconds
21:03:24  Using duckdb connection "model.dbt_duckdb_delta.customer_raw"
21:03:24  On model.dbt_duckdb_delta.customer_raw: /* {"app": "dbt", "dbt_version": "1.6.3", "profile_name": "dbt_duckdb_delta", "target_name": "dev", "node_id": "model.dbt_duckdb_delta.customer_raw"} */
###### We need the df here
    create  table
      "dbt"."main"."customer_raw__dbt_tmp"

    as (

SELECT * FROM "dbt"."main"."customer"
    );

The reason I want this is that if we can somehow register the df during execution, at node creation time, then we don't have to materialize the data earlier, and we can automatically leverage filter pruning from the first query, which drastically reduces complexity and table preparation.

Do you have a resource where I can learn about the execution lifecycle of the dbt <-> adapter interaction, and can you point me to some entry points of that interaction?

jwills commented 9 months ago

Hey @milicevica23 sorry for the lag here, I'm at a conference this week.

So at the time the source node in the dbt graph is created, we call the source plugin if one is defined (which is done here: https://github.com/duckdb/dbt-duckdb/blob/master/dbt/adapters/duckdb/relation.py#L42 ) which kicks off this logic block here: https://github.com/duckdb/dbt-duckdb/blob/master/dbt/adapters/duckdb/environments/local.py#L82

By default, that logic materializes the data frame (or pyarrow table/dataset, or whatever) into the DuckDB database associated with the run, unless a table with the expected name already exists in the database (there is some additional logic in there for overwriting existing tables when they exist). But you can also choose to materialize the data frame as a view instead, which I think is what you're alluding to here: i.e., we can delay actually reading the data from the df/pyarrow dataset so that predicate pushdown etc. can happen later in the dbt run.
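For reference, a rough sketch of what a delta source plugin built on that hook could look like is below; the class and method names follow the BasePlugin pattern linked above, but the details (import paths, meta keys) should be read as illustrative rather than as the final implementation.

from deltalake import DeltaTable

from dbt.adapters.duckdb.plugins import BasePlugin
from dbt.adapters.duckdb.utils import SourceConfig


class Plugin(BasePlugin):
    def load(self, source_config: SourceConfig):
        # delta_table_path comes from the source's meta block, as in the
        # SourceConfig logged earlier in this thread
        table_path = source_config.meta["delta_table_path"]
        # returning a pyarrow dataset (instead of eagerly reading into pandas)
        # is what allows DuckDB to push projections and filters into the scan
        return DeltaTable(table_path).to_pyarrow_dataset()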

milicevica23 commented 9 months ago

Hi @jwills, thank you very much for the answer. The problem is that I tried to register the df at that moment, but in the very next step, when we materialize the node, the df instance is not there. I have to register the dataset in the moment/session in which the node that depends on the source is created. I'm sorry if I can't explain it well; I'm missing some knowledge of the general adapter <-> dbt logic and when which part is built. Do you have anything to recommend for learning the general concepts?

jwills commented 9 months ago

Is it something you can demo for me so I can reproduce it myself? I think I understand the problem but I'm not entirely sure; if it is that the reference to the underlying data frame does not persist from dbt run to dbt run that makes sense, but the only way to fix that is via an actual DuckDB extension like the one that exists for Iceberg.

milicevica23 commented 9 months ago

Will try to prepare a test, but I'll have to do it tomorrow.

Just two random thoughts; writing them down so I don't forget:

  1. If we assume that the whole process runs in :memory:, as stated in the main README, then the two calls have to be in the same session, because if we create a view in one session and use it in another (as we do currently), that view is still visible. (The registered object may not be in scope, but if I register it in the same session it should be, as they recommended here?)
  2. If we assume that two different processes are used for creating the source node and for creating the node that depends on the source, then we have to move the creation/registration of that df to the second process/session.

This is purely for performance reasons; otherwise it will work, but, as you said, by first loading the data in and then moving it around.

milicevica23 commented 9 months ago

I added a new test which showcases how the extension is used, and it throws the same error. While doing that I also tried df registration, but this doesn't work either. I need a better understanding of how the connection instances are built and copied around. My feeling is that the following happens:

import duckdb
from deltalake import DeltaTable
import pandas as pd

conn = duckdb.connect(":memory:")
# the cursor is created before the df is registered on the parent connection
conn1 = conn.cursor()
df = pd.DataFrame({"x": [1, 2, 3]})
conn.register("test_materialized", df)
conn.sql("SELECT * FROM test_materialized").show()   # works on the registering connection
conn1.sql("SELECT * FROM test_materialized").show()  # fails: the cursor does not see the registration

conn.close()
conn1.close()

The conn1.sql() call throws:

CatalogException: Catalog Error: Table with name test_materialized does not exist!
Did you mean "sqlite_master"?

We create a connection at one point and then duplicate it, but we don't register our df on the same connection instance.
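A small variation of the snippet above suggests the workaround: register the df on the cursor that actually executes the query, not only on the parent connection. A minimal sketch:

import duckdb
import pandas as pd

conn = duckdb.connect(":memory:")
df = pd.DataFrame({"x": [1, 2, 3]})

# register on the cursor that will actually run the query
conn1 = conn.cursor()
conn1.register("test_materialized", df)
conn1.sql("SELECT * FROM test_materialized").show()  # works

conn1.close()
conn.close()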

You did a very nice job with intercepting the creation of the node and adding plugins; I like that idea a lot.

milicevica23 commented 9 months ago

Hi @jwills, I made it work as I wanted. Now the following SQL query

SELECT c_custkey, c_nationkey 
FROM {{source("delta_source", "customer")}}
where c_nationkey = 15

loads just the needed parts into memory, and the filter and projection are automatically pushed down from the query. EXPLAIN produces the following:

ARROW_SCAN
  c_custkey
  c_nationkey
  Filters: c_nationkey=15 AND c_nationkey IS NOT NULL
  EC: 1
  5921
  (0.00s)
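The same pushdown can be reproduced outside of dbt with plain duckdb + deltalake; this is just an illustrative sketch (the path is a placeholder for a local delta table):

import duckdb
from deltalake import DeltaTable

conn = duckdb.connect(":memory:")

# expose the delta table as a pyarrow dataset; nothing is read yet
dataset = DeltaTable("sf1/delta/customer").to_pyarrow_dataset()
conn.register("customer", dataset)

rel = conn.sql("""
    SELECT c_custkey, c_nationkey
    FROM customer
    WHERE c_nationkey = 15
""")
# the plan shows an ARROW_SCAN with the projection and the filter pushed down
print(rel.explain())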

The problem was, as I described above, that between runs the df registration was not copied to the new cursor. There are multiple similar issues.

I now register the df at load time in the local environment and re-register it in each connection before execution. I will try to figure out whether I can move this to some other point, for example cursor initialization. I would be happy if you could look over the code and provide some feedback.
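To illustrate the re-registration idea in isolation, here is a standalone sketch; it is not the actual dbt-duckdb environment code, and the names are made up:

import duckdb

class Environment:
    def __init__(self):
        self.conn = duckdb.connect(":memory:")
        self._registered = {}  # name -> python object (df / pyarrow dataset)

    def register_df(self, name, obj):
        # remember the object so it can be replayed onto future cursors
        self._registered[name] = obj
        self.conn.register(name, obj)

    def cursor(self):
        cur = self.conn.cursor()
        # a fresh cursor does not inherit registrations, so replay them here
        for name, obj in self._registered.items():
            cur.register(name, obj)
        return cur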

I still have to test whether remote storage such as Azure and S3 works out of the box, but it is slowly getting ready.

milicevica23 commented 8 months ago

Sure, just point out where and what I have to change and I will do it. Also, excuse my English; it's not my first language, so if you see potential to improve the comments or documentation, just point it out and I will add it.

So, what I see as still to do:

I will try to go over these items in the next few days; please add points if I forgot any.

milicevica23 commented 8 months ago

One more note: I also tried this with storage options for Azure, and it works very well out of the box, so I don't think there should be problems with other providers such as S3 or GCP storage. If there are, they would be in the delta lake library to fix, but it should work. I added some more explanation in my demo project, where I show how it works.

I also tried dbt Power User, and it also works out of the box, which I think is very nice. It means you can profile your delta tables very simply from VS Code.

milicevica23 commented 8 months ago

Hi @jwills, thank you for your help and all the feedback. From a logical perspective, I think this should work now; I would be happy if you had time to look over it once more and tell me what formatting I should fix. Do you think we could post this in the dbt Slack to gather feedback, or should we merge it, mark it as experimental, and adapt it if somebody has feedback? The thing that could be better is testing; I struggled a bit to understand the utils for comparing two relations and how to use them correctly.

geoHeil commented 8 months ago

One note from my side: the internal Parquet reader takes << 1 second to register the tables. The plugin-based delta reader took 5 seconds for me.

I do not know if there is potential to improve (given the external nature of the reader) but if possible, it would be nice to improve the performance.

milicevica23 commented 8 months ago

Interesting, I will take a look into it. I have to check whether this comes from environment setup or from reading metadata (with big delta tables, that can take time). Most important is that we don't load data into memory before the predicates from the first materialization are pushed down. One note: with this delta plugin you have to load and persist the data rather than create a view over it, because the delta pointer df instance is no longer in scope afterwards. So if you build a view, the duckdb file just has its definition but will throw an error, because it doesn't know anything about the delta pointer.
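A standalone sketch of that failure mode, outside of dbt (the file name and object names are illustrative):

import duckdb
import pandas as pd

# stand-in for the delta pointer df that the plugin registers during a run
df = pd.DataFrame({"x": [1, 2, 3]})

con = duckdb.connect("demo.duckdb")
con.register("delta_df", df)
con.execute("CREATE OR REPLACE TABLE customer_tbl AS SELECT * FROM delta_df")
con.execute("CREATE OR REPLACE VIEW customer_view AS SELECT * FROM delta_df")
con.close()

# a later run opens a fresh connection: the registered object is gone
con2 = duckdb.connect("demo.duckdb")
con2.sql("SELECT * FROM customer_tbl").show()  # fine, the data was persisted
try:
    con2.sql("SELECT * FROM customer_view").show()  # fails: delta_df is not registered here
except Exception as e:
    print(e)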

jwills commented 8 months ago

Just pushed some formatting fixes; going to take a shot at a) seeing if I can get the tests running for myself and then b) seeing if I can simplify things a bit more while keeping the tests passing.

jwills commented 8 months ago

@milicevica23 so I think I'm good to go with this as-is; anything I'm not sure about in the implementation is stuff that we can iterate on in the future.

The one last thing I think you wanted was a way to change the default materialization type of a source plugin from a table to a view, right? Do you just want to add that as a method on the BasePlugin class that returns "table" by default and then the delta plugin overrides it to return "view"?
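Something along these lines is what I have in mind; the method name is only a placeholder:

class BasePlugin:
    def default_materialization(self) -> str:
        return "table"


# in the delta plugin
class Plugin(BasePlugin):
    def default_materialization(self) -> str:
        return "view"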

jwills commented 8 months ago

Hooray! Thank you @milicevica23! 🎉