apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow schedules tasks in wrong timezone with an MSSQL Metadata DB on a non-UTC server #21171

Closed mattinbits closed 10 months ago

mattinbits commented 2 years ago

Apache Airflow version

2.2.2

What happened

Airflow schedules a task an hour earlier than expected, when using an MSSQL metadata database where the DB server is set to the CET timezone. The screenshot below shows the DAG starting an hour before the end of the data interval.

[screenshot: DAG run starting an hour before the end of the data interval]

What you expected to happen

Airflow schedules the task at the correct time in UTC.

How to reproduce

It's hard to describe a complete reproducible method since it relies on having an MSSQL Server with particular settings.

A relevant DAG would be as simple as:

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="example_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="0 9 * * 1-5",
) as dag:
    task = DummyOperator(task_id="dummy")

And Airflow config of:

default_timezone = utc

This DAG would then be scheduled an hour earlier than expected.

Operating System

Redhat UBI 8

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

Airflow scheduler and webserver each running in a docker container based on Redhat UBI 8. Metadata DB is MSSQL Server running on a Windows Server where the server timezone is CET.

Anything else

In our installation, the problem is happening for any DAG with a UTC-based schedule.

I believe the root cause is this line of code: https://github.com/apache/airflow/blob/6405d8f804e7cbd1748aa7eed65f2bbf0fcf022e/airflow/models/dag.py#L2872

On MSSQL, func.now() appears to correspond to GETDATE(), which returns the current time in the timezone of the DB server. But next_dagrun_create_after is stored in the database as UTC (in a datetime2 column, which doesn't include timezone information). So this line of code is equivalent to "Is the current time in CET before the next creation time in UTC?", meaning that a DAG that should start at 09:00 UTC starts at 09:00 CET instead, one hour early.

I can verify that func.now() returns CET with the SQLAlchemy code engine.execute(sa.select([sa.func.now()])).fetchall().

I think the correct way to get the current time in UTC on MSSQL is GETUTCDATE().
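
The off-by-one-hour comparison can be sketched with plain Python datetimes. This is an illustrative model of the two T-SQL functions, not Airflow code; the CET offset and the example timestamps are assumptions matching this report:

```python
from datetime import datetime, timedelta, timezone

CET = timezone(timedelta(hours=1))  # the DB server's timezone in this report

# next_dagrun_create_after is stored as a naive datetime that is *meant* to be UTC.
next_dagrun_create_after = datetime(2022, 2, 4, 9, 0)  # 09:00 UTC

def getdate(now_utc):
    """Model of MSSQL GETDATE(): server-local wall clock, tzinfo stripped."""
    return now_utc.astimezone(CET).replace(tzinfo=None)

def getutcdate(now_utc):
    """Model of MSSQL GETUTCDATE(): UTC wall clock, tzinfo stripped."""
    return now_utc.astimezone(timezone.utc).replace(tzinfo=None)

# At 08:30 UTC the run should not be created yet:
now = datetime(2022, 2, 4, 8, 30, tzinfo=timezone.utc)
print(getdate(now) > next_dagrun_create_after)     # True: the run fires an hour early
print(getutcdate(now) > next_dagrun_create_after)  # False: correct behaviour
```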

We ran Airflow 1.10 previously without seeing this problem. From what I can tell, in that version the date comparison is done on the application side rather than in the DB.


potiuk commented 2 years ago

Not the first time MSSQL bites us. Seems that people have even created packages to handle it correctly: https://github.com/spoqa/sqlalchemy-utc. I will assign you to this one :)

mattinbits commented 2 years ago

@potiuk just to check before I attempt a fix: are you happy with a solution that uses sqlalchemy-utc for all DB backends, or is it preferable to have a solution that handles the MSSQL case specifically and leaves other DB backends unchanged? For example, I see there are some DB-specific utilities here: https://github.com/apache/airflow/blob/aa2cb5545f09d694b9143b323efcd4f6b6c66e60/airflow/utils/sqlalchemy.py

potiuk commented 2 years ago

I think a db-specific fix will be better (if simple). We already have ~500 deps in Airflow total (including transitive), and while adding one more seems like no biggie, adding a 'util' in Airflow seems more "straightforward".

ashb commented 2 years ago

Is there a better datetime type to use that includes and stores the timezone?

mattinbits commented 2 years ago

There is such a type in MSSQL (datetimeoffset), but I don't think the problem here is with the stored type. As I understand it, Airflow always stores these values in UTC anyway, so the value stored in a naive datetime column can be assumed to be UTC. I think the problem is with the function whose result is being compared to the stored value. However, I don't feel that I have a complete grasp of Airflow's internals, so I could have it wrong.

uranusjr commented 2 years ago

So the problem is that MSSQL’s NOW() function returns a local datetime, resulting in us comparing e.g. 9am UTC with 9:15 local? The analysis makes sense to me.

mattinbits commented 2 years ago

Yes, that's my take. More specifically, SQLAlchemy's now() function translates to MSSQL's GETDATE() function. It's easy to fix for MSSQL by calling GETUTCDATE(), just need to figure out the clean way to do that for MSSQL specifically, while leaving the other DB backends working.
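
The translation can be seen without a live server by compiling statements against the MSSQL dialect. A sketch (exact rendering may vary by SQLAlchemy version; note that T-SQL documents CURRENT_TIMESTAMP as equivalent to GETDATE()):

```python
from sqlalchemy import func, select
from sqlalchemy.dialects import mssql

d = mssql.dialect()

# How the MSSQL dialect renders the generic "now" (a GETDATE() equivalent):
print(select(func.now()).compile(dialect=d))

# SQLAlchemy emits unknown function names verbatim, so this renders GETUTCDATE():
print(select(func.GETUTCDATE()).compile(dialect=d))
```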

mattinbits commented 2 years ago

More details on reproducibility. Start a SQL Server instance with timezone set to one hour ahead of UTC:

docker container run \
    --rm \
    --name mssql \
    --env 'ACCEPT_EULA=Y' \
    --env 'MSSQL_SA_PASSWORD=Mssqlpwd123' \
    --env 'TZ=Europe/Copenhagen' \
    --publish 1433:1433 \
    --detach mcr.microsoft.com/mssql/server:2019-CU5-ubuntu-18.04

Create the Airflow DB:

docker exec mssql /opt/mssql-tools/bin/sqlcmd -U sa -P Mssqlpwd123 -Q "CREATE DATABASE AIRFLOW;"
docker exec mssql /opt/mssql-tools/bin/sqlcmd -U sa -P Mssqlpwd123 -Q "ALTER DATABASE Airflow SET READ_COMMITTED_SNAPSHOT ON;"

Conn string:

sql_alchemy_conn = mssql+pyodbc://sa:Mssqlpwd123@localhost/airflow?driver=ODBC+Driver+17+for+SQL+Server

And a DAG. Change the cron to be less than one hour ahead of the current time in UTC:

from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

dag = DAG(
    dag_id="example",
    start_date=datetime(2021, 1, 1),
    schedule_interval="00 22 * * *",
    catchup=False
)

with dag:
    task = DummyOperator(task_id="dummy")

Start Airflow, start the DAG, observe the premature DAG execution.

mattinbits commented 2 years ago

@ashb I experimented with using datetimeoffset rather than datetime2 but unfortunately it doesn't seem to help so long as we use func.now() mapping to GETDATE(). I get this surprising result:

CREATE TABLE foo (tz_dt datetimeoffset);

INSERT INTO foo VALUES
('2022-02-04 16:00 +00:00'),
('2022-02-04 16:00 +01:00');

SELECT tz_dt, GETDATE() as now FROM foo
WHERE tz_dt < GETDATE();

This returns a result asserting that 16:00 CET is before 15:34 CET:

tz_dt                                     |    now
------------------------------------------|-----------------------------
2022-02-04 16:00:00.0000000 +01:00        |    2022-02-04 15:34:17.117

I assume the datetimeoffset is being converted to a naive value in UTC (15:00) and then compared.
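
That reading can be checked with a quick stdlib calculation (an illustration of the presumed normalisation, using the values from the query above):

```python
from datetime import datetime, timedelta, timezone

# The stored datetimeoffset row and GETDATE()'s naive CET result from above:
tz_dt = datetime(2022, 2, 4, 16, 0, tzinfo=timezone(timedelta(hours=1)))
now_naive_cet = datetime(2022, 2, 4, 15, 34, 17)

# If SQL Server normalises the offset-aware value to naive UTC before comparing:
tz_dt_as_naive_utc = tz_dt.astimezone(timezone.utc).replace(tzinfo=None)
print(tz_dt_as_naive_utc)                  # 2022-02-04 15:00:00
print(tz_dt_as_naive_utc < now_naive_cet)  # True: matches the surprising row above
```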

uranusjr commented 2 years ago

Maybe it’s easier to create a custom function that emits GETUTCDATE() on MSSQL and NOW() everywhere else, and just use that instead of SQLAlchemy’s func.now(). Except I have no idea how to implement a function in SQLAlchemy :(

mattinbits commented 2 years ago

@potiuk As an initial fix for our internal system, I replaced func.now() with func.GETDATE() in an internal fork, which is enough to fix it for SQL Server. This is not enough for cross-database compatibility, of course. I have considered adding something similar to the functionality of sqlalchemy-utc to airflow.utils.sqlalchemy, but I had second thoughts about whether this additional complexity is necessary. Do we need to rely on the DB's version of "now" in this case? Can we instead use timezone.utcnow(), i.e. let the application server decide what "now" is and pass it as a literal to the DB?

Here are two places where func.now() is used in a filter, exposing this issue: https://github.com/apache/airflow/blob/main/airflow/models/dagrun.py#L294 https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2872

I can see in other places that timezone.utcnow() is used: https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L293 https://github.com/apache/airflow/blob/main/airflow/models/trigger.py#L179

I'm not sure if there is a particular reason why func.now() is needed in the first two instances?
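
For comparison, the application-side alternative would send the timestamp as a bound literal, keeping the DB server's timezone out of the comparison entirely. A minimal SQLAlchemy Core sketch (this dag table definition is a hypothetical stand-in, not Airflow's actual model):

```python
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, MetaData, Table, select

metadata = MetaData()
# Hypothetical minimal stand-in for the real dag table:
dag = Table("dag", metadata, Column("next_dagrun_create_after", DateTime))

# "Now" computed by the application in UTC, stored naive like Airflow's columns:
now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

# The timestamp travels as a bind parameter, so no DB "now" function is involved:
stmt = select(dag.c.next_dagrun_create_after).where(
    dag.c.next_dagrun_create_after <= now_utc
)
print(stmt)
```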

mattinbits commented 2 years ago

@uranusjr here is an example of implementing a custom SQLAlchemy function: https://github.com/spoqa/sqlalchemy-utc/blob/master/sqlalchemy_utc/now.py

This code could be almost what we need, if the fix should be a DB function, although it does custom things for SQLite and Mysql which are presumably not necessary in this case.
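
SQLAlchemy's compiler-extension API supports exactly this kind of per-dialect rendering. A minimal sketch of the sqlalchemy-utc approach, trimmed to this use case (the utcnow name and the non-MSSQL fallback are illustrative choices, not an agreed design):

```python
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import expression
from sqlalchemy.types import DateTime

class utcnow(expression.FunctionElement):
    """Cross-dialect 'current time in UTC' SQL expression."""
    type = DateTime()
    inherit_cache = True

@compiles(utcnow)
def _default_utcnow(element, compiler, **kw):
    # Fallback: assumes the server/session clock is effectively UTC.
    return "CURRENT_TIMESTAMP"

@compiles(utcnow, "mssql")
def _mssql_utcnow(element, compiler, **kw):
    # SQL Server: ask for UTC explicitly instead of server-local time.
    return "GETUTCDATE()"
```

A filter like `DagModel.next_dagrun_create_after <= utcnow()` would then render GETUTCDATE() only when the MSSQL dialect is in use.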

uranusjr commented 2 years ago

Yeah it seems like we need something slightly different from the implementation in SQLAlchemy-UTC.

It seems like Airflow currently renders func.now() as follows:

[table of per-dialect renderings]

(Can anyone confirm this?)

I guess what we need is to change the SQL Server one to GETUTCDATE()? That shouldn’t be too hard, and (I believe) SQLAlchemy lets us register the custom function to func as well.

mattinbits commented 2 years ago

@uranusjr is it the case then that you prefer a solution that continues to rely on the database to get the current timestamp, rather than adopting timezone.utcnow() and calculating the current timestamp within the application? Changing to calculate it on the application side would be the simpler fix, and would be consistent with the other places in the codebase that already work this way.

uranusjr commented 2 years ago

Personally I would prefer doing this in the database since that is how it is done right now, and switching to the application side could theoretically introduce incompatibilities, since timezone calculations very unfortunately tend to be buggy across implementations.

MansiVerma777 commented 2 years ago

Hi, we are also encountering the same issue with Airflow. We are using a SQL Server backend which is not running in the UTC timezone. Since the DB query that creates the DAG runs to be scheduled uses CURRENT_TIMESTAMP, we are seeing a scheduling lag of 7 hours, because the database runs in a timezone 7 hours behind UTC. Any ETA for when the fix will be available?

gemunet commented 2 years ago

Still no solution? I have the same problem, which causes my DAGs not to be executed.

rajvansia commented 1 year ago

@potiuk As an initial fix for our internal system, I replaced func.now() with func.GETDATE() in an internal fork which is enough to fix it for SQL Server. This is not enough for cross-database compatibility of course. I have considered creating something similar to the functionality of sqlalchemy-utc to airflow.utils.sqlalchemy, but I had second thoughts whether this additional complexity is necessary. Do we need to rely on the DB's version of "now" in this case? Can we instead use timezone.utcnow(), i.e. let the application server decide what "now" is and pass it as a literal to the DB?

Here are two places where func.now() is used in a filter, exposing this issue: https://github.com/apache/airflow/blob/main/airflow/models/dagrun.py#L294 https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2872

I can see in other places that timezone.utcnow() is used: https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L293 https://github.com/apache/airflow/blob/main/airflow/models/trigger.py#L179

I'm not sure if there is a particular reason why func.now() is needed in the first two instances?

@mattinbits thanks for the internal fix just to clarify do you mean replacing func.now() with func.GETUTCDATE() instead of func.GETDATE()? When I made those changes you suggested with func.GETUTCDATE() it worked.

schwartzpub commented 1 year ago

@potiuk As an initial fix for our internal system, I replaced func.now() with func.GETDATE() in an internal fork which is enough to fix it for SQL Server. This is not enough for cross-database compatibility of course. I have considered creating something similar to the functionality of sqlalchemy-utc to airflow.utils.sqlalchemy, but I had second thoughts whether this additional complexity is necessary. Do we need to rely on the DB's version of "now" in this case? Can we instead use timezone.utcnow(), i.e. let the application server decide what "now" is and pass it as a literal to the DB?

Here are two places where func.now() is used in a filter, exposing this issue: https://github.com/apache/airflow/blob/main/airflow/models/dagrun.py#L294 https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2872

I can see in other places that timezone.utcnow() is used: https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L293 https://github.com/apache/airflow/blob/main/airflow/models/trigger.py#L179

I'm not sure if there is a particular reason why func.now() is needed in the first two instances?

Thanks for this, was racking my brain trying to figure out why my DAGs were not running at the right time. I was about to accept that I had gone insane. Changing func.now() to func.GETUTCDATE() in dag.py and dagrun.py worked for me to get my schedules to run at the expected time.

Taragolis commented 1 year ago

Generally speaking, it is very risky to use MS SQL as the backend for Airflow in production. The MS SQL backend is an experimental feature, covered by this article: https://airflow.apache.org/docs/apache-airflow/stable/release-process.html#experimental-features

Potentially, if the Airflow PMC decides it is better to just remove support for MS SQL, that could be done in any new feature version of Airflow, and for various reasons.

[collapsed spoiler: some personal suggestions for the long term]

potiuk commented 1 year ago

Potentially, if the Airflow PMC decides it is better to just remove support for MS SQL

I am for it. MSSQL is experimental.

mattinbits commented 1 year ago

As someone who has been running Airflow on MSSQL in production for several years, I tend to agree that explicitly removing support would be better. We're planning to move away from it and I wouldn't recommend it to anyone else.

Although the Github README makes it clear that the support is experimental, I don't see this mentioned here, which I assume is probably most people's entry point: https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#choosing-database-backend

mattinbits commented 1 year ago

@potiuk As an initial fix for our internal system, I replaced func.now() with func.GETDATE() in an internal fork which is enough to fix it for SQL Server. This is not enough for cross-database compatibility of course. I have considered creating something similar to the functionality of sqlalchemy-utc in airflow.utils.sqlalchemy, but I had second thoughts whether this additional complexity is necessary. Do we need to rely on the DB's version of "now" in this case? Can we instead use timezone.utcnow(), i.e. let the application server decide what "now" is and pass it as a literal to the DB?

Here are two places where func.now() is used in a filter, exposing this issue: https://github.com/apache/airflow/blob/main/airflow/models/dagrun.py#L294 https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2872

I can see in other places that timezone.utcnow() is used: https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L293 https://github.com/apache/airflow/blob/main/airflow/models/trigger.py#L179

I'm not sure if there is a particular reason why func.now() is needed in the first two instances?

@mattinbits thanks for the internal fix just to clarify do you mean replacing func.now() with func.GETUTCDATE() instead of func.GETDATE()? When I made those changes you suggested with func.GETUTCDATE() it worked.

You're right, it was a mistake in my initial comment.

schwartzpub commented 1 year ago

As someone who has been running Airflow on MSSQL in production for several years, I tend to agree that explicitly removing support would be better. We're planning to move away from it and I wouldn't recommend it to anyone else.

Although the Github README makes it clear that the support is experimental, I don't see this mentioned here, which I assume is probably most people's entry point: https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#choosing-database-backend

That's how I wound up using MSSQL; it wasn't immediately clear that it was an experimental feature. Thankfully, in our case we're still in the PoC stage, so making the swap to Postgres was painless. I appreciate the feedback, comments, and guidance.

Taragolis commented 1 year ago

Yep, seems like an inconsistency between the README.md and the documentation.

Feel free to change it; you can even do it by clicking "Suggest a change on this page".

potiuk commented 10 months ago

Closing. MSSQL has just been removed from main