Nike-Inc / brickflow

Pythonic Programming Framework to orchestrate jobs in Databricks Workflow
https://engineering.nike.com/brickflow/
Apache License 2.0

[FEATURE] `TaskDependencySensor` improvements #101

Open maxim-mityutko opened 5 months ago

maxim-mityutko commented 5 months ago

Is your feature request related to a problem? Please describe.

  1. A consistent approach is required across all sensors to determine the base time used to evaluate whether the upstream criteria are fulfilled. This approach should make sensor executions reproducible, whether the brickflow_start_time / brickflow_start_date custom parameters are set manually or the job is scheduled and the execution date is derived from the CRON expression. Currently TaskDependencySensor uses datetime.now() (see the sketch after this list): https://github.com/Nike-Inc/brickflow/blob/6ba515b9eb291195089fd54ea701cfcb0cd97838/brickflow_plugins/airflow/operators/external_tasks.py#L324-L326

  2. From the sensor usage perspective, the user does not care whether the upstream DAG failed; what matters is the ability to trigger downstream tasks once the upstream dependency is fulfilled, even if it was delayed because a failed DAG had to be restarted. Failing the task and the workflow is therefore not a desired scenario for this operator: https://github.com/Nike-Inc/brickflow/blob/6ba515b9eb291195089fd54ea701cfcb0cd97838/brickflow_plugins/airflow/operators/external_tasks.py#L333-L337

  3. Along the same lines as (2), failing the sensor when the upstream execution is not found is not a desirable flow: https://github.com/Nike-Inc/brickflow/blob/6ba515b9eb291195089fd54ea701cfcb0cd97838/brickflow_plugins/airflow/operators/external_tasks.py#L261-L264 The argument that the execution is always created by Airflow, even if it has not yet started, does not hold: when the DAG parameter depends_on_past is used, new executions won't be created until the older ones are successful.
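
A minimal sketch of how the base time could be resolved consistently (item 1). `resolve_base_time` and the `brickflow_ctx` mapping are hypothetical illustrations, not the actual Brickflow API; the real context object may expose these values differently:

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_base_time(context: dict, brickflow_ctx: Optional[dict] = None) -> datetime:
    # Hypothetical helper: prefer the Airflow execution date injected into the
    # operator context so that re-runs of the same logical date are reproducible.
    if context and context.get("execution_date"):
        return context["execution_date"]
    # Fall back to the Brickflow start time parameter when it is set manually
    # or derived from the CRON schedule (assumed here to be an ISO-8601 string).
    if brickflow_ctx and brickflow_ctx.get("brickflow_start_time"):
        return datetime.fromisoformat(brickflow_ctx["brickflow_start_time"])
    # Last resort: wall-clock time, i.e. the current, non-reproducible behavior.
    return datetime.now(timezone.utc)
```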

Cloud Information

Describe the solution you'd like

  1. Use context["execution_date"], which is available to Airflow operators, or use brickflow_start_time from the Brickflow context.
  2. Continue poking upstream.
  3. Log that the upstream execution is not found and continue poking (see the sketch after this list).
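
A minimal sketch of the poke behavior described in (2) and (3). `poke_until_success` and `get_dag_run_state` are hypothetical helpers used only to illustrate the control flow; the existing operator's API differs:

```python
import logging
import time

log = logging.getLogger(__name__)


def poke_until_success(get_dag_run_state, poke_interval_secs: int = 60,
                       timeout_secs: int = 6 * 3600) -> None:
    # `get_dag_run_state` is a hypothetical callable returning the upstream run
    # state ("success", "failed", ...) or None when no execution is found,
    # e.g. because depends_on_past blocked its creation.
    deadline = time.monotonic() + timeout_secs
    while time.monotonic() < deadline:
        state = get_dag_run_state()
        if state == "success":
            log.info("Upstream dependency fulfilled, proceeding.")
            return
        if state is None:
            # Item 3: a missing execution is not an error; keep poking.
            log.info("Upstream execution not found yet, continuing to poke.")
        elif state == "failed":
            # Item 2: a failed upstream run may be restarted, so keep waiting
            # for a rerun instead of failing the whole workflow.
            log.info("Upstream run failed; waiting for a rerun to succeed.")
        time.sleep(poke_interval_secs)
    raise TimeoutError("Upstream dependency was not fulfilled within the timeout.")
```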

Describe alternatives you've considered

Additional context