apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Not Showing next run in Airflow UI for a DAG scheduled using custom TimeTable #43262

Open vaibhavg-DA opened 1 month ago

vaibhavg-DA commented 1 month ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.2

What happened?

I have created a DAG that I want to run on the 18th of every month if the 18th falls on a Monday, Tuesday, Wednesday, or Thursday; otherwise it should run on the following Monday. This is how I defined my MonthlyProdTimetable:

`from datetime import timedelta
import datetime as dt
from typing import Optional

import pendulum
from pendulum import Date, DateTime, Time, timezone
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class MonthlyProdTimetable(Timetable):

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        return DataInterval.exact(run_after)

    def _get_next(self, current: DateTime) -> DateTime:
        print(f"current == {current}")
        eighteenth = pendulum.datetime(current.year, current.month, 18, 10, 0, 0, tz='America/Chicago')
        if eighteenth.day_of_week in range(0, 4):
            cur_month_sched_dt = eighteenth
        else:
            cur_month_sched_dt = eighteenth.next(pendulum.MONDAY).at(10, 0, 0).in_timezone('America/Chicago')

        if current <= cur_month_sched_dt:
            return cur_month_sched_dt
        else:
            next_month_eighteenth = current.add(months=1).date(18).at(10, 0, 0).in_timezone('America/Chicago')
            if next_month_eighteenth.day_of_week in range(0, 4):
                cur_month_sched_dt = next_month_eighteenth
            else:
                cur_month_sched_dt = next_month_eighteenth.next(pendulum.MONDAY).at(10, 0, 0).in_timezone('America/Chicago')
            return cur_month_sched_dt

    def next_dagrun_info(
            self,
            *,
            last_automated_data_interval: Optional[DataInterval],
            restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        print("TimeTable has been called!")
        if not restriction.catchup:
            start_time_candidates = [self._get_next(pendulum.now("America/Chicago"))]
            if last_automated_data_interval is not None:
                start_time_candidates.append(self._get_next(last_automated_data_interval.end))
            if restriction.earliest is not None:
                start_time_candidates.append(self._get_next(restriction.earliest))
            next_start_time = max(start_time_candidates)
            print(f"This is next_start_time == {next_start_time}")
        if restriction.latest is not None and restriction.latest < next_start_time:
            return None
        return DagRunInfo.exact(next_start_time)


class MonthlyTimeTablePlugin(AirflowPlugin):
    name = "monthly_timetable_plugin"
    timetables = [MonthlyProdTimetable]`
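For reference, the intended date rule (the 18th if it falls Monday through Thursday, otherwise the following Monday) can be checked outside Airflow with only the standard library. This is a minimal sketch, not part of the timetable above: the function name `monthly_run_date` is hypothetical, and it relies on `date.weekday()`, where Monday is 0. pendulum's `day_of_week` numbering has differed between major versions, so the `range(0, 4)` check in `_get_next` may be worth verifying against the installed version.

```python
from datetime import date, timedelta


def monthly_run_date(year: int, month: int) -> date:
    """Return the 18th if it is Mon-Thu, otherwise the following Monday.

    Hypothetical helper for checking the rule; it ignores time of day
    and time zones, which the timetable itself handles via pendulum.
    """
    eighteenth = date(year, month, 18)
    # date.weekday(): Monday == 0 ... Sunday == 6
    if eighteenth.weekday() <= 3:
        return eighteenth
    # Friday (4) -> +3 days, Saturday (5) -> +2 days, Sunday (6) -> +1 day
    return eighteenth + timedelta(days=7 - eighteenth.weekday())


if __name__ == "__main__":
    for year, month in [(2024, 8), (2024, 10), (2024, 11)]:
        print(year, month, monthly_run_date(year, month))
```

Comparing these dates with the values printed by `_get_next` for the same months is one way to confirm whether the weekday test selects the days intended.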

In the Airflow UI, I cannot see the Next Run for this DAG.

What am I missing here?
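One detail that may be worth checking: as posted, `next_dagrun_info` assigns `next_start_time` only inside `if not restriction.catchup:`. If the DAG runs with catchup enabled, that name is never bound, and the method would raise `UnboundLocalError` when it reaches the return statement. Whether this explains the missing Next Run is not confirmed here. The sketch below shows the same candidate selection with a value chosen on both paths, using plain `datetime` stand-ins so it runs without Airflow. The function `choose_next_start` and its parameters are hypothetical, and skipping the "now" candidate when catchup is enabled is an assumption about the desired behaviour.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


def choose_next_start(
    get_next: Callable[[datetime], datetime],
    now: datetime,
    last_end: Optional[datetime],
    earliest: Optional[datetime],
    latest: Optional[datetime],
    catchup: bool,
) -> Optional[datetime]:
    """Pick the next start time, or None when no run should be scheduled.

    Hypothetical stand-in for the selection logic in next_dagrun_info;
    get_next plays the role of MonthlyProdTimetable._get_next.
    """
    candidates = []
    if not catchup:
        # Assumption: without catchup, never schedule before the current time.
        candidates.append(get_next(now))
    if last_end is not None:
        candidates.append(get_next(last_end))
    if earliest is not None:
        candidates.append(get_next(earliest))
    if not candidates:
        # Catchup enabled, no previous run and no start date: nothing to schedule.
        return None
    next_start = max(candidates)
    if latest is not None and latest < next_start:
        return None
    return next_start


if __name__ == "__main__":
    def next_day(d: datetime) -> datetime:
        # Toy replacement for _get_next, used only for this demonstration.
        return d + timedelta(days=1)

    now = datetime(2024, 10, 1, tzinfo=timezone.utc)
    start = datetime(2024, 9, 1, tzinfo=timezone.utc)
    print(choose_next_start(next_day, now, None, start, None, catchup=True))
    print(choose_next_start(next_day, now, None, start, None, catchup=False))
```

In the real timetable the returned value would be wrapped with `DagRunInfo.exact(...)`, as in the original code.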

What you think should happen instead?

The Airflow UI should show the Next Run for this DAG.

How to reproduce

Will update

Operating System

On Docker

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.