fal-ai / dbt-fal

do more with dbt. dbt-fal helps you run Python alongside dbt, so you can send Slack alerts, detect anomalies and build machine learning models.
https://fal.ai/dbt-fal
Apache License 2.0
851 stars 71 forks

Enable updates to a model from fal scripts #195

Open chamini2 opened 2 years ago

chamini2 commented 2 years ago

Initial proposal

The idea of this function is to let a fal script update a model's table after the model has run (as an after-hook).

An example scenario would be:

-- models/tickets/tickets_with_sentiment.sql
SELECT
  *,
  -- NOTE: will be filled by fal in sentiment_analysis.py
  NULL AS label, 
  NULL AS score
FROM {{ ref('tickets') }}

Then, the after-hook:

# models/tickets/sentiment_analysis.py
import numpy as np
import pandas as pd
from transformers import pipeline

# `ref`, `context`, and `write_to_model` are provided by fal at runtime
ticket_data = ref(context.current_model)
ticket_descriptions = list(ticket_data.description)
classifier = pipeline("sentiment-analysis")
description_sentiment_analysis = classifier(ticket_descriptions)

rows = []
for ticket_id, sentiment in zip(ticket_data.id, description_sentiment_analysis):
    rows.append((int(ticket_id), sentiment["label"], sentiment["score"]))

records = np.array(rows, dtype=[("id", int), ("label", "U8"), ("score", float)])

sentiment_df = pd.DataFrame.from_records(records)

print("Uploading\n", sentiment_df)
write_to_model(
    dataframe=sentiment_df,
    # needed because function has no context of where it is being called from
    # we just have to document very well
    # (btw, what would happen if people used it "wrong"?)
    ref=context.current_model,
    id_column='id', # must be the same in df and table, used for knowing WHICH row to update
    columns=['label', 'score'] # defaults to ALL columns in dataframe?
)
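To make the proposed `id_column` semantics concrete, here is a minimal pandas sketch of what such an update could mean: match rows by id and overwrite only the listed columns, leaving everything else untouched. The function name `apply_update` and its signature are illustrative assumptions, not the actual fal API.

```python
import pandas as pd

def apply_update(table: pd.DataFrame, updates: pd.DataFrame,
                 id_column: str, columns: list) -> pd.DataFrame:
    # Hypothetical illustration of the proposed semantics: for each row in
    # `updates`, overwrite only `columns` of the matching row in `table`.
    result = table.set_index(id_column)
    incoming = updates.set_index(id_column)
    # DataFrame.update aligns on the index (the id) and touches only
    # matching rows/columns; rows absent from `updates` are left alone.
    result.update(incoming[columns])
    return result.reset_index()

existing = pd.DataFrame({"id": [1, 2], "description": ["a", "b"],
                         "label": [None, None], "score": [None, None]})
updates = pd.DataFrame({"id": [1], "label": ["positive"], "score": [0.8]})
updated = apply_update(existing, updates, id_column="id",
                       columns=["label", "score"])
```

Under this sketch, using it "wrong" (an id in the dataframe that does not exist in the table) would simply have no effect, which is one possible answer to the question in the comment above.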

How would the actual SQL statement look?

SQL is not a great fit for this kind of operation: updating many already-existing rows from externally computed data. Updates are usually driven by other data already in the database, and rarely done in batches as large as ours will be.

The following SQL statement should work. However, more ideas may come up.

UPDATE {{ ref('tickets') }} _table
JOIN (
    SELECT 1 as id, 'positive' AS label, 0.8 AS score
    UNION ALL
    SELECT 1 as id, 'negative' AS label, 0.6 AS score
    UNION ALL
    SELECT 1 as id, 'neutral' AS label, 0.9 AS score
) _insert
ON _insert.id = _table.id
SET
    _table.label = _insert.label,
    _table.score = _insert.score;
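The statement above could be rendered mechanically from the dataframe. Here is a sketch of such a generator, assuming the MySQL-style `UPDATE ... JOIN` syntax shown (other adapters would need their own flavor, e.g. `UPDATE ... SET ... FROM` on Postgres); the function name and signature are assumptions for illustration.

```python
def render_update_sql(table, rows, id_column, columns):
    # Build the inline `SELECT ... UNION ALL ...` subquery from the rows.
    selects = []
    for row in rows:
        pairs = ", ".join(
            f"{value!r} AS {name}" if isinstance(value, str)
            else f"{value} AS {name}"
            for name, value in zip([id_column, *columns], row)
        )
        selects.append(f"SELECT {pairs}")
    subquery = "\n    UNION ALL\n    ".join(selects)
    sets = ",\n    ".join(f"_table.{c} = _insert.{c}" for c in columns)
    return (
        f"UPDATE {table} _table\n"
        f"JOIN (\n    {subquery}\n) _insert\n"
        f"ON _insert.{id_column} = _table.{id_column}\n"
        f"SET\n    {sets};"
    )

sql = render_update_sql("tickets", [(1, "positive", 0.8)],
                        "id", ["label", "score"])
```

A production version would of course bind parameters instead of interpolating values into the string.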
burkaygur commented 2 years ago

I have a potentially interesting idea:

What if we leverage the existing dbt macros to do the merge?

So the operation would be:

  1. upload the dataframe to a staging table
  2. leverage the adapter-specific "merge" macros to merge the two tables

I am thinking this could help: https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql

and for dbt-bigquery, maybe we could use this: https://github.com/dbt-labs/dbt-bigquery/blob/main/dbt/include/bigquery/macros/materializations/snapshot.sql

and so on

Curious to see if this would lead anywhere.
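The two-step idea above can be sketched as: write the dataframe to a staging relation, then merge it into the target. The merge SQL below only mimics the general shape of a `MERGE` statement; the real adapter macros linked above differ per warehouse, and all names here are illustrative assumptions.

```python
def render_merge_sql(target, staging, id_column, columns):
    # Step 2 of the proposal: merge the staging relation into the target.
    set_clause = ", ".join(f"{c} = src.{c}" for c in columns)
    insert_cols = ", ".join([id_column, *columns])
    insert_vals = ", ".join(f"src.{c}" for c in [id_column, *columns])
    return (
        f"MERGE INTO {target} AS tgt\n"
        f"USING {staging} AS src\n"
        f"ON tgt.{id_column} = src.{id_column}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) "
        f"VALUES ({insert_vals});"
    )

merge_sql = render_merge_sql("analytics.tickets_with_sentiment",
                             "analytics.tickets_sentiment__staging",
                             "id", ["label", "score"])
```

Reusing the adapter macros instead of hand-writing this would keep the warehouse-specific syntax out of fal entirely.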

chamini2 commented 2 years ago

We are avoiding updates for now; the new write_to_model function supports only append and overwrite behavior (#249).