dlt-hub / verified-sources

Contribute to dlt verified sources 🔥
https://dlthub.com/docs/walkthroughs/add-a-verified-source
Apache License 2.0
74 stars 50 forks source link

Add column selection and filtering to sql_database source #531

Closed arjun-panchmatia-mechademy closed 3 months ago

arjun-panchmatia-mechademy commented 4 months ago

Source name

sql_database

Describe the data you'd like to see

Background: A fairly common use-case when dealing with clients is having a direct database exposed for extraction. Considering the non-technical nature of these clients, these DBs often have lots of columns that may be irrelevant during the extraction process AND/OR require custom column-wise filtering to make the relevant data available for ingestion. This is best expressed as a SQL query where you can select individual columns and apply filters as needed. Currently, the sql_database source does not have the ability to do level of filtering or column selection. This makes it hard to use directly in pipelines, as often applying a map function still results in a full scan, slowing down critical loads. We often have instances of extreme volumes of data for even 15 minutes of incremental loads and cannot use this source as it would be far too slow, and potentially eat up a lot of in-memory store. We've been working around this by writing custom .sql files and yielding them from a resource, but that can be quite cumbersome from a maintenance perspective and does not allow us to leverage the full breadth of dlt's features.

Request: It would be great if we can get some level of control on the data loaded by the sql_database source (or the sql_table) resource. Potentially letting us select our required columns (maybe via a data_selector as is configred in the rest api source) and allow a level of filtering to occur. Aggregation would also be a generally useful function to have but that may be overkill and is probably better suited to post processing / ELT methadology anyway.

Are you a dlt user?

Yes, I'm already a dlt user.

Do you ready to contribute this extension?

No.

dlt destination

filesystem

Additional information

I think just having the ability to select columns and apply filters would go a long way. A big benefit of using ORMs is being DB agnostic. When trying to write an orchestration package, we need to necessarily account for variance and try to be as agnostic as possible, and this would go a long way towards helping us achieve that goal.

Current workaround is as described above: writing raw SQL / using an ORM and then yielding the data from the resource.

rudolfix commented 4 months ago

@arjun-panchmatia-mechademy we have table_adapter callback that let's you modify the SQL Alchemy Table before it is used for select. that allows to remove certain columns ie. (it is in the sql_database_pipeline.py demo)

def select_columns() -> None:
    """Uses table adapter callback to modify list of columns to be selected"""
    pipeline = dlt.pipeline(
        pipeline_name="rfam_database",
        destination="duckdb",
        dataset_name="rfam_data_cols",
        full_refresh=True,
    )

    def table_adapter(table: Table) -> None:
        print(table.name)
        if table.name == "family":
            # this is SqlAlchemy table. _columns are writable
            # let's drop updated column
            table._columns.remove(table.columns["updated"])

    family = sql_table(
        credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
        table="family",
        chunk_size=10,
        reflection_level="full_with_precision",
        table_adapter_callback=table_adapter,
    )

    # also we do not want the whole table, so we add limit to get just one chunk (10 records)
    pipeline.run(family.add_limit(1))
    # only 10 rows
    print(pipeline.last_trace.last_normalize_info)
    # no "updated" column in "family" table
    print(pipeline.default_schema.to_pretty_yaml())

regarding filters: really good idea! we intended to do it in similar way: just before executing we let user to modify the expression and ie add a WHERE clause

what do you think? those are powerful but advanced interfaces ie. you need to know SQL Alchemy to use them. did you have something simpler in mind?

arjun-panchmatia-mechademy commented 4 months ago

Hello! I gave it some thought:

1) I think both a data_selector and the table_adapter callback can coexist. Re: the former doesn't necessarily require you to understand and use SQLAlchemy. I understand dlt tends to be very unopinionated but for instance, I could parse endpoints for a REST API using RESTClient myself but the rest_api_source is a more opinionated and IMO more manageable solution.

2) I think SELECT can quite easily be implemented elegantly using a similar syntax as rest_api_source, something along the lines of maybe having a data_selector parameter or maybe even a columns parameter a la pandas? What do you think about it?

3) The last issue that will maybe not be possible to ever fully elegantly implement is aggregation. But IMHO that's probably best handled by either A) post processing using sql_client or B) providing ability to inject queries directly into sql_table source or even sql_database. I was thinking something for sql_database having a from_query would cover all esoteric edge cases. Again, this may not be the most well thought out soln and is maybe not even a strong usecase in an elt paradigm.

I'm currently leaning on just the ability to have columns selected and filters applied is good enough with potentially a query utility provided in later on for edge cases. In some senses I think anything you'd require a query for is best handled in silver / gold layer anyway and having the ability to select and filter covers bronze layers adequately.

How does the above sound? Looking forward to reading your thoughts!

rudolfix commented 3 months ago

@arjun-panchmatia-mechademy thanks for the feedback. so what I'd push to be implemented:

  1. a columns argument that accepts a list of columns to be selected. this will be configurable field so you'll be able to use toml/yaml to specify them as well
  2. for practical reasons I'd implement the filter as another fallback where you can add additional clauses to SELECT statements in SqlAlchemy. in essence user would add WHERE clauses like in here https://docs.sqlalchemy.org/en/20/tutorial/data_select.html#the-where-clause

so the callback would be

def filter_query(select: SELECT, table: TABLE) -> Select:
    return select.where(table.c.date == datetime.today())

I could not find an easy way to append SQL to an ORM expression. WDYT? we can push this to be implemented quite quickly

arjun-panchmatia-mechademy commented 3 months ago

I think I agree w/ you, it's probably best to implement filters as callbacks and have the columns data selector. It would be far too cumbersome to use any level of aggregation etc within the source / resource itself and is probably best handled by leveraging SQLAlchemy (or tbh that would be something you'd use DBT or Spark to handle in the silver and gold layers).

Thank you for pushing for these changes @rudolfix ! They're incredibly useful and we'd actually be using them for a big client the second they drop in a new version. Cheers :)

rudolfix commented 3 months ago

@arjun-panchmatia-mechademy side note: best way to implement aggregation etc. is to create a view and then select form it. added bonus is data type reflection.

rudolfix commented 3 months ago

@arjun-panchmatia-mechademy updates got merged to master