astronomer / astro-sdk

Astro SDK allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
https://astro-sdk-python.rtfd.io/
Apache License 2.0

PoC: Expose aql.transform as traditional Airflow operator #1855

Open tatiana opened 1 year ago

tatiana commented 1 year ago

Context

As of 1.5.3, some existing Python SDK features are exposed as traditional Airflow operators (LoadFileOperator and ExportFileOperator). So far, we have not encouraged using them this way, but that is changing as part of https://github.com/astronomer/astro-sdk/issues/1853.

Some of the existing Astro SDK features may need more work before they can be exposed as traditional Airflow operators. The goal of this ticket is a PoC so that we have not only the aql.transform decorator but also a TransformOperator.
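For reference, the decorator form we want to mirror looks like this today (adapted from the SDK's getting-started example):

from astro import sql as aql
from astro.sql.table import Table

# Decorator form: the SQL is the function's return value, and
# {{input_table}} is bound from the function's arguments at runtime.
@aql.transform
def top_five_animations(input_table: Table):
    return """
        SELECT title, rating
        FROM {{input_table}}
        WHERE genre1='Animation'
        ORDER BY rating DESC
        LIMIT 5;
    """

The operator form sketched in the proposal below takes the same SQL as a plain constructor argument instead of a Python function's return value.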

Some challenges may include the following:

This ticket is a follow-up to a conversation with @manmeetkaur.

Proposal

from datetime import datetime

from airflow import DAG

from astro.files import File
from astro.sql import LoadFileOperator, TransformOperator
from astro.sql.table import Table

imdb_file = File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv")
imdb_table = Table(conn_id="sqlite_default")
popular_animations_table = Table(name="top_five_animations")

with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:

    load_file = LoadFileOperator(
        input_file=imdb_file,
        output_table=imdb_table
    )

    transform_table = TransformOperator(
        sql="""
            SELECT title, rating
            FROM {{input_table}}
            WHERE genre1='Animation'
            ORDER BY rating DESC
            LIMIT 5;
        """,
        input_table=imdb_table,
        output_table=popular_animations_table,
    )

    load_file >> transform_table    

Acceptance criteria

manmeetkaur commented 1 year ago

@tatiana I can take up this ticket if it is okay with you.

tatiana commented 1 year ago

That's amazing, @manmeetkaur. Thank you very much. Please go ahead, and reach out if you have any questions!

manmeetkaur commented 1 year ago

@tatiana just one correction: as we discussed in our meeting, the "decorator"-category operators, including @dataframe, @transform, and @run_raw_sql, have special logic that makes them work. Those could be de-prioritized so we can focus on the low-hanging fruit of these operators: load_file, append, drop_table, export_to_file, merge, transform_file, check_column, check_table, get_file_list. Thoughts?
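Of those, get_file_list is the only one not exercised in the tested code below; a minimal sketch of its use via the documented aql.get_file_list helper (the path and conn_id are placeholders):

from astro import sql as aql

# Returns an XComArg listing the files under the given prefix; useful,
# for example, to dynamically map LoadFileOperator over many input files.
input_files = aql.get_file_list(path="s3://my-bucket/imdb/", conn_id="aws_default")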

manmeetkaur commented 1 year ago

Tested Code:

import os
from datetime import datetime

from airflow import DAG

from astro import sql as aql
from astro.files import File
from astro.sql import (
    AppendOperator,
    ColumnCheckOperator,
    DropTableOperator,
    ExportToFileOperator,
    LoadFileOperator,
    MergeOperator,
    SQLCheckOperator,
    TransformOperator,
)
from astro.sql.table import Table

imdb_file = File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv")
imdb_table = Table(name="imdb_data", conn_id="snowflake")
# popular_animations_table = Table(name="top_five_animations")
target_table = Table(name="target_imdb_data", conn_id="snowflake")
final_table = Table(name="final_imdb_data", conn_id="snowflake")
out_file = File(path=os.path.join(os.getcwd(), 'out.csv'))

with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:

    load_file = LoadFileOperator(
        input_file=imdb_file,
        output_table=imdb_table
    )

    append = AppendOperator(
        target_table=target_table,
        source_table=imdb_table,
        # columns = ["X", "TITLE", "RATING", "TOTALVOTES", "GENRE1", "GENRE2", "GENRE3", "METACRITIC", "BUDGET", "RUNTIME", "CVOTES10", "CVOTES09", "CVOTES08", "CVOTES07", "CVOTES06", "CVOTES05", "CVOTES04", "CVOTES03", "CVOTES02", "CVOTES01", "CVOTESMALE", "CVOTESFEMALE", "CVOTESU18", "CVOTESU18M", "CVOTESU18F", "CVOTES1829", "CVOTES1829M", "CVOTES1829F", "CVOTES3044", "CVOTES3044M", "CVOTES3044F", "CVOTES45A", "CVOTES45AM", "CVOTES45AF", "CVOTES1000", "CVOTESUS", "CVOTESNUS", "VOTESM", "VOTESF", "VOTESU18", "VOTESU18M", "VOTESU18F", "VOTES1829", "VOTES1829M", "VOTES1829F", "VOTES3044", "VOTES3044M", "VOTES3044F", "VOTES45A", "VOTES45AM", "VOTES45AF", "VOTESIMDB", "VOTES1000", "VOTESUS", "VOTESNUS", "DOMESTIC", "FOREIGN", "WORLDWIDE"]
    )

    ## This throws "AttributeError: 'LoadFileOperator' object has no attribute 'conn_id'"
    ## because the Table returned by LoadFileOperator has no conn_id attribute;
    ## shouldn't it auto-use the conn_id of the input Table object?
    # drop = DropTableOperator(
    #     table=load_file
    # )
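    ## Workaround: pass a Table object that carries the conn_id instead of the
    ## operator itself (as done for `drop` further down).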

    export_to_file = ExportToFileOperator(
        task_id="export_file",
        output_file=out_file,
        input_data=target_table,
        if_exists="replace",
    )

    merge = MergeOperator(
        target_table=target_table,
        source_table=imdb_table,
        target_conflict_columns=["X"],
        if_conflicts="ignore",
        columns=["X", "TITLE", "RATING", "TOTALVOTES", "GENRE1", "GENRE2", "GENRE3", "METACRITIC", "BUDGET", "RUNTIME", "CVOTES10", "CVOTES09", "CVOTES08", "CVOTES07", "CVOTES06", "CVOTES05", "CVOTES04", "CVOTES03", "CVOTES02", "CVOTES01", "CVOTESMALE", "CVOTESFEMALE", "CVOTESU18", "CVOTESU18M", "CVOTESU18F", "CVOTES1829", "CVOTES1829M", "CVOTES1829F", "CVOTES3044", "CVOTES3044M", "CVOTES3044F", "CVOTES45A", "CVOTES45AM", "CVOTES45AF", "CVOTES1000", "CVOTESUS", "CVOTESNUS", "VOTESM", "VOTESF", "VOTESU18", "VOTESU18M", "VOTESU18F", "VOTES1829", "VOTES1829M", "VOTES1829F", "VOTES3044", "VOTES3044M", "VOTES3044F", "VOTES45A", "VOTES45AM", "VOTES45AF", "VOTESIMDB", "VOTES1000", "VOTESUS", "VOTESNUS", "DOMESTIC", "FOREIGN", "WORLDWIDE"],
    )

    transform = TransformOperator(
        # As tested here, the operator still expects a python_callable whose
        # return value supplies the SQL (a .sql file) and its parameters.
        sql="my_sql.sql",
        python_callable=lambda: ("my_sql.sql", {"input_table": target_table}),
        parameters={"input_table": target_table},
        op_kwargs={"output_table": final_table},
    )

    drop = DropTableOperator(
        table=imdb_table
    )

    column_check = ColumnCheckOperator(
        dataset=target_table,
        column_mapping={
            # allow at most one null value in "x"
            "x": {"null_check": {"geq_to": 0, "leq_to": 1}},
            # "title" must contain no nulls
            "title": {
                "null_check": {
                    "equal_to": 0,
                },
            },
        },
    )

    table_check = SQLCheckOperator(
        dataset=final_table,
        checks={
            # table-level check: the final table must contain more than 10 rows
            "row_count": {"check_statement": "count(*) > 10"},
        },
    )

    load_file >> append >> export_to_file >> merge >> drop >> transform >> column_check >> table_check

    # Remove any temporary tables created by the SDK during the run
    aql.cleanup()