apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Using the string ".json" in a dag makes KubernetesPodOperator worker unable to trigger the pod #16922

Closed louison closed 3 years ago

louison commented 3 years ago

Apache Airflow version: 2.1.1

Kubernetes version (if you are using kubernetes) (use kubectl version):

Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.1", GitCommit:"5e58841cce77d4bc13713ad2b91fa0d961e69192", GitTreeState:"clean", BuildDate:"2021-05-12T14:11:29Z", GoVersion:"go1.16.3", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"18", GitVersion:"v1.18.2", GitCommit:"52c56ce7a8272c798dbc29846288d7cd9fbae032", GitTreeState:"clean", BuildDate:"2020-04-16T11:48:36Z", GoVersion:"go1.13.9", Compiler:"gc", Platform:"linux/amd64"}
WARNING: version difference between client (1.21) and server (1.18) exceeds the supported minor version skew of +/-1

Environment:

What happened:

While trying to write a simple DAG with the KubernetesPodOperator, I noticed that in certain cases the pod is launched, but not always. Investigating a bit more, I found that when the string ".json" is present in the parameters of the KubernetesPodOperator, the task does not work. I set up a minimal example to reproduce the bug and managed to reproduce it on my Kubernetes cluster and my Airflow instance (if it can help):

import datetime

import airflow
from airflow.utils.dates import days_ago
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import \
    KubernetesPodOperator

DAG_NAME = "trigger_test"
default_args = {
    "owner": "Rapsodie Data",
    "depends_on_past": False,
    "wait_for_downstream": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": datetime.timedelta(minutes=20),
}
with airflow.DAG(
        "michel",
        catchup=False,
        default_args=default_args,
        start_date=days_ago(1),
        schedule_interval="*/10 * * * *",
) as dag:
    kubernetes_min_pod_json = KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum_json',
        name='pod-ex-minimum_json',
        cmds=['echo'],
        namespace='default',
        arguments=["vivi.json"],
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4'
    )
    kubernetes_min_pod_txt = KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum_txt',
        name='pod-ex-minimum_txt',
        cmds=['echo'],
        namespace='default',
        arguments=["vivi.txt"],
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4'
    )
    kubernetes_min_pod_json
    kubernetes_min_pod_txt

No error message or log to give here. Here are the scheduler logs while trying to execute one run:

[2021-07-10 14:30:49,356] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumtxt.36d56ddf03e544669100f7a99657db6d had an event of type MODIFIED
[2021-07-10 14:30:49,356] {kubernetes_executor.py:205} INFO - Event: michelpodexminimumtxt.36d56ddf03e544669100f7a99657db6d Succeeded
[2021-07-10 14:30:49,996] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: michelpodexminimumtxt.36d56ddf03e544669100f7a99657db6d; state: None; annotations: {'dag_id': 'michel', 'task_id': 'pod-ex-minimum_txt', 'execution_date': '2021-07-10T14:20:00+00:00', 'try_number': '1'}
[2021-07-10 14:30:50,004] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_txt', execution_date=datetime.datetime(2021, 7, 10, 14, 20, tzinfo=tzlocal()), try_number=1), None, 'michelpodexminimumtxt.36d56ddf03e544669100f7a99657db6d', 'default', '56653001583') to None
[2021-07-10 14:30:50,006] {scheduler_job.py:1222} INFO - Executor reports execution of michel.pod-ex-minimum_txt execution_date=2021-07-10 14:20:00+00:00 exited with status None for try_number 1
[2021-07-10 14:31:00,478] {scheduler_job.py:964} INFO - 2 tasks up for execution:
    <TaskInstance: michel.pod-ex-minimum_txt 2021-07-10 14:30:59.199174+00:00 [scheduled]>
    <TaskInstance: michel.pod-ex-minimum_json 2021-07-10 14:30:59.199174+00:00 [scheduled]>
[2021-07-10 14:31:00,483] {scheduler_job.py:993} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 2 task instances ready to be queued
[2021-07-10 14:31:00,483] {scheduler_job.py:1021} INFO - DAG michel has 0/16 running and queued tasks
[2021-07-10 14:31:00,484] {scheduler_job.py:1021} INFO - DAG michel has 1/16 running and queued tasks
[2021-07-10 14:31:00,484] {scheduler_job.py:1086} INFO - Setting the following tasks to queued state:
    <TaskInstance: michel.pod-ex-minimum_txt 2021-07-10 14:30:59.199174+00:00 [scheduled]>
    <TaskInstance: michel.pod-ex-minimum_json 2021-07-10 14:30:59.199174+00:00 [scheduled]>
[2021-07-10 14:31:00,492] {scheduler_job.py:1128} INFO - Sending TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_txt', execution_date=datetime.datetime(2021, 7, 10, 14, 30, 59, 199174, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue default
[2021-07-10 14:31:00,492] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'michel', 'pod-ex-minimum_txt', '2021-07-10T14:30:59.199174+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/k8s.py']
[2021-07-10 14:31:00,493] {scheduler_job.py:1128} INFO - Sending TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_json', execution_date=datetime.datetime(2021, 7, 10, 14, 30, 59, 199174, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue default
[2021-07-10 14:31:00,493] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'michel', 'pod-ex-minimum_json', '2021-07-10T14:30:59.199174+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/k8s.py']
[2021-07-10 14:31:00,498] {kubernetes_executor.py:504} INFO - Add task TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_txt', execution_date=datetime.datetime(2021, 7, 10, 14, 30, 59, 199174, tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run', 'michel', 'pod-ex-minimum_txt', '2021-07-10T14:30:59.199174+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/k8s.py'] with executor_config {}
[2021-07-10 14:31:00,500] {kubernetes_executor.py:504} INFO - Add task TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_json', execution_date=datetime.datetime(2021, 7, 10, 14, 30, 59, 199174, tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run', 'michel', 'pod-ex-minimum_json', '2021-07-10T14:30:59.199174+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/k8s.py'] with executor_config {}
[2021-07-10 14:31:00,503] {kubernetes_executor.py:292} INFO - Kubernetes job is (TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_txt', execution_date=datetime.datetime(2021, 7, 10, 14, 30, 59, 199174, tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run', 'michel', 'pod-ex-minimum_txt', '2021-07-10T14:30:59.199174+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/k8s.py'], None, None)
[2021-07-10 14:31:00,558] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b had an event of type ADDED
[2021-07-10 14:31:00,558] {scheduler_job.py:1222} INFO - Executor reports execution of michel.pod-ex-minimum_txt execution_date=2021-07-10 14:30:59.199174+00:00 exited with status queued for try_number 1
[2021-07-10 14:31:00,559] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b Pending
[2021-07-10 14:31:00,559] {scheduler_job.py:1222} INFO - Executor reports execution of michel.pod-ex-minimum_json execution_date=2021-07-10 14:30:59.199174+00:00 exited with status queued for try_number 1
[2021-07-10 14:31:00,563] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b had an event of type MODIFIED
[2021-07-10 14:31:00,563] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b Pending
[2021-07-10 14:31:00,576] {scheduler_job.py:1249} INFO - Setting external_id for <TaskInstance: michel.pod-ex-minimum_json 2021-07-10 14:30:59.199174+00:00 [queued]> to 1
[2021-07-10 14:31:00,577] {scheduler_job.py:1249} INFO - Setting external_id for <TaskInstance: michel.pod-ex-minimum_txt 2021-07-10 14:30:59.199174+00:00 [queued]> to 1
[2021-07-10 14:31:00,621] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b had an event of type MODIFIED
[2021-07-10 14:31:00,622] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b Pending
[2021-07-10 14:31:00,719] {kubernetes_executor.py:292} INFO - Kubernetes job is (TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_json', execution_date=datetime.datetime(2021, 7, 10, 14, 30, 59, 199174, tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run', 'michel', 'pod-ex-minimum_json', '2021-07-10T14:30:59.199174+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/repo/dags/k8s.py'], None, None)
[2021-07-10 14:31:00,752] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 had an event of type ADDED
[2021-07-10 14:31:00,752] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 Pending
[2021-07-10 14:31:00,769] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 had an event of type MODIFIED
[2021-07-10 14:31:00,770] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 Pending
[2021-07-10 14:31:00,870] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 had an event of type MODIFIED
[2021-07-10 14:31:00,871] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 Pending
[2021-07-10 14:31:03,961] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b had an event of type MODIFIED
[2021-07-10 14:31:03,961] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b Pending
[2021-07-10 14:31:05,538] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 had an event of type MODIFIED
[2021-07-10 14:31:05,542] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 Pending
[2021-07-10 14:31:07,092] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b had an event of type MODIFIED
[2021-07-10 14:31:07,092] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b Pending
[2021-07-10 14:31:08,163] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b had an event of type MODIFIED
[2021-07-10 14:31:08,164] {kubernetes_executor.py:208} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b is Running
[2021-07-10 14:31:08,818] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 had an event of type MODIFIED
[2021-07-10 14:31:08,820] {kubernetes_executor.py:200} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 Pending
[2021-07-10 14:31:09,924] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 had an event of type MODIFIED
[2021-07-10 14:31:09,925] {kubernetes_executor.py:208} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 is Running
[2021-07-10 14:31:28,861] {dagrun.py:429} ERROR - Marking run <DagRun michel @ 2021-07-10 14:30:59.199174+00:00: manual__2021-07-10T14:30:59.199174+00:00, externally triggered: True> failed
[2021-07-10 14:31:45,227] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 had an event of type MODIFIED
[2021-07-10 14:31:45,227] {kubernetes_executor.py:205} INFO - Event: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1 Succeeded
[2021-07-10 14:31:45,454] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1; state: None; annotations: {'dag_id': 'michel', 'task_id': 'pod-ex-minimum_json', 'execution_date': '2021-07-10T14:30:59.199174+00:00', 'try_number': '1'}
[2021-07-10 14:31:45,459] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_json', execution_date=datetime.datetime(2021, 7, 10, 14, 30, 59, 199174, tzinfo=tzlocal()), try_number=1), None, 'michelpodexminimumjson.db72c28bed7e4d0cad6cf8594bcbd4f1', 'default', '56653030468') to None
[2021-07-10 14:31:45,463] {scheduler_job.py:1222} INFO - Executor reports execution of michel.pod-ex-minimum_json execution_date=2021-07-10 14:30:59.199174+00:00 exited with status None for try_number 1
[2021-07-10 14:31:47,817] {kubernetes_executor.py:147} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b had an event of type MODIFIED
[2021-07-10 14:31:47,818] {kubernetes_executor.py:205} INFO - Event: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b Succeeded
[2021-07-10 14:31:48,373] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b; state: None; annotations: {'dag_id': 'michel', 'task_id': 'pod-ex-minimum_txt', 'execution_date': '2021-07-10T14:30:59.199174+00:00', 'try_number': '1'}
[2021-07-10 14:31:48,376] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='michel', task_id='pod-ex-minimum_txt', execution_date=datetime.datetime(2021, 7, 10, 14, 30, 59, 199174, tzinfo=tzlocal()), try_number=1), None, 'michelpodexminimumtxt.a291f1d7ffeb45abb86c51c9b7b5a95b', 'default', '56653031774') to None
[2021-07-10 14:31:48,378] {scheduler_job.py:1222} INFO - Executor reports execution of michel.pod-ex-minimum_txt execution_date=2021-07-10 14:30:59.199174+00:00 exited with status None for try_number 1

Don't hesitate to ask me if you need more info

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

eladkal commented 3 years ago

Possibly due to https://github.com/apache/airflow/pull/15942 — json was added to template_ext, which means that Airflow tries to template every value in template_fields that ends with the suffix json.

Can you check whether defining MyKubernetesPodOperator as:

class MyKubernetesPodOperator(KubernetesPodOperator):
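    # leading dots mean only values ending in these exact file extensions are treated as template files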
    template_ext = ('.yaml', '.yml', '.json')

solves your problem?

kaxil commented 3 years ago

Yup, you are right, @eladkal -- would you like to create a PR with that fix? :)

We should do it now so it is available in the next Providers release.

kaxil commented 3 years ago

I have created https://github.com/apache/airflow/pull/16930 just in case you are busy

eladkal commented 3 years ago

I have created #16930 just in case you are busy

Thanks! I wonder if it's possible to create a test that will prevent future cases of this problem (a test that makes sure any string in template_ext for any operator starts with a dot), for example along the lines of the sketch below. I'm not near my laptop, but I think we have other operators that are missing the dot.
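For illustration, a minimal sketch of what such a guard test could look like (the operator list and test name here are hypothetical, not the actual test added to Airflow):

import pytest

from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# extend with every operator class that declares template_ext
OPERATORS_TO_CHECK = [BashOperator, KubernetesPodOperator]


@pytest.mark.parametrize("operator_class", OPERATORS_TO_CHECK)
def test_template_ext_starts_with_dot(operator_class):
    for ext in operator_class.template_ext:
        # 'json' (no dot) makes any value merely ending in "json" templated;
        # '.json' restricts templating to real file names
        assert ext.startswith("."), f"{operator_class.__name__}: {ext!r} should start with '.'"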

potiuk commented 3 years ago

Comment from https://github.com/apache/airflow/issues/17037#issuecomment-881946792 -> it seems there are doubts whether the fix in #16930 actually solves the problem. The comment by @raphaelauv suggests that the root cause is not addressed and that passing an argument ending with ".json" (including the '.') still causes a failure (@louison @kaxil @eladkal ? WDYT?). I think the change makes it "consistent" with the current behaviour of Airflow for all other operators, but maybe there should indeed be a way to disable the "extension" behaviour (though I see that as a new feature rather than a bug).

raphaelauv commented 3 years ago

Yeah, this change makes it consistent; it's a good MR.

It just does not solve HIS (@louison) problem, and I'm not saying that HIS (@louison) problem should become OURS (the Airflow community's).

PS: I was just around to help with checking the RC providers, nothing more.

eladkal commented 3 years ago

@raphaelauv, in your reproduction example:

    k = KubernetesPodOperator(
        namespace=namespace,
        image="hello-world",
        labels={"foo": "bar"},
        arguments=["vivi.json"],
        name="airflow-test-pod",
        task_id="task-one",
        ...
        )

Did you actually have a vivi.json file for Jinja to pick up? The traceback you shared is different from the one posted on this issue. Your traceback says jinja2.exceptions.TemplateNotFound: vivi.json, which means that Jinja can't find a vivi.json file to process.

potiuk commented 3 years ago

Did you actually have a vivi.json file for Jinja to pick up? The traceback you shared is different from the one posted on this issue. Your traceback says jinja2.exceptions.TemplateNotFound: vivi.json, which means that Jinja can't find a vivi.json file to process.

Yep. That is also my point, and I think we agree here @eladkal. I think @louison (can you please confirm that?) really complained in this issue that just using pod-ex-minimum_json and pod-ex-minimum_txt as the parameter values made it fail. IMHO it's reasonable to expect that 'something.json' is a file name and that it will fail if missing (this is how Airflow's 'template_extension' feature works for all other operators too).
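(For illustration, a minimal sketch of how the extension feature is meant to be used — the file name query.json here is hypothetical: a Jinja template file placed next to the DAG file is referenced by name, and Airflow substitutes its rendered contents into the argument.)

k = KubernetesPodOperator(
    task_id="render-json-template",
    name="render-json-template",
    namespace="default",
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    cmds=["echo"],
    # "query.json" ends with ".json" (a template_ext entry), so Airflow loads the file
    # from the DAG folder, renders it with Jinja, and passes the result as the argument
    arguments=["query.json"],
)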

kaxil commented 3 years ago

Did you actually have a vivi.json file for Jinja to pick up? The traceback you shared is different from the one posted on this issue. Your traceback says jinja2.exceptions.TemplateNotFound: vivi.json, which means that Jinja can't find a vivi.json file to process.

Yep. That is also my point, and I think we agree here @eladkal. I think @louison (can you please confirm that?) really complained in this issue that just using pod-ex-minimum_json and pod-ex-minimum_txt as the parameter values made it fail. IMHO it's reasonable to expect that 'something.json' is a file name and that it will fail if missing (this is how Airflow's 'template_extension' feature works for all other operators too).

That's correct

ericdevries commented 3 years ago

I honestly do not understand how this behaviour is considered normal. In what case would you expect it to actually load the contents of vivi.json and pass that to the kubernetes pod, instead of just passing the literal string "vivi.json" to it?

Furthermore, how can I pass a string argument to a KubernetesPodOperator as an env_var without Airflow trying to load it as a file (but only when it ends with json or yaml)?

potiuk commented 3 years ago

See the discussion in #17186 - it really turned out to be a bug in the implementation of recursive rendering of parameters in the KubernetesPodOperator.

You can (and the KubernetesPodOperator actually does) specify extensions that are treated differently when processing parameters. This is an individual decision of each operator implementation, and it only applies to templated fields, so you can still pass ".json" as a string for non-templated fields. See https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html?highlight=template%20fields#templating .
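For illustration, a minimal sketch of the mechanism described in those docs (the operator and field names below are made up, not part of Airflow):

from airflow.models.baseoperator import BaseOperator


class MyEchoOperator(BaseOperator):
    # only these attributes are rendered with Jinja
    template_fields = ("message",)
    # and only string values ending in one of these extensions are read as template files
    template_ext = (".json",)

    def __init__(self, *, message: str, **kwargs):
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        # by the time execute() runs, self.message is either the rendered string
        # or the rendered contents of a *.json file found next to the DAG file
        print(self.message)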

In this case, however, the recursive templating behaviour of the KPO was wrong and should be improved (but it only affects the KPO).

raphaelauv commented 3 years ago

@louison @ericdevries

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

class MyKubernetesPodOperator(KubernetesPodOperator):
    # exclude "arguments" from the templated fields so values like "vivi.json" are passed through literally
    template_fields = tuple(x for x in KubernetesPodOperator.template_fields if x != "arguments")

task = MyKubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum_json',
    name='pod-ex-minimum_json',
    cmds=['echo'],
    namespace='default',
    arguments=["vivi.json"],
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4'
)