apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

PythonVirtualenvOperator with provide_context=True does not have 'ti' keyword #12985

Open oriolcmp opened 3 years ago

oriolcmp commented 3 years ago

Apache Airflow version: 1.10.13

Environment: Docker (Ubuntu 18.4 - Python 3.7)

What happened:

When provide_context=True is enabled for PythonVirtualenvOperator and the callable tries to use xcom_push to pass a variable, I get this error:

File "/tmp/venv0upgqome/script.py", line 13, in push
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
KeyError: 'ti'

How to reproduce it:

from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG('BAtatas', schedule_interval="@once", default_args=args)

def push(**kwargs):
    """Pushes an XCom without a specific target"""
    value_1 = [1, 2, 3]
    print("printing the kwargs!!!")
    print(kwargs)
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)

def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    value_2 = {'a': 'b'}
    return value_2

def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    # Expected values are redefined here because the callable runs in a separate virtualenv
    value_1 = [1, 2, 3]
    value_2 = {'a': 'b'}
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    assert pulled_value_1 == value_1

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    assert pulled_value_2 == value_2

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=['push', 'push_by_returning'])
    assert (pulled_value_1, pulled_value_2) == (value_1, value_2)

push1 = PythonVirtualenvOperator(
    task_id='push',
    dag=dag,
    python_callable=push,
    requirements=[],
    python_version='3.7',
    use_dill=False,
    provide_context=True,
    system_site_packages=True,
)

push2 = PythonVirtualenvOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
    requirements=[],
    python_version='3.7',
    use_dill=False,
    provide_context=True,
    system_site_packages=True,
)

pull = PythonVirtualenvOperator(
    task_id='puller',
    dag=dag,
    python_callable=puller,
    requirements=[],
    python_version='3.7',
    use_dill=False,
    provide_context=True,
    system_site_packages=True,
)

pull << [push1, push2]

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

oriolcmp commented 3 years ago

Hello,

Is there any news on this issue? It would be very useful for us to be able to use it. Thanks,

oriol

kaxil commented 3 years ago

I have un-assigned myself so it is up for grabs :)

dsynkov commented 3 years ago

Sharing a few notes as I've run into this same issue:

RosterIn commented 3 years ago

I thought provide_context was removed from Python operators in Airflow 2?

uranusjr commented 3 years ago

It was, and what provide_context provided is now generally always available. So this issue (in 2.0) is more about the fact that there is no way to expose ti to the callable run by a PythonVirtualenvOperator.
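
To illustrate the failure mode, here is a minimal pure-Python sketch of what happens when only an allow-listed subset of the context is serialized for a separate interpreter. It is a simplified model and not Airflow's actual implementation: SERIALIZABLE_KEYS, run_in_child and the context values are hypothetical names and data made up for the example.

```python
import pickle

# Hypothetical allow-list: only these context keys cross the process boundary.
# Note that "ti" is not in it.
SERIALIZABLE_KEYS = {"ds", "run_id"}


def run_in_child(python_callable, context):
    """Mimic handing a filtered, pickled context to a separate interpreter."""
    allowed = {k: v for k, v in context.items() if k in SERIALIZABLE_KEYS}
    # A pickle round-trip stands in for the real process boundary.
    payload = pickle.loads(pickle.dumps(allowed))
    return python_callable(**payload)


def push(**kwargs):
    # Same access pattern as the callable in the report above.
    return kwargs["ti"]


# Made-up context; object() stands in for a task instance.
context = {"ds": "2020-12-10", "run_id": "manual__1", "ti": object()}

try:
    run_in_child(push, context)
    outcome = "ok"
except KeyError as exc:
    outcome = f"KeyError: {exc}"

print(outcome)  # KeyError: 'ti'
```

In this model the callable never sees ti because the key is dropped before serialization, which matches the KeyError in the original report.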

xinbinhuang commented 3 years ago

Also found this Jira story (https://issues.apache.org/jira/browse/AIRFLOW-2738) that's still open that seems to be addressing the same problem.

Just a quick note: the community has migrated its issue tracking from Jira to GitHub issues, so whatever is left in Jira right now is likely outdated or inaccurate.

potiuk commented 3 years ago

Sounds like a feature to add rather than a bug fix. I am moving it to the 2.2 milestone.

oriolcmp commented 3 years ago

Sorry, but I don't think it should be a feature. It is a bug because, if I'm not wrong, all operators should be able to use XCom variables, and with this operator that is currently not possible. This operator could be very useful for dealing with dependency constraints, but without a way to pass variables it is rather limited, and the only option left seems to be DockerOperator.
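
One nuance on passing values: as far as I understand the operator, a value returned by the callable is written to a result file by the child interpreter and read back by the parent, so push_by_returning in the reproduction above should not be affected by the missing ti. The sketch below models that round trip with the standard library only. It is an assumption-laden illustration, not Airflow code: CHILD_SCRIPT, run_and_collect and the file names are hypothetical.

```python
import pickle
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical script that a parent process generates for the child interpreter.
CHILD_SCRIPT = textwrap.dedent(
    """
    import pickle
    import sys

    def push_by_returning():
        return {"a": "b"}

    # Write the return value to the path given by the parent.
    with open(sys.argv[1], "wb") as handle:
        pickle.dump(push_by_returning(), handle)
    """
)


def run_and_collect():
    """Run the script in a separate interpreter and read back its result."""
    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "script.py"
        output = Path(tmp) / "script.out"
        script.write_text(CHILD_SCRIPT)
        subprocess.run([sys.executable, str(script), str(output)], check=True)
        return pickle.loads(output.read_bytes())


result = run_and_collect()
print(result)  # {'a': 'b'}
```

Only the return value travels back in this model; anything like xcom_push with a custom key, or xcom_pull, still needs ti on the child side.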

oriolcmp commented 2 years ago

Hello,

This ticket has now been open for two years. In the meantime a fix like #19616 was added for what seems to be the same issue, and in that case adding "ti" as serializable would have fixed this ticket too. Which solution am I supposed to use if I want to run DAGs with several Python operators but avoid having one requirements file for all DAGs? I'm thinking of DockerOperator as the only alternative, but I thought PythonVirtualenvOperator was the simpler solution. However, nobody seems to be worried that it is currently not possible to use XComs with PythonVirtualenvOperator. Could you give me some hints on that? Thanks

uranusjr commented 2 years ago

Feel free to propose a PR to add ti to the list.

potiuk commented 2 years ago

@oriolcmp - the way it works here is that the fastest way to get something you really want is to implement it yourself. I think waiting two years for something you want is far too long, but if no one has picked it up, then implementing it yourself is indeed the better way.

dejavu284 commented 3 months ago

Are you going to solve this problem? It still seems to be relevant, and it's very sad that such an important feature as XCom is not available in PythonVirtualenvOperator.