astronomer / astronomer-providers

Airflow Providers containing Deferrable Operators & Sensors from Astronomer
https://astronomer-providers.rtfd.io/
Apache License 2.0
137 stars 25 forks source link

Unable to use `failure` callable in SnowflakeSensorAsync when deferred #1528

Closed josh-fell closed 4 months ago

josh-fell commented 4 months ago

Describe the bug Version used: astronomer-providers==1.19.1

When using a failure callable to determine whether a SnowflakeSensorAsync task should fail based on the SQL result, the task fails stating the failure callable is not actually a callable. This failure only occurs when the SnowflakeSensorAsync task goes into a deferred state and the SQL check is handled by the Triggerer.

FWIW, the same setup with a failure callable works fine using the SqlSensor from the Common SQL provider though.

To Reproduce Steps to reproduce the behavior:

  1. Write a SnowflakeSensorAsync task with a configured failure callable and make sure it won't return after the first poke. Something like:
    SnowflakeSensorAsync(
    task_id=...,
    snowflake_conn_id=...
    sql=...,
    failure=lambda result: result == "failed",
    )
  2. Execute task and observe a similar exception message:
    File "/usr/local/lib/python3.11/site-packages/astronomer/providers/utils/sensor_util.py", line 27, in raise_error_or_skip_exception
    raise AirflowException(error_message)
    airflow.exceptions.AirflowException: self.failure is present, but not callable ->         lambda result: result == "failed"

Expected behavior When configured with a failure callable that is a callable type, the task should not fail but execute the failure callable to assess whether the task should indeed fail.

Additional context This is probably an issue with the success callable as well. My initial suspicion is that these callable parameters are getting serialized to the SnowflakeTrigger as strings and therefore not callable. Perhaps it would be better to run the success/failure callables as part of the sensor rather than the trigger? Meaning the SnowflakeTrigger pokes and awaits a result, and if there is a success/failure callable configured, that check is executed by the SnowflakeSensorAsync logic.