apache/airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow DAG not working anymore on Airflow 2.7.3 #35979

Closed: gmarendaz closed this issue 10 months ago

gmarendaz commented 10 months ago

Apache Airflow version

2.7.3

What happened

I was migrating Apache Airflow from a Windows WSL environment to an Ubuntu environment. The DAG worked correctly on Windows WSL, but it no longer works on Ubuntu.

What you think should happen instead

The code should work as expected because the file and DAG structure did not change.

How to reproduce

The problem can't be reproduced elsewhere; it only occurs in my environment.

Operating System

Ubuntu 22.04.3 LTS

Versions of Apache Airflow Providers

ai-operator @ file:///home/apache/airflow/packages/ai_package/dist/ai_operator-0.0.0-py3-none-any.whl
aiohttp==3.8.6
aiosignal==1.3.1
alembic==1.12.1
annotated-types==0.6.0
anyio==4.0.0
apache-airflow==2.7.3
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-mysql==5.2.1
apache-airflow-providers-sqlite==3.5.0
apispec==6.3.0
argcomplete==3.1.3
asgiref==3.7.2
async-timeout==4.0.3
attrs==23.1.0
Automat==20.2.0
Babel==2.13.1
backoff==1.10.0
bcrypt==3.2.0
blinker==1.6.3
cachelib==0.9.0
cattrs==23.1.2
certifi==2023.7.22
cffi==1.16.0
chardet==4.0.0
charset-normalizer==3.3.2
click==8.1.7
clickclick==20.10.2
cloud-init==23.2.2
cmake==3.27.1
colorama==0.4.6
colorlog==4.8.0
command-not-found==0.3
configobj==5.0.6
ConfigUpdater==3.1.1
connexion==2.14.2
constantly==15.1.0
contourpy==1.1.0
cron-descriptor==1.4.0
croniter==2.0.1
cryptography==41.0.5
cycler==0.11.0
dbus-python==1.2.18
Deprecated==1.2.14
dill==0.3.1.1
distro==1.7.0
distro-info==1.1+ubuntu0.1
dnspython==2.4.2
docutils==0.20.1
email-validator==1.3.1
exceptiongroup==1.1.2
filelock==3.12.2
Flask==2.2.5
Flask-AppBuilder==4.3.6
Flask-Babel==2.0.0
Flask-Caching==2.1.0
Flask-JWT-Extended==4.5.3
Flask-Limiter==3.5.0
Flask-Login==0.6.3
Flask-Session==0.5.0
Flask-SQLAlchemy==2.5.1
Flask-WTF==1.2.1
fonttools==4.42.0
frozenlist==1.4.0
gevent==23.7.0
google-re2==1.1
googleapis-common-protos==1.61.0
graphviz==0.20.1
greenlet==3.0.1
grpcio==1.59.2
gunicorn==21.2.0
h11==0.14.0
httpcore==0.16.3
httplib2==0.20.2
httpx==0.23.3
hyperlink==21.0.0
idna==3.4
importlib-metadata==6.8.0
importlib-resources==6.1.0
imutils==0.5.4
incremental==21.3.0
inflection==0.5.1
itsdangerous==2.1.2
jeepney==0.7.1
Jinja2==3.1.2
jsonpatch==1.32
jsonpointer==2.0
jsonschema==4.19.2
jsonschema-specifications==2023.7.1
keyring==23.5.0
kiwisolver==1.4.4
launchpadlib==1.10.16
lazr.restfulclient==0.14.4
lazr.uri==1.0.6
lazy-object-proxy==1.9.0
limits==3.6.0
linkify-it-py==2.0.2
lit==16.0.6
lockfile==0.12.2
Mako==1.2.4
Markdown==3.5.1
markdown-it-py==3.0.0
MarkupSafe==2.1.3
marshmallow==3.20.1
marshmallow-enum==1.5.1
marshmallow-oneofschema==3.0.1
marshmallow-sqlalchemy==0.26.1
matplotlib==3.7.2
mdit-py-plugins==0.4.0
mdurl==0.1.2
more-itertools==8.10.0
mpmath==1.3.0
multidict==6.0.4
mutils==1.0.5
mysql-connector-python==8.1.0
mysqlclient==2.1.1
netifaces==0.11.0
networkx==3.1
numpy==1.25.2
nvidia-cublas-cu11==11.10.3.66
nvidia-cuda-cupti-cu11==11.7.101
nvidia-cuda-nvrtc-cu11==11.7.99
nvidia-cuda-runtime-cu11==11.7.99
nvidia-cudnn-cu11==8.5.0.96
nvidia-cufft-cu11==10.9.0.58
nvidia-curand-cu11==10.2.10.91
nvidia-cusolver-cu11==11.4.0.1
nvidia-cusparse-cu11==11.7.4.91
nvidia-nccl-cu11==2.14.3
nvidia-nvtx-cu11==11.7.91
oauthlib==3.2.0
opencv-python==4.8.0.76
opentelemetry-api==1.20.0
opentelemetry-exporter-otlp==1.20.0
opentelemetry-exporter-otlp-proto-common==1.20.0
opentelemetry-exporter-otlp-proto-grpc==1.20.0
opentelemetry-exporter-otlp-proto-http==1.20.0
opentelemetry-proto==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-semantic-conventions==0.41b0
ordered-set==4.1.0
packaging==23.2
pandas==2.0.3
pathspec==0.11.2
pendulum==2.1.2
pexpect==4.8.0
Pillow==10.0.0
pluggy==1.3.0
prison==0.2.1
protobuf==4.24.4
psutil==5.9.6
ptyprocess==0.7.0
pyasn1==0.4.8
pyasn1-modules==0.2.1
pycparser==2.21
pydantic==2.4.2
pydantic_core==2.10.1
Pygments==2.16.1
PyGObject==3.42.1
PyHamcrest==2.0.2
PyJWT==2.8.0
pyOpenSSL==21.0.0
pyparsing==3.0.9
pyrsistent==0.18.1
pyserial==3.5
python-apt==2.4.0+ubuntu2
python-daemon==3.0.1
python-dateutil==2.8.2
python-debian==0.1.43+ubuntu1.1
python-magic==0.4.24
python-nvd3==0.15.0
python-slugify==8.0.1
pytz==2023.3.post1
pytzdata==2020.1
PyYAML==6.0.1
referencing==0.30.2
requests==2.31.0
requests-toolbelt==1.0.0
rfc3339-validator==0.1.4
rfc3986==1.5.0
rich==13.6.0
rich-argparse==1.4.0
rpds-py==0.10.6
scipy==1.11.1
seaborn==0.12.2
SecretStorage==3.3.1
service-identity==18.1.0
setproctitle==1.3.3
six==1.16.0
sniffio==1.3.0
sos==4.5.6
SQLAlchemy==1.4.50
SQLAlchemy-JSONField==1.0.1.post0
SQLAlchemy-Utils==0.41.1
sqlparse==0.4.4
ssh-import-id==5.11
sympy==1.12
systemd-python==234
tabulate==0.9.0
tenacity==8.2.3
termcolor==2.3.0
text-unidecode==1.3
torch==2.0.1
torchvision==0.15.2
tqdm==4.66.1
triton==2.0.0
Twisted==22.1.0
typing_extensions==4.8.0
tzdata==2023.3
ubuntu-advantage-tools==8001
ubuntu-drivers-common==0.0.0
uc-micro-py==1.0.2
ufw==0.36.1
unattended-upgrades==0.1
unicodecsv==0.14.1
urllib3==1.26.18
wadllib==1.3.6
Werkzeug==2.2.3
wrapt==1.15.0
WTForms==3.0.1
xkit==0.0.0
xlrd==2.0.1
yarl==1.9.2
zipp==3.17.0
zope.event==5.0
zope.interface==5.4.0

Deployment

Virtualenv installation

Deployment details

Anything else

Full traceback:

*** Found local files:
***   * /home/apache/airflow/logs/dag_id=MASP_Slave/run_id=manual__2023-11-28T00:01:58.619294+00:00/task_id=transform/attempt=1.log
[2023-11-30, 12:29:58 CET] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: MASP_Slave.transform manual__2023-11-28T00:01:58.619294+00:00 [queued]>
[2023-11-30, 12:29:58 CET] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: MASP_Slave.transform manual__2023-11-28T00:01:58.619294+00:00 [queued]>
[2023-11-30, 12:29:58 CET] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-11-30, 12:29:58 CET] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): transform> on 2023-11-28 00:01:58.619294+00:00
[2023-11-30, 12:29:58 CET] {standard_task_runner.py:57} INFO - Started process 3168332 to run task
[2023-11-30, 12:29:58 CET] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'MASP_Slave', 'transform', 'manual__2023-11-28T00:01:58.619294+00:00', '--job-id', '890530', '--raw', '--subdir', 'DAGS_FOLDER/manufacturing/tests/MASP_Slave.py', '--cfg-path', '/tmp/tmp6usvkgvc']
[2023-11-30, 12:29:58 CET] {standard_task_runner.py:85} INFO - Job 890530: Subtask transform
[2023-11-30, 12:29:59 CET] {task_command.py:416} INFO - Running <TaskInstance: MASP_Slave.transform manual__2023-11-28T00:01:58.619294+00:00 [running]> on host clb-34a01
[2023-11-30, 12:29:59 CET] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='IMEDA' AIRFLOW_CTX_DAG_ID='MASP_Slave' AIRFLOW_CTX_TASK_ID='transform' AIRFLOW_CTX_EXECUTION_DATE='2023-11-28T00:01:58.619294+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-28T00:01:58.619294+00:00'
[2023-11-30, 12:29:59 CET] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/apache/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 681, in _deserialize_value
    return pickle.loads(result.value)
_pickle.UnpicklingError: pickle data was truncated
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/apache/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
  File "/home/apache/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 209, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/apache/airflow/dags/manufacturing/tests/MASP_Slave.py", line 108, in transform
    extracting_res = [output for output in task_outputs if output is not None]
  File "/home/apache/airflow/dags/manufacturing/tests/MASP_Slave.py", line 108, in <listcomp>
    extracting_res = [output for output in task_outputs if output is not None]
  File "/home/apache/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 720, in __next__
    return XCom.deserialize_value(next(self._it))
  File "/home/apache/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 693, in deserialize_value
    return BaseXCom._deserialize_value(result, False)
  File "/home/apache/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 683, in _deserialize_value
    return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
[2023-11-30, 12:29:59 CET] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=MASP_Slave, task_id=transform, execution_date=20231128T000158, start_date=20231130T112958, end_date=20231130T112959
[2023-11-30, 12:29:59 CET] {standard_task_runner.py:104} ERROR - Failed to execute job 890530 for task transform ('utf-8' codec can't decode byte 0x80 in position 0: invalid start byte; 3168332)
[2023-11-30, 12:29:59 CET] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-11-30, 12:29:59 CET] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check
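
Reading the traceback: XCom deserialization first tries pickle.loads (the order used when core.enable_xcom_pickling is enabled), which fails with "pickle data was truncated"; the JSON fallback then fails on byte 0x80, the first byte of every pickle protocol 2+ stream. The following is a minimal standalone sketch of that failure mode, with an invented payload and truncation point; it is not Airflow code:

import json
import pickle

# Simulate a stored pickle value that was cut short, e.g. by a storage limit.
payload = pickle.dumps({"example": list(range(1000))}, protocol=4)
truncated = payload[: len(payload) // 2]

try:
    pickle.loads(truncated)
except pickle.UnpicklingError as exc:
    print(f"pickle: {exc}")  # "pickle data was truncated"
    try:
        json.loads(truncated.decode("UTF-8"))
    except UnicodeDecodeError as fallback_exc:
        print(f"json fallback: {fallback_exc}")  # invalid start byte 0x80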

The function where it fails:

def transform(**kwargs):
    ti = kwargs['ti']
    task_outputs = ti.xcom_pull(task_ids=["old_extract", "new_extract"])
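    # per the traceback, each element is deserialized lazily during this iteration (xcom.py __next__)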
    extracting_res = [output for output in task_outputs if output is not None]
    df = extracting_res[0]
    df = df.rename(columns={"data_n":"info_data_n"})
    schema = _template_slave.get_schema("test_wafer")
    df = concat_time_date(df)
    df = _template_slave.filter_(df, schema)
    df = _template_slave.cast(df, schema)
    df["location"] = kwargs["dag_run"].conf['location']
    df["src_path"] = kwargs["dag_run"].conf['path']
    return df
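
The innermost Airflow frame in the traceback is xcom.py's __next__: when xcom_pull is given a list of task_ids, it returns a lazily-deserialized sequence, so each stored value is only decoded as the list comprehension iterates over it. A rough sketch of that shape (not Airflow's actual classes; names invented):

class LazyXComResults:
    """Deserializes each stored row only when the caller's loop reaches it."""

    def __init__(self, raw_rows, deserialize):
        self._it = iter(raw_rows)
        self._deserialize = deserialize

    def __iter__(self):
        return self

    def __next__(self):
        # a corrupt stored value raises here, surfacing in the caller's frame,
        # which is why the error points at the list comprehension on line 108
        return self._deserialize(next(self._it))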

The function the data comes from:

def old_extract(**kwargs):
    path = kwargs["dag_run"].conf['path']
    header_size = _template_slave.header(path, "Puce N°")
    raw_df = pd.read_csv(path, header=header_size, sep="\t", encoding='iso-8859-1', engine='python')
    df = raw_df[raw_df.count(axis=1) > 6][1:]
    df = get_header_fields(path, df, comment = "")
    str_columns = [col for col in df.columns if isinstance(df[col], str)]
    df[str_columns] = df[str_columns].rename(columns=str.lower)\
                                     .rename(columns=_template_slave.remove_accents)\
                                     .rename(columns=_template_slave.remove_special_characters)
    return df
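
Note that both extract callables return a whole pandas DataFrame, so the frame itself is pushed to XCom and must fit in the metadata database's value column. A hypothetical diagnostic (shape and sizes invented) for measuring what the backend has to store:

import pickle

import pandas as pd

# Hypothetical check, not from the issue: how many bytes would the XCom
# backend have to store for a frame of this shape? Whether an oversized value
# errors out or is silently truncated depends on the metadata database and
# its column type / SQL mode.
df = pd.DataFrame({"col": range(100_000)})
print(len(pickle.dumps(df)))  # size in bytes of the pickled payload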

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 10 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.

yiqijiu commented 10 months ago

Could you provide the complete DAG (Directed Acyclic Graph) code? Based on my past experience (which could be wrong here), you probably haven't added a UTF-8 encoding declaration in the header line of the Python file:

# -*- coding: UTF-8 -*-

gmarendaz commented 10 months ago

@yiqijiu: thanks for your answer. Indeed, I didn't have the encoding declaration at the beginning of my DAG code. I added it, but the problem remains the same.

Here is the full code:

# -*- coding: utf-8 -*-

from _lib import _template_slave
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.models import XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from datetime import datetime
from io import StringIO, BytesIO
import chardet
import json
import logging
import ntpath
import numpy as np
import os
import pandas as pd
import re
import sqlalchemy
import unicodedata
import yaml

DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")

default_args = {
    'owner': 'IMEDA',
    'start_date': days_ago(2),
    'weight_rule': "upstream"
}

## CHECK
def check_file(**kwargs):
    path = kwargs["dag_run"].conf['path']
    blacklist = Variable.get("blacklist", deserialize_json=True)
    if any(s in path for s in blacklist):
        return "blacklist"
    else:
        return "pick_extract_mode"

def pick_extract_mode(**kwargs):
    path = kwargs["dag_run"].conf['path']
    with open(path, encoding='latin_1') as f:
        file_ = [line for line in f]
        for n,line in enumerate(file_):
            if line.startswith("<Info>"):
                return "new_extract"
    return "old_extract"

## EXTRACT
def get_header_fields(path, df, comment):
    with open(path, encoding='latin_1') as f:
        for line in f:
            if line.startswith(comment + "\tNo de série :"):
                df["batch_id"] = re.search(r'.*No de série :\t([a-zA-Z0-9]*)',line).group(1)
                n_plaque = re.search(r'.*N° plaque :\t([0-9]*)',line).group(1)
                # Fill with zeros to keep order 01, 02, .. , 10, 11,...
                if len(n_plaque) < 2:
                    n_plaque = n_plaque.zfill(2)
                df["n_plaque"] = n_plaque
            elif line.startswith(comment + "\tProduit :"):
                df["produit"] = re.search(r'.*Produit :\t(.*)\ttechnologie :',line).group(1)
                df["technologie"] = re.search(r'.*technologie :\t(.*)\tOpérateur',line).group(1)
            if line.startswith(comment + "\tFichier limites :"):
                limit_file = re.search(r'\tFichier limites :\t(.*)\tdu :',line).group(1)
                df["limit_file"] = limit_file.lower().replace(" ", "")
    return df

def old_extract(**kwargs):
    path = kwargs["dag_run"].conf['path']
    header_size = _template_slave.header(path, "Puce N°")
    raw_df = pd.read_csv(path, header=header_size, sep="\t", engine='python', encoding='latin_1', encoding_errors='ignore', on_bad_lines='skip')
    # Keep only rows with more than 5 non-NaN values 
    df = raw_df[raw_df.count(axis=1) > 6][1:]
    df = get_header_fields(path, df, comment = "")
    str_columns = [col for col in df.columns if isinstance(col, str)]
    df[str_columns] = df[str_columns].rename(columns=str.lower)\
                                     .rename(columns=_template_slave.remove_accents)\
                                     .rename(columns=_template_slave.remove_special_characters)
    return df

def new_extract(**kwargs):
    path = kwargs["dag_run"].conf['path']
    df = pd.read_csv(path, comment='#', sep="\t", header=[0,1], engine='python', encoding='latin_1', encoding_errors='ignore', on_bad_lines='skip')
    df.columns = df.columns.map('_'.join)
    str_columns = [col for col in df.columns if isinstance(col, str)]
    df[str_columns] = df[str_columns].rename(columns=str.lower)\
                                     .rename(columns=_template_slave.remove_accents)\
                                     .rename(columns=_template_slave.remove_special_characters)
    df = get_header_fields(path, df, comment="###\t")
    with open(path, encoding='latin_1') as f:
        file_ = [line for line in f]
        for n, line in enumerate(file_):
            if line.startswith("Fichier clôturé"):
                return df[1:-1]
    return df[1:]

## TRANSFORM
"""def custom_deserialize(df):
    return json.load(df.decode('latin_1'))"""

def concat_time_date(df):
    df['date_heure'] = pd.to_datetime(df['info_date'] +  " " + df['info_heure'], dayfirst = True, errors="coerce")
    df = df.drop(['info_date', 'info_heure'], axis=1)
    return df

def transform(**kwargs):
    ti = kwargs['ti']
    task_outputs = ti.xcom_pull(task_ids=["old_extract", "new_extract"])
    logging.info(task_outputs)
    extracting_res = [output for output in task_outputs if output is not None]
    df = extracting_res[0]
    df = df.rename(columns={"puce_n":"info_puce_n",
                            "pos_x": "info_pos_x",
                            "pos_y": "info_pos_y",
                            "date" : "info_date",
                            "heure" : "info_heure",
                            'pos_a_x_g' : 'capa_meas_pos_meas_at_x',
                            "status" : "info_status",
                            "csup_a_x_g" : "capa_meas_csup_x_g",
                            'cinf_a_x_g' : 'capa_meas_cinf_x_g',
                            'csup_a_0_g' : 'capa_meas_csup_0_g',
                            'rsup_a_0_g' : 'capa_meas_rsup_0_g',
                            'cinf_a_0_g' : 'capa_meas_cinf_0_g',
                            'rinf_a_0_g' : 'capa_meas_rinf_0_g',
                            'csup_pol' : 'capa_meas_csup_pol',
                            'cinf_pol' : 'capa_meas_cinf_pol',
                            'csup_hyst' : 'capa_meas_csup_hys',
                            'cinf_hyst' : 'capa_meas_cinf_hys',
                            'c_tot' : 'capa_meas_c_tot',
                            'sz' : 'capa_meas_sz_percent',
                            'sz_1' : 'capa_meas_sz_pf',
                            'so' : 'capa_meas_s0_a',
                            'd_pol_sup' : 'capa_meas_delta_csup_pol',
                            'd_pol_inf' : 'capa_meas_delta_cinf_pol',
                            'd_hyst_sup' : 'capa_meas_delta_csup_hys',
                            'd_hyst_inf' : 'capa_meas_delta_cinf_hys',
                            'ampl_a_0' : 'bw_meas_ampl_0_hz',
                            'ampl_x_hz' : 'bw_meas_ampl_x_hz',
                            'fc' : 'bw_meas_fc',
                            'ampl_min' : 'bw_meas_ampl_min',
                            'f_min' : 'bw_meas_f_min',
                            'ampl_res' : 'bw_meas_ampl_res',
                            'f_res' : 'bw_meas_f_res',
                            'mse' : 'bw_meas_mse',
                            'press' : 'bw_meas_press',
                            'q' : 'bw_meas_q'})
    schema = _template_slave.get_schema("test_wafer")
    df = concat_time_date(df)
    df = _template_slave.filter_(df, schema)
    df = _template_slave.cast(df, schema)
    df["location"] = kwargs["dag_run"].conf['location']
    df["src_path"] = kwargs["dag_run"].conf['path']
    return df

## LOAD 
def load(**kwargs):
    engine = sqlalchemy.create_engine(BaseHook.get_connection("database").get_uri())
    ti = kwargs['ti']
    df = ti.xcom_pull(task_ids='transform')
    try:
        df.to_sql("test_wafer", con=engine, if_exists='append', index=False)
    except sqlalchemy.exc.IntegrityError:
        pass

@provide_session
def cleanup_xcom(session=None, **kwargs): 
    session.query(XCom).filter(XCom.dag_id == DAG_ID)\
                       .filter(XCom.execution_date == kwargs["execution_date"])\
                       .delete()
with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
    tags = ["manufacturing", "tests"]
) as dag:
    check_task = BranchPythonOperator(
        task_id="check_file",
        python_callable=check_file
    )
    blacklist_task = DummyOperator(
        task_id="blacklist"
    )
    pick_extract_mode_task = BranchPythonOperator(
        task_id="pick_extract_mode",
        python_callable=pick_extract_mode
    )
    old_extract_task = PythonOperator(
        task_id='old_extract',
        python_callable=old_extract,
    )
    new_extract_task = PythonOperator(
        task_id='new_extract',
        python_callable=new_extract,
    )
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
        trigger_rule="none_failed"
    )
    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
        pool="wafer_pool"
    )
    clean_xcom_task = PythonOperator(
        task_id="clean_xcom",
        python_callable = cleanup_xcom,
    )
    check_task >> [pick_extract_mode_task, blacklist_task]
    pick_extract_mode_task >> [old_extract_task, new_extract_task]
    [old_extract_task, new_extract_task] >> transform_task >> load_task >> clean_xcom_task
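
One way to narrow this down further, sketched below as a hypothetical follow-up (the connection URI is a placeholder, and the table/column names may need adapting to the Airflow version), is to pull the raw stored bytes for the failing run straight out of the metadata database and check whether they form a complete pickle stream:

import pickle

import sqlalchemy

# Fetch the most recent raw XCom value for the extract task. Bytes that start
# with 0x80 but fail to unpickle with "pickle data was truncated" would point
# at a truncated stored value rather than an encoding problem.
engine = sqlalchemy.create_engine("mysql://user:pass@host/airflow_db")  # placeholder
with engine.connect() as conn:
    row = conn.execute(sqlalchemy.text(
        "SELECT value FROM xcom "
        "WHERE dag_id = 'MASP_Slave' AND task_id = 'old_extract' "
        "ORDER BY timestamp DESC LIMIT 1"
    )).fetchone()

raw = row[0]
print(raw[:2], len(raw))  # expect b'\x80\x04' or similar, plus the stored size
pickle.loads(raw)         # raises UnpicklingError if the stream is incomplete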