apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

openlineage: Non-local executor's initializer breaking the Airflow DB connection #40403

Closed: kacpermuda closed this issue 1 day ago

kacpermuda commented 4 days ago

Apache Airflow Provider(s)

openlineage

Versions of Apache Airflow Providers

apache-airflow-providers-openlineage==1.9.0rc1

Or simply the main branch for now.

Apache Airflow version

main branch (breeze), 2.9.1 (Astro)

Operating System

macOS Sonoma 14.5

Deployment

Other

Deployment details

I tested this on both Breeze and Astro Cloud.

What happened

Changing the ProcessPoolExecutor initializer from a local function to a separate module-level function (implemented in #40353), to avoid the pickling error in airflow standalone described in #40309, does fix airflow standalone. However, it causes the scheduler to crash when building AirflowStateRunFacet (introduced in #39520), at the point where that code queries the Airflow DB.
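For context, here is a minimal sketch of the pickling constraint behind #40309. It assumes the executor uses a start method that has to pickle the initializer, such as spawn, which is the default on macOS. A function defined inside another function cannot be pickled by reference, while a module-level one can. The function names below are hypothetical stand-ins, not the provider's code.

```python
import pickle


def make_local_initializer():
    # A function defined inside another function is a "local object":
    # pickle stores functions by qualified name, and a '<locals>' name
    # cannot be looked up again, so pickling it fails.
    def initializer():
        pass

    return initializer


def module_level_initializer():
    # Defined at module top level, so pickle can reference it by name.
    pass


def is_picklable(obj) -> bool:
    """Return True if obj survives pickle.dumps (what spawned workers need)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(is_picklable(make_local_initializer()))  # False
print(is_picklable(module_level_initializer))  # True
```

Moving the initializer to module level removes the pickling problem. It does not change when or where the initializer is called.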

Attaching some scheduler logs I got when running a simple DAG multiple times: error2.txt, error3.txt, error4.txt, error5.txt

I've also seen an "Unexpected commit" error being raised, but I don't have logs saved for it.

What you think should happen instead

I don't fully understand what happens when the executor is initialized, or why such a small change can cause these problems.

How to reproduce

Run any DAG with apache-airflow-providers-openlineage==1.9.0rc1 installed, e.g.:

import datetime as dt

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='dag',
    start_date=dt.datetime(2024, 5, 21),
    schedule=None
) as dag:
    task_0 = BashOperator(
        task_id="task_0",
        bash_command="exit 0;",
    )

    task_1 = BashOperator(
        task_id="task_1",
        bash_command="exit 1;"
    )

After the DAG run completes, no DAG-level event is emitted and the scheduler crashes.

Anything else

I prepared a PR that reverts the change causing the problem (#40402), but I want to understand what's happening and how to prepare a fix that actually works.

Are you willing to submit a PR?

Code of Conduct

kacpermuda commented 4 days ago

@JDarDagran @mobuchowski @potiuk Maybe you can shed some light on this case for me. Thanks 😄

potiuk commented 4 days ago

It looks like it might be related to the forking model. It seems that when a DB connection is open and the process is forked afterwards, the Postgres driver in the child will attempt to continue the connection established by the parent process and fail (or something similar).

See similar issue here https://github.com/psycopg/psycopg2/issues/281

Actually, after looking more closely at the change, it seems to be wrong. The initializer should be a callable, but after the change it is the RESULT of calling _executor_initializer, which is essentially None:

initializer=initializer -> initializer=_executor_initializer()

kacpermuda commented 4 days ago

I can't believe I missed that parenthesis there; it was probably added automatically by PyCharm's autocompletion 🥲 I need to buy a rubber duck then, maybe everything else was fine 🤦 Will verify that tomorrow. Thanks!

potiuk commented 4 days ago

Happens. But the reviewer missed it too 🙁