Open hanizaidi110 opened 2 years ago
Seems to be a duplicate of #117
I think the issue is very similar, but I'm using Airflow 2.3.2 from Docker with Python 3.7. I'm getting the same error only for the following tasks: cleanup_TaskInstance, cleanup_BaseXCom, cleanup_TaskReschedule, cleanup_RenderedTaskInstanceFields. The other tasks finish successfully.
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:265} INFO - Retrieving max_execution_date from XCom
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:278} INFO - Configurations:
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:279} INFO - max_date: 2022-05-21 17:18:41.939540+00:00
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:280} INFO - enable_delete: True
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:281} INFO - session: <sqlalchemy.orm.session.Session object at 0x7f7102e0c750>
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:282} INFO - airflow_db_model: <class 'airflow.models.taskinstance.TaskInstance'>
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:283} INFO - state: None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:284} INFO - age_check_column: ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:285} INFO - keep_last: False
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:286} INFO - keep_last_filters: None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:287} INFO - keep_last_group_by: None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:289} INFO -
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:291} INFO - Running Cleanup Process...
[2022-06-20, 17:19:52 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/airflow-db-cleanup.py", line 298, in cleanup_function
    logging.info("INITIAL QUERY : " + str(query))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2848, in __str__
    return str(statement.compile(bind))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 506, in compile
    return self._compiler(dialect, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 570, in _compiler
    return dialect.statement_compiler(dialect, self, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 766, in __init__
    Compiled.__init__(self, dialect, statement, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 455, in __init__
    self.string = self.process(self.statement, **compile_kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 490, in process
    return obj._compiler_dispatch(self, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 81, in _compiler_dispatch
    return meth(self, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2981, in visit_select
    select_stmt, self, **kwargs
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/base.py", line 501, in create_for_statement
    return klass.create_for_statement(statement, compiler, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/context.py", line 579, in create_for_statement
    opt.process_compile_state(self)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 185, in process_compile_state
    self._process(compile_state, not bool(compile_state.current_path))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 718, in _process
    raiseerr,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 870, in _bind_loader
    "mapper option expects " "string key or list of attributes"
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
[2022-06-20, 17:19:52 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=airflow-db-cleanup, task_id=cleanup_TaskInstance, execution_date=20220619T000000, start_date=20220620T171951, end_date=20220620T171952
[2022-06-20, 17:19:52 UTC] {logging_mixin.py:115} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/models/param.py:62 DeprecationWarning: The use of non-json-serializable params is deprecated and will be removed in a future release
Tasks cleanup_TaskInstance and cleanup_TaskReschedule fail with Python 3.8.12 and Airflow 2.2.2. Same error: sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
This seems to be related to missing columns in the db tables to be cleaned for those two tasks.
I fixed it by replacing execution_date with end_date in age_check_column for both cleanup_TaskInstance and cleanup_TaskReschedule. Let me know whether there is a more appropriate field that should be used.
I also suggest adding the following to cleanup_function, to check that the age_check_column selected for a given table is actually found in the table to be cleaned:
    # Extract the names of the db table's columns
    table_columns = [str(colname).split(".")[1] for colname in airflow_db_model.__table__.columns]
    age_check_col_name = str(age_check_column).split(".")[1]
    if age_check_col_name not in table_columns:
        raise ValueError(f"{age_check_col_name} field not in table {airflow_db_model.__table__.name}")
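One caveat with the check suggested above: for the association proxy shown in the log (ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))), the string contains no ".", so split(".")[1] would raise IndexError before the intended ValueError is reached. A minimal sketch of a more defensive variant follows. It works on plain strings only; the helper names column_name_from_str and check_age_column are hypothetical and are not part of airflow-db-cleanup.py.

```python
from typing import Iterable, Optional


def column_name_from_str(column_str: str) -> Optional[str]:
    """Return the text after the last '.', or None if there is no '.'."""
    _, sep, name = column_str.rpartition(".")
    return name if sep else None


def check_age_column(table_name: str, table_columns: Iterable[str], column_str: str) -> str:
    """Return the column name, or raise ValueError if it is not in the table."""
    name = column_name_from_str(column_str)
    if name is None or name not in set(table_columns):
        raise ValueError(f"{column_str} is not a column of table {table_name}")
    return name


# A dotted name that exists in the (illustrative) column list passes.
ok = check_age_column("task_instance", ["task_id", "end_date"], "task_instance.end_date")

# The proxy string from the log has no '.', so it is rejected with ValueError.
proxy = "ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))"
try:
    check_age_column("task_instance", ["task_id", "end_date"], proxy)
    rejected = False
except ValueError:
    rejected = True
```

In the DAG itself, the column list could probably be taken from airflow_db_model.__table__.columns.keys() and the string from str(age_check_column), but that wiring is an assumption and is untested here.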
I read through #117, and it looks like the comment from @PhilippDB makes sense and could be the solution.
@e-compagno please be aware of what @tylerwmarrs wrote: using start_date or end_date may select incorrect records that are not tied to records in the dag_run table. The tables have the following constraints in the DDL:
CREATE TABLE task_instance(
    task_id character varying(250) NOT NULL,
    dag_id character varying(250) NOT NULL,
    run_id character varying(250) NOT NULL,
    [...]
    PRIMARY KEY(task_id, dag_id, run_id, map_index),
    CONSTRAINT task_instance_dag_run_fkey FOREIGN KEY(dag_id, run_id) REFERENCES dag_run(dag_id, run_id),
    CONSTRAINT task_instance_trigger_id_fkey FOREIGN KEY(trigger_id) REFERENCES trigger(id)
);
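To illustrate why the age should come from dag_run rather than from start_date or end_date, here is a minimal sketch using an in-memory SQLite database. The schema is a heavily simplified stand-in for the real Airflow tables, the rows are made up, and the query is not the one airflow-db-cleanup.py builds. It only shows that selecting by the parent run's execution_date (joined on dag_id and run_id) can pick different rows than selecting by end_date, for example when a task of an old run was re-run recently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag_run (
    dag_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    PRIMARY KEY (dag_id, run_id)
);
CREATE TABLE task_instance (
    task_id TEXT NOT NULL,
    dag_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    end_date TEXT,
    PRIMARY KEY (task_id, dag_id, run_id),
    FOREIGN KEY (dag_id, run_id) REFERENCES dag_run (dag_id, run_id)
);
""")
conn.executemany("INSERT INTO dag_run VALUES (?, ?, ?)", [
    ("demo", "old_run", "2022-04-01T00:00:00"),
    ("demo", "new_run", "2022-06-19T00:00:00"),
])
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", [
    ("t1", "demo", "old_run", "2022-04-01T00:05:00"),
    # Hypothetical case: task of the old run that was re-run recently.
    ("t2", "demo", "old_run", "2022-06-20T12:00:00"),
    ("t3", "demo", "new_run", "2022-06-19T00:05:00"),
])

max_date = "2022-05-21T17:18:41"  # ISO strings in one format compare chronologically

# Age taken from the parent dag_run via the (dag_id, run_id) foreign key.
by_run = conn.execute("""
    SELECT ti.task_id FROM task_instance AS ti
    JOIN dag_run AS dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
    WHERE dr.execution_date <= ?
    ORDER BY ti.task_id
""", (max_date,)).fetchall()

# Age taken from the task's own end_date.
by_end_date = conn.execute("""
    SELECT task_id FROM task_instance
    WHERE end_date <= ?
    ORDER BY task_id
""", (max_date,)).fetchall()
```

Here the join selects both task instances of old_run, while the end_date filter skips t2, leaving part of an old run behind.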
The problem is still active in version 2.4.1. The Cloud Composer version does not fix TaskReschedule cleaning and also breaks the XCom cleaning operation, as execution_date is not a field in the db. Is there any update on the issue resolution?
Would
    "airflow_db_model": XCom,
    "age_check_column": XCom.timestamp,
fix the issue?
XCom.timestamp solves the error on XCom, but not the others.
You can find the time column in the model file. For example, for TaskInstance you can use queued_dttm or end_date.
Using XCom.timestamp as the age_check_column fixes the issue for XCom. For RenderedTaskInstanceFields, use RenderedTaskInstanceFields.run_id to fix the issue.
I recently upgraded to Python 3.9 and Airflow 2.2.2. The following error has been occurring repeatedly since the upgrade. I've only changed the parameters indicated in the repo README and am running everything else unchanged. Other library versions: SQLAlchemy 1.4.1, Flask-SQLAlchemy 2.4.3. Can you please check the problem here?