astronomer / astro-sdk

Astro SDK allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
https://astro-sdk-python.rtfd.io/
Apache License 2.0

Input table not converted to DataFrame in `@dataframe` tasks #2179

Open josh-fell opened 4 months ago

Describe the bug When a Table object is passed to a @dataframe-decorated task, it is not converted to a DataFrame. The task then fails with an AttributeError.

Version

To Reproduce This toy example DAG can be used to reproduce the issue:

from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.decorators import dag
from astro.sql import dataframe, transform
from astro.table import Metadata, Table

if TYPE_CHECKING:
    from pandas import DataFrame

@dag(schedule=None)
def sdk():
    @transform
    def get_data() -> str:
        return "SELECT 1 AS blah"

    @dataframe
    def view_data(this: DataFrame) -> None:
        print(this.head())

    data = get_data(
        conn_id="snowflake", output_table=Table(metadata=Metadata(database="cs"), conn_id="snowflake")
    )
    view_data(this=data)

sdk()

When the "view_data" task runs, it fails with the following exception:

[2024-07-16, 20:14:21 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/astro/sql/operators/dataframe.py", line 173, in execute
    function_output = self.python_callable(*self.op_args, **self.op_kwargs)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/test.py", line 21, in view_data
    print(this.head())
          ^^^^^^^^^
AttributeError: 'TempTable' object has no attribute 'head'

Expected behavior When a Table object is passed to a @dataframe-decorated task, it should be converted to a DataFrame so that all typical DataFrame operations work on it.

Additional context The exception persists whether the "this" parameter of the "view_data" function is annotated as DataFrame, Table, or TempTable.
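
One possible contributing factor, offered as an assumption rather than anything this report confirms: the reproduction uses from __future__ import annotations, which stores every annotation as a string (PEP 563). If the operator decides whether to convert a Table by comparing the parameter's annotation against the DataFrame class at runtime, a string annotation would never match, no matter which type name is written. The sketch below uses only the standard library. The DataFrame class and the annotation_is_dataframe helper are hypothetical stand-ins, not Astro SDK code, and the quoted annotation mimics what the __future__ import does.

```python
import inspect
import typing


class DataFrame:
    """Hypothetical stand-in for pandas.DataFrame so the sketch runs without pandas."""


def eager(this: DataFrame) -> None:
    """Annotation stored as the class object itself."""


def deferred(this: "DataFrame") -> None:
    """Annotation stored as a string, as it would be under
    `from __future__ import annotations`."""


def annotation_is_dataframe(func, name: str) -> bool:
    # Hypothetical runtime check: compare the raw annotation to the class.
    # A string annotation is never identical to the class object.
    return inspect.getfullargspec(func).annotations.get(name) is DataFrame


def resolved_annotation_is_dataframe(func, name: str) -> bool:
    # typing.get_type_hints evaluates string annotations in the
    # function's module namespace before the comparison.
    return typing.get_type_hints(func).get(name) is DataFrame


print(annotation_is_dataframe(eager, "this"))
print(annotation_is_dataframe(deferred, "this"))
print(resolved_annotation_is_dataframe(deferred, "this"))
```

If this turns out to be the cause, removing the __future__ import from the DAG file and importing DataFrame outside the TYPE_CHECKING guard would be a quick way to test the theory.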