astronomer / ask-astro

An end-to-end LLM reference implementation providing a Q&A interface for Airflow and Astronomer
https://ask.astronomer.io/
Apache License 2.0

WSL/Debian does not pass read_parquet exception handling gracefully. #206

Closed mpgreg closed 11 months ago

mpgreg commented 11 months ago

https://github.com/astronomer/ask-astro/blob/32e8ddc95d2f1ef8c774500cb9a07c9e13a40256/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py#L519-L529

The try/except logic in import_baseline first tries to read an already-downloaded local file before falling back to the baseline file in cloud storage. If the local file does not exist, we open a new file for writing and pull the data with requests. This works on some Linux variants but fails on WSL/Debian. It would be better to check for the file explicitly and use the remote-read capability of read_parquet to extract the data directly from cloud storage.

The same issue has been seen with the try/except that reads cached Parquet files in ask-astro-load.py.
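A minimal sketch of the explicit-check approach suggested above, not the project's actual fix. The function names `resolve_baseline_source` and `load_baseline` and the `seed_url` parameter are hypothetical; the only library behavior relied on is that `pd.read_parquet` accepts a URL as well as a local path.

```python
import os
from pathlib import Path


def resolve_baseline_source(seed_filename: str, seed_url: str) -> str:
    """Pick where to read the baseline from, without using exceptions for control flow.

    Returns the local path if a readable file already exists there,
    otherwise the remote URL (hypothetical parameter, supplied by the caller).
    """
    local_path = Path(seed_filename)
    if local_path.is_file() and os.access(local_path, os.R_OK):
        return str(local_path)
    return seed_url


def load_baseline(seed_filename: str, seed_url: str):
    """Read the baseline into a DataFrame from the local cache or from cloud storage."""
    import pandas as pd  # imported here so the source check above has no pandas dependency

    # read_parquet handles both local paths and remote URLs,
    # so no local file has to be opened for writing.
    return pd.read_parquet(resolve_baseline_source(seed_filename, seed_url))
```

Because nothing is written to disk, this path does not depend on the working directory being writable, which is where the task fails in the log below.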

*** Found local files:
***   * /usr/local/airflow/logs/dag_id=ask_astro_load_bulk/run_id=manual__2023-12-05T03:07:58.996066+00:00/task_id=import_baseline/attempt=4.log
/usr/local/lib/python3.11/site-packages/airflow/utils/log/file_task_handler.py:186 ResourceWarning: unclosed file <_io.TextIOWrapper name='/usr/local/airflow/logs/dag_id=ask_astro_load_bulk/run_id=manual__2023-12-05T03:07:58.996066+00:00/task_id=import_baseline/attempt=4.log' mode='a' encoding='utf-8'>
[2023-12-05T03:09:43.995+0000] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: ask_astro_load_bulk.import_baseline manual__2023-12-05T03:07:58.996066+00:00 [queued]>
[2023-12-05T03:09:44.007+0000] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: ask_astro_load_bulk.import_baseline manual__2023-12-05T03:07:58.996066+00:00 [queued]>
[2023-12-05T03:09:44.007+0000] {taskinstance.py:1361} INFO - Starting attempt 4 of 4
[2023-12-05T03:09:44.023+0000] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): import_baseline> on 2023-12-05 03:07:58.996066+00:00
[2023-12-05T03:09:44.029+0000] {standard_task_runner.py:57} INFO - Started process 18423 to run task
[2023-12-05T03:09:44.032+0000] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'ask_astro_load_bulk', 'import_baseline', 'manual__2023-12-05T03:07:58.996066+00:00', '--job-id', '52', '--raw', '--subdir', 'DAGS_FOLDER/ingestion/ask-astro-load.py', '--cfg-path', '/tmp/tmpiqznij59']
[2023-12-05T03:09:44.032+0000] {standard_task_runner.py:85} INFO - Job 52: Subtask import_baseline
[2023-12-05T03:09:44.070+0000] {task_command.py:416} INFO - Running <TaskInstance: ask_astro_load_bulk.import_baseline manual__2023-12-05T03:07:58.996066+00:00 [running]> on host 021cf41edc04
[2023-12-05T03:09:44.142+0000] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='ask_astro_load_bulk' AIRFLOW_CTX_TASK_ID='import_baseline' AIRFLOW_CTX_EXECUTION_DATE='2023-12-05T03:07:58.996066+00:00' AIRFLOW_CTX_TRY_NUMBER='4' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-05T03:07:58.996066+00:00'
[2023-12-05T03:09:44.152+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py", line 522, in import_baseline
    df = pd.read_parquet(seed_filename)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pandas/io/parquet.py", line 670, in read_parquet
    return impl.read(
           ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pandas/io/parquet.py", line 265, in read
    path_or_handle, handles, filesystem = _get_path_or_handle(
                                          ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pandas/io/parquet.py", line 139, in _get_path_or_handle
    handles = get_handle(
              ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pandas/io/common.py", line 872, in get_handle
    handle = open(handle, ioargs.mode)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: 'include/data/baseline_data_v2.parquet'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/decorators/base.py", line 221, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/operators/python.py", line 209, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py", line 525, in import_baseline
    with open(seed_filename, "wb") as fh:
         ^^^^^^^^^^^^^^^^^^^^^^^^^
PermissionError: [Errno 13] Permission denied: 'include/data/baseline_data_v2.parquet'
[2023-12-05T03:09:44.163+0000] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=ask_astro_load_bulk, task_id=import_baseline, execution_date=20231205T030758, start_date=20231205T030943, end_date=20231205T030944
[2023-12-05T03:09:44.173+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 52 for task import_baseline ([Errno 13] Permission denied: 'include/data/baseline_data_v2.parquet'; 18423)
[2023-12-05T03:09:44.204+0000] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-12-05T03:09:44.231+0000] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check
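The second traceback shows the fallback failing at open(seed_filename, "wb") with PermissionError, which suggests the task process could not write under include/data/ in this environment. If a local cache is still wanted, a check along the following lines could decide whether to attempt the write at all. This is only a sketch: `can_cache_to` is a hypothetical helper, not something in the repository.

```python
import os
from pathlib import Path


def can_cache_to(seed_filename: str) -> bool:
    """Return True if the cache file looks creatable or overwritable by this process."""
    target = Path(seed_filename)
    if target.exists():
        # An existing file must itself be writable.
        return os.access(target, os.W_OK)
    # A new file needs an existing parent directory that allows
    # writing (to create the entry) and searching (to reach it).
    parent = target.parent
    return parent.is_dir() and os.access(parent, os.W_OK | os.X_OK)
```

A caller could write the cache only when `can_cache_to(seed_filename)` is true and otherwise skip caching and read from cloud storage directly. The check is advisory (permissions can change between the check and the write), so the write itself may still warrant its own error handling.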