apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

task.kubernetes failure on creating pods (using CeleryExecutor) #35595

Closed: mostafaghadimi closed this issue 1 year ago

mostafaghadimi commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

We are trying to use the task.kubernetes decorator to run Python code. The requirements for that Python code have been installed inside the image used by the Kubernetes operator.

Whenever we run the DAG, after the decorator creates the /tmp/script.py file (as part of its normal operation), the following error is raised:

TypeError: Invalid argument(s) 'encoding' sent to create_engine(), using configuration SQLiteDialect_pysqlite/QueuePool/Engine. Please check that the keyword arguments are appropriate for this combination of components.
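
As an aside (inferred only from the error text, not from this particular deployment): SQLAlchemy 1.4 deprecated the encoding argument of create_engine(), and SQLAlchemy 2.0 removed it, so any code that still passes it fails with exactly this TypeError on a 2.x install. A minimal reproduction outside Airflow:

    # Sketch: the `encoding` keyword was removed in SQLAlchemy 2.0, so the
    # same call that is merely deprecated on 1.4 raises TypeError on 2.x.
    from sqlalchemy import create_engine

    try:
        create_engine("sqlite://", encoding="utf-8")
        print("accepted (SQLAlchemy 1.x)")
    except TypeError as exc:
        print(f"rejected (SQLAlchemy 2.x): {exc}")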

What you think should happen instead

The task is trying to connect and authenticate to a SQLite backend database, which is very strange. Instead, it should simply run the created /tmp/script.py file (see the sketch of the expected flow below).
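
For reference, the expected flow, reconstructed from the bootstrap commands visible in the pod log further below (a sketch, not Airflow's exact implementation): the pod decodes the serialized function and its arguments from environment variables, writes them to /tmp/script.py and /tmp/script.in, runs the script, and exposes its return value via /airflow/xcom/return.json.

    # Sketch of the pod-side bootstrap as it appears in the log below;
    # __PYTHON_SCRIPT and __PYTHON_INPUT are set by the decorator.
    import base64
    import os
    import subprocess

    for env_var, path in (("__PYTHON_SCRIPT", "/tmp/script.py"),
                          ("__PYTHON_INPUT", "/tmp/script.in")):
        with open(path, "wb") as f:
            f.write(base64.b64decode(os.environ[env_var]))

    os.makedirs("/airflow/xcom", exist_ok=True)
    subprocess.run(
        ["python", "/tmp/script.py", "/tmp/script.in", "/airflow/xcom/return.json"],
        check=True,
    )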

How to reproduce

  1. Copy the following requirements.txt file (see the version-check sketch after step 3):
    pendulum
    requests
    numpy
    pyodbc
    pytz
    sqlalchemy
    pandas
    ldap3
    clickhouse-driver==0.2.6
    airflow-clickhouse-plugin==0.11.0
    hydra-core==1.3.2
  2. Create a Dockerfile:

    FROM python:3.8.10

    RUN apt-get update && \
        apt-get install -y unixodbc-dev libkrb5-dev build-essential tzdata telnet && \
        apt-get autoremove -yqq --purge && \
        apt-get clean && \
        rm -rf /var/lib/apt/lists/*

    WORKDIR /app

    COPY requirements.txt .

    RUN pip install -r requirements.txt

    RUN ln -s /usr/bin/python3 /usr/bin/python
  3. Add the following example file as a DAG:
import os
from datetime import datetime

import pendulum
from airflow.models.dag import DAG
from airflow.decorators import task

from ucce_live_data.utilities.insert_to_target_task import insert_to_target
from ucce_live_data.utilities.utils import convert_airflow_datetime_to_sql

@task.kubernetes(
    image="<created_image>:<created_image_tag>",
    name="ucce_make_query",
    namespace="airflow",
    in_cluster=True,
)
def make_query(**kwargs):
    dag_run = kwargs["dag_run"]
    start_time = dag_run.data_interval_start
    end_time = dag_run.data_interval_end
    print(start_time)
    print(end_time)
    start_time = convert_airflow_datetime_to_sql(start_time)
    end_time = convert_airflow_datetime_to_sql(end_time)
    return f"""
        SELECT
            cast(RecoveryKey as bigint),
            RouterCallKey,
            RouterCallKeyDay,
            RouterCallKeySequenceNumber,
            PrecisionQueueID,
            DateTime,
            DbDateTime,
            TalkTime,
            NetQTime,
            CallDisposition,
            CallDispositionFlag
        FROM
            [icm_hds].[dbo].[t_Termination_Call_Detail]
        WHERE
            DbDateTime BETWEEN '{start_time}' and '{end_time}'
    """

with DAG(
    dag_id="termination_call_detail",
    max_active_runs=1,
    schedule_interval="*/5 * * * *",  # "Every 5 minutes"
    start_date=datetime(2023, 10, 31, 0, 0, 0),
    catchup=True,
    tags=["UCCE", "LiveData"],
    default_args={"retries": 1},
    params={"target_db": "UCCE_live", "target_table": "t_Termination_Call_Detail"},
) as dag:
    query = make_query()
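
An editorial note on the image build above: the traceback below shows that the pod image ends up with an Airflow 2.x install (whose settings.py still passes encoding to create_engine()) next to SQLAlchemy 2.x (which removed that argument). apache-airflow is not listed in requirements.txt, so it is presumably pulled in as a transitive dependency (likely of airflow-clickhouse-plugin), while sqlalchemy itself is unpinned. A quick sanity check to run inside the built image, using importlib.metadata so Airflow itself is never imported (importing it would crash with the TypeError shown below):

    # Print the versions that actually landed in the image (a sketch; the
    # names are PyPI distribution names, queried without importing airflow).
    from importlib.metadata import PackageNotFoundError, version

    for dist in ("apache-airflow", "sqlalchemy", "airflow-clickhouse-plugin"):
        try:
            print(f"{dist}: {version(dist)}")
        except PackageNotFoundError:
            print(f"{dist}: not installed")

If SQLAlchemy reports 2.x there, pinning sqlalchemy<2.0 in requirements.txt, or installing with the Airflow constraints file for the matching Airflow and Python versions, is the usual way to restore a compatible pair.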

The problem is that before the pod runs the make_query function, the following error occurs:

[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO - + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO - + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO - + mkdir -p /airflow/xcom
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO - + python /tmp/script.py /tmp/script.in /airflow/xcom/return.json
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO - Traceback (most recent call last):
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -   File "/tmp/script.py", line 8, in <module>
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -     from airflow.plugins_manager import integrate_macros_plugins
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/airflow/__init__.py", line 69, in <module>
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -     settings.initialize()
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/airflow/settings.py", line 529, in initialize
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -     configure_orm()
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/airflow/settings.py", line 222, in configure_orm
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -     engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -   File "<string>", line 2, in create_engine
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/deprecations.py", line 281, in warned
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -     return fn(*args, **kwargs)  # type: ignore[no-any-return]
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/create.py", line 680, in create_engine
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO -     raise TypeError(
[2023-11-13, 08:45:39 UTC] {pod_manager.py:235} INFO - TypeError: Invalid argument(s) 'encoding' sent to create_engine(), using configuration SQLiteDialect_pysqlite/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.
[2023-11-13, 08:45:39 UTC] {pod_manager.py:263} WARNING - Pod ucce-make-query-u6pbpck2 log read interrupted but container base still running
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO - Traceback (most recent call last):
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -   File "/tmp/script.py", line 8, in <module>
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -     from airflow.plugins_manager import integrate_macros_plugins
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/airflow/__init__.py", line 69, in <module>
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -     settings.initialize()
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/airflow/settings.py", line 529, in initialize
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -     configure_orm()
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/airflow/settings.py", line 222, in configure_orm
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -     engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -   File "<string>", line 2, in create_engine
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/deprecations.py", line 281, in warned
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -     return fn(*args, **kwargs)  # type: ignore[no-any-return]
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/create.py", line 680, in create_engine
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO -     raise TypeError(
[2023-11-13, 08:45:40 UTC] {pod_manager.py:235} INFO - TypeError: Invalid argument(s) 'encoding' sent to create_engine(), using configuration SQLiteDialect_pysqlite/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.
[2023-11-13, 08:45:40 UTC] {pod_manager.py:288} INFO - Pod ucce-make-query-u6pbpck2 has phase Running
[2023-11-13, 08:45:42 UTC] {kubernetes_pod.py:688} INFO - Deleting pod: ucce-make-query-u6pbpck2
[2023-11-13, 08:45:43 UTC] {taskinstance.py:1776} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py", line 138, in execute
    return super().execute(context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/decorators/base.py", line 217, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 516, in execute
    return self.execute_sync(context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 547, in execute_sync
    remote_pod=self.remote_pod,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 672, in cleanup
    f"Pod {pod and pod.metadata.name} returned a failure:\n{error_message}\n"
airflow.exceptions.AirflowException: Pod ucce-make-query-u6pbpck2 returned a failure:

That makes no sense. Could anyone explain what is happening on the backend side of Airflow? It's very confusing!
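
For what the traceback actually shows (an editorial note): the generated /tmp/script.py starts with `from airflow.plugins_manager import integrate_macros_plugins`, and importing airflow runs settings.initialize() -> configure_orm(), which builds a SQLAlchemy engine for the metadata database at import time. With no metadata DB connection configured inside the pod, Airflow falls back to a local SQLite URL, hence SQLiteDialect_pysqlite in the error. So the pod is not really authenticating to anything: it crashes while importing Airflow, before the make_query body ever runs, because the image's SQLAlchemy is too new for the Airflow installed next to it. A sketch illustrating that import-time behavior, to be run against a working Airflow 2.x install rather than the broken image:

    # Merely importing airflow configures the ORM.
    import airflow  # triggers settings.initialize() -> configure_orm()
    from airflow import settings

    # Unconfigured installs fall back to a local SQLite URL, which is why
    # the pod log mentions SQLiteDialect_pysqlite.
    print(settings.SQL_ALCHEMY_CONN)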

Operating System

Debian GNU/Linux 11

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.