iterative / dvc

🦉 ML Experiments and Data Management with Git
https://dvc.org
Apache License 2.0

Epic: Database table/non-file dependencies #9945

Closed by dberenbaum 5 months ago

dberenbaum commented 9 months ago

Update

See updated proposal below.

Original proposal (outdated)

Summary / Background

Tables/datasets from databases and similar systems can be specified as dependencies in dvc pipelines.

Related:

Scope

Assumptions

Open Questions

Blockers / Dependencies

General Approach

Example:

# dvc.yaml
stages:
  train:
    cmd: python train.py
    deps:
      - train.py
      - customer_info:
          type: delta-lake
          conn_info: ...
    outs:
      - model
# dvc.lock
train:
  cmd: python train.py
  deps:
    - path: train.py
      hash: md5
      md5: 324001573ed724e5ae092226fcf9ca30
      size: 1666
    - name: customer_info
      type: delta-lake
      version: 2
  outs:
    - path: model
      hash: md5
      md5: f9ee137c1772f7fb21b8f52c8a72c8ac
      size: 1957931
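
For illustration only, a minimal sketch of how a dependency like the customer_info entry above might be checked for staleness, assuming the deltalake Python package; the helper name, table URI, and locked version are placeholders, not DVC internals:

# check_delta_dep.py (hypothetical sketch)
from deltalake import DeltaTable  # pip install deltalake


def delta_dep_is_stale(table_uri: str, locked_version: int) -> bool:
    """Return True if the Delta table has moved past the version in dvc.lock."""
    current_version = DeltaTable(table_uri).version()
    return current_version != locked_version


# e.g. the lock file above recorded `version: 2` for customer_info
print(delta_dep_is_stale("s3://my-bucket/tables/customer_info", 2))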

Steps

Must have (p1)

Optional / followup (p2)

Timelines

TBD by the assignee

dberenbaum commented 9 months ago
  • Delta Lake tables

This can be limited to Databricks rather than a generic connection to Delta Lake. https://docs.databricks.com/en/dev-tools/index.html might be helpful to research.

skshetry commented 9 months ago

Also related: https://github.com/iterative/dvc/issues/2378.

dberenbaum commented 9 months ago

Edited to add generalized support for callback deps as a p2 item.

skshetry commented 9 months ago

Snowflake has a LAST_ALTERED column that can be used:

LAST_ALTERED TIMESTAMP_LTZ Date and time when the table was last altered by a DDL or DML operation.

For views and external tables, even though they have LAST_ALTERED, it only records the timestamp of when the structure was changed, so this approach is only going to work for regular tables.

use schema snowflake_sample_data.tpcds_sf100tcl;
SELECT LAST_ALTERED 
FROM snowflake_sample_data.INFORMATION_SCHEMA.TABLES 
WHERE TABLE_NAME = 'CUSTOMER' and TABLE_SCHEMA = 'TPCDS_SF100TCL';

Snowflake also has a time-travel feature where you can query past data using the AT/BEFORE SQL extensions. Data retention is set to 1 day by default for all accounts and can only be extended up to a maximum of 90 days on Enterprise accounts.

-- show data from 5 minutes ago
SELECT * FROM customer_table AT(OFFSET => -60*5);

We can also use hash_agg(*), which computes a single hash for the whole table. It seems to be used for checking data consistency, equivalence of different tables, and retriggering pipelines[citation required]. It seemed fast enough to me on the Snowflake sample data, but I am not sure how fast it will be on really large datasets.

SELECT hash_agg(*) FROM table;
HASH_AGG(*)
-5,290,741,514,389,461,925

The first approach (LAST_ALTERED) is applicable only to Snowflake, and the second (time travel) does not seem very useful with the default retention. The hash_agg approach may be generalizable to other databases, as they have similar functions (if it's okay to compute it each time). Anyway, it's a good function to measure and see (on a Snowflake trial, it was ⚡ fast for me on the sample dataset).
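
To make the hash_agg idea concrete, a rough sketch of computing such a fingerprint from Python, assuming snowflake-connector-python and credentials in environment variables (connection parameters and the lock-file integration are illustrative only):

# snowflake_fingerprint.py (sketch)
import os

import snowflake.connector


def table_fingerprint(table: str) -> int:
    """Compute a single hash over all rows of a Snowflake table with HASH_AGG(*)."""
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE"),
        database="SNOWFLAKE_SAMPLE_DATA",
        schema="TPCDS_SF100TCL",
    )
    try:
        cur = conn.cursor()
        cur.execute(f"SELECT HASH_AGG(*) FROM {table}")
        (fingerprint,) = cur.fetchone()
        return fingerprint
    finally:
        conn.close()


# A stage dependency would be considered changed when this value differs from
# the one recorded at the last run (e.g. in dvc.lock).
print(table_fingerprint("CUSTOMER"))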

skshetry commented 9 months ago

Dolt is a full-fledged MySQL-compatible database with stored procedures for Git-like operations and versioning capabilities.

See https://www.dolthub.com/blog/2023-04-19-dolt-architecture-intro/. cc @shcheklein Also see Dolt 1.0 where they replaced internal storage format.

Also, there have been some attempts at integrating Dolt with DVC.

dberenbaum commented 9 months ago

@skshetry hash_agg() is what I used at my last company to make dvc work with Snowflake, and from discussions with others on other teams in the org, I think this was generally agreed to be the best way to do it. It worked pretty well even for large datasets AFAIR.

dberenbaum commented 9 months ago

Notes from discussion with @dmpetrov and @shcheklein:

Other tools to research in this area:

We also discussed starting even simpler and leaving it to the user to decide when to check for updates and run the query again. In the basic "query and dump" use case, someone wants to run an expensive query, often across multiple tables, and dump the results to a file. DVC caches the results so they can be recovered later, and the user chooses when to update them.

A simple example of how to do this in DVC now would be to write a script to execute the query and dump the results:

# dump.py
import sqlite3

import pandas as pd

# Run the query and dump the results to a csv file
conn = sqlite3.connect("mydb.db")
df = pd.read_sql_query("SELECT * FROM table WHERE ...", conn)
df.to_csv("dump.csv", index=False)

Then wrap that in a DVC stage:

stages:
  dump:
    cmd: python dump.py
    deps:
      - dump.py
    outs:
      - dump.csv

Running dvc repro will cache the output csv file and won't rerun the stage unless dump.py changes (for example, if the query is updated). If the user thinks the database tables have changed and wants to rerun the stage, they can use --force to update the results.

The simplest approach we can take is to document this pattern. Pending a deeper look into the technologies above, I think anything beyond that will require us to write some database-specific functions to either:

dberenbaum commented 9 months ago
  • dump query results to a file (like the Python script is doing in this example). This one is probably more generalizable, since it should be possible to run queries across many database flavors with something like sqlalchemy and then share a common serialization function (see the sketch below). However, we would need some new syntax (dvc import-db --conn sqlite:... --input query.sql --output dump.csv?) to support it.
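
To illustrate that option, a rough sketch of a generalized query-and-dump using sqlalchemy plus pandas; the connection URL, query file, and output path are placeholders, and the import-db syntax above is only a proposal:

# dump_generic.py (sketch)
import pandas as pd
from sqlalchemy import create_engine

# Any SQLAlchemy-supported URL: sqlite:///, postgresql://, snowflake://, ...
engine = create_engine("sqlite:///mydb.db")

with open("query.sql") as f:
    query = f.read()

# Common serialization step shared across database flavors
df = pd.read_sql_query(query, engine)
df.to_csv("dump.csv", index=False)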

Additional considerations:

skshetry commented 9 months ago

How much do users care about materializing as files compared to materializing as a transient table or a view that they use in the next stage? Materializing to a file is not always possible for large data warehouse databases, and compute/storage is cheap anyway. Although it depends on what databases we are targeting and whether we are focused on end-to-end scenarios or not.

dberenbaum commented 9 months ago

Discussed in the sprint meeting:

skshetry commented 9 months ago

dbt seems useful for lineage tracking inside a database and dvc is useful for files -- can we show how to connect them for full lineage that starts in a database and ends in a model or other non-database files?

With dbt, you can deploy to a separate production schema (or your dev schema), so the way to convert a database table to a file is the same as for any other database; dbt is not in the picture for this step. You can already track full lineage in dbt and use dvc to work with files today. Note that a dbt repository is just a git repository, so you can easily use both dvc and dbt at the same time.

dberenbaum commented 9 months ago

Related: https://www.comet.com/site/blog/comet-snowflake-a-powerful-combination-for-better-reproducibility-and-visibility-into-your-ml-workflow/

skshetry commented 9 months ago

BigQuery has last_modified_time in the __TABLES__ metadata table.

SELECT 
  table_id, 
  TIMESTAMP_MILLIS(last_modified_time) AS last_modified
FROM 
  project_id.dataset.__TABLES__

There are other ways described in this blog post, which also mentions an approach using INFORMATION_SCHEMA.TABLES, but I could neither find it documented in the TABLES view nor get it to come up in a query.

There is no hash_agg()-like function in BigQuery either, but we can probably use this: https://stackoverflow.com/questions/50428639/does-bigquery-has-an-api-to-return-a-checksum-for-a-exported-table.
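
A hedged sketch of the checksum idea from that Stack Overflow link: aggregate per-row fingerprints into a single value, assuming the google-cloud-bigquery package and a placeholder project.dataset.table name:

# bigquery_checksum.py (sketch)
from google.cloud import bigquery

client = bigquery.Client()  # uses application default credentials

# BIT_XOR over per-row FARM_FINGERPRINT values is order-independent, so it can
# serve as a whole-table checksum.
query = """
SELECT BIT_XOR(FARM_FINGERPRINT(TO_JSON_STRING(t))) AS checksum
FROM `project_id.dataset.table_name` AS t
"""

row = next(iter(client.query(query).result()))
print(row.checksum)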

skshetry commented 9 months ago

PostgreSQL >= 9.5 has a track_commit_timestamp config that, when enabled, logs commit timestamps.

Also, this post suggests that it's not that reliable:

PostgreSQL 9.5 has commit timestamps. If you have them enabled in postgresql.conf (and did so in the past too), you can check the commit timestamp for the row with the greatest xmin to approximate the last modified time. It's only an approximation because if the most recent rows have been deleted they won't be counted.

Also, commit timestamp records are only kept for a limited time. So if you want to tell when a table that isn't modified much is modified, the answer will effectively be "dunno, a while ago".

There is no such metadata in INFORMATION_SCHEMA.TABLES and no hash_agg-like checksum function, but a checksum can be computed similarly to the BigQuery approach above.
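
One possible way to emulate such a checksum in PostgreSQL, as a sketch only (assuming psycopg2, an id column to give a deterministic ordering, and placeholder DSN/table names):

# postgres_checksum.py (sketch)
import psycopg2

conn = psycopg2.connect("dbname=mydb user=me")
with conn, conn.cursor() as cur:
    # Hash each row's text representation, then hash the ordered concatenation.
    cur.execute(
        """
        SELECT md5(string_agg(md5(t::text), '' ORDER BY t.id))
        FROM customer AS t
        """
    )
    (checksum,) = cur.fetchone()
print(checksum)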

skshetry commented 9 months ago

Databricks has a LAST_ALTERED field in INFORMATION_SCHEMA.TABLES.

But no such field exists for views (not even for DDL changes).

skshetry commented 9 months ago

MySQL has an UPDATE_TIME in INFORMATION_SCHEMA.TABLES, but it depends on the storage engine used. With the default engine it should exist, but it is not persisted across reboots.

There is also something called a live checksum that you can enable when creating the table; a CHECKSUM field will then be populated in INFORMATION_SCHEMA.TABLES for that table, but it might make inserts slower.
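
For reference, a small sketch of reading that metadata from INFORMATION_SCHEMA.TABLES, assuming mysql-connector-python and placeholder connection settings (UPDATE_TIME and CHECKSUM may be NULL depending on the engine and table options):

# mysql_table_meta.py (sketch)
import mysql.connector

conn = mysql.connector.connect(host="localhost", user="me", password="...", database="mydb")
cur = conn.cursor()
cur.execute(
    """
    SELECT UPDATE_TIME, CHECKSUM
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
    """,
    ("mydb", "customer"),
)
print(cur.fetchone())
conn.close()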

skshetry commented 9 months ago

For SQL Server, I could only find modify_date in the sys.objects table.

modify_date datetime Date the object was last modified by using an ALTER statement. If the object is a table or a view, modify_date also changes when an index on the table or view is created or altered.

Since it is a system table, probably not everyone has access to it.

There is a CHECKSUM_AGG function in SQL Server.
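
A quick sketch combining both of the above for SQL Server, assuming pyodbc and a placeholder connection string:

# sqlserver_table_meta.py (sketch)
import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};SERVER=myserver;DATABASE=mydb;UID=me;PWD=..."
)
cur = conn.cursor()

# Last modification time from the system catalog ('U' = user table)
cur.execute("SELECT modify_date FROM sys.objects WHERE name = ? AND type = 'U'", "customer")
print(cur.fetchone())

# CHECKSUM_AGG over per-row CHECKSUM(*) gives a single table-level value
cur.execute("SELECT CHECKSUM_AGG(CHECKSUM(*)) FROM customer")
print(cur.fetchone())
conn.close()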

dberenbaum commented 9 months ago

dbt is conceptually analogous to dvc for databases. One approach to avoid having to build too much ourselves or maintain implementations for different databases is to be opinionated. In other words, we could suggest to use dbt for database/data engineering/ETL version control (and see if they are also interested in partnering on this), and then only provide an integration with dbt.

For example, dbt already has a way to determine the freshness of a table that we could use to determine when data was last updated (when available).

If we want to start with simply dumping a query result, we could provide a way to specify a dbt model to dump.

Besides being easier to implement, the reason to take this approach would be to have a holistic end-to-end workflow that supports versioning and reproducibility for structured data, from ETL all the way to model training and deployment (similar to how dvcx may be upstream of dvc for unstructured data workflows).

Edit: I should also mention downsides to this approach. It's opinionated and requires learning another fairly complex tool.

skshetry commented 8 months ago

Different databases offer different authentication and configuration mechanisms. For example, Snowflake supports username/password auth, key-pair auth, MFA, SSO, etc. Redshift supports password-based auth and IAM-based authentication.

Similarly, BigQuery supports OAuth, service-account-based login, etc. And these are only a few of the many databases we'd have to support. Also, each database requires a driver/connector to run.

Even if we do start with password-based auth, it is likely going to turn into something more complicated soon, so I don't find the complexity worth it for the import-db feature (which in a way is already possible via the user's own script).

I am trying to look into dbt, to see if we can reuse their "Connection Profiles" to run the database queries. They have the concept of adapters, which are implemented for each database, some maintained by dbt-labs and some community-maintained. See https://docs.getdbt.com/docs/core/connect-data-platform/about-core-connections.

Similarly, we could reuse the source freshness implementation from dbt for dependencies (see https://github.com/dbt-labs/dbt-core/issues/7012, which we would benefit from if we can somehow use dbt).

dbt seems risky because it is primarily a CLI application, and its internals may not be stable or accessible. I'll try looking deeper into them.
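
As a quick sketch of what reusing dbt could look like in practice, assuming the programmatic entry point added in dbt-core 1.5 (dbtRunner) and running from inside a dbt project with configured profiles; this is exploratory rather than a committed design:

# dbt_freshness.py (sketch)
from dbt.cli.main import dbtRunner

dbt = dbtRunner()

# Equivalent to running `dbt source freshness` on the CLI: dbt resolves the
# connection profile and reports per-source freshness, which could feed a
# dependency check.
result = dbt.invoke(["source", "freshness"])
print("success:", result.success)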

It might also be worth discussing this with dbt from the POV of other integrations.

dberenbaum commented 8 months ago

@skshetry @shcheklein To provide a slightly more concrete proposal from the thoughts above about dbt integration, let me start with the query-and-dump use case and explain how it might work with dbt.

Assumptions:

The user can write their query as a dbt model. This looks like a templated select query, but dbt automatically saves it to a table (it's also possible to materialize the model as a view or in a few other ways).

Then they can dump the results to a dvc-tracked file:

dvc import-dbt . customers

The arguments are a path to the dbt repo and the name of the dbt model to import. This could perform a select * from customers and write those results to a file like customers.csv tracked by customers.csv.dvc (path and output format could be configurable).

Some benefits of this approach:

Questions:

skshetry commented 8 months ago

There is also dbt-fal, which provides a function to get a pandas dataframe from a dbt model. But it's one more thing on top of dbt that the user would have to know (though it could be a good starting point to take inspiration from).

dberenbaum commented 8 months ago

Relevant discord comment: https://discord.com/channels/485586884165107732/563406153334128681/1163602577552846998

dberenbaum commented 8 months ago

Discussed with @skshetry to move forward with a proof of concept on this that we can use to:

dberenbaum commented 7 months ago

@skshetry Should we consider Delta Lake tables done since Databricks is a supported backend for dbt? I don't think we can prioritize another effort specific to Delta Lake right now.

dberenbaum commented 5 months ago

Now that we have import-db, I'm going to close this and open a new one for streaming support related to https://github.com/iterative/dvc/pull/10164.