Closed thiagopirex closed 9 months ago
Hi! Unfortunately, the problem is that the task entities themselves are created when the on_dag_run_running hook is called, i.e. at the moment the DAG is launched, information about its tasks is also sent to the ODD Platform. However, information is ingested only for tasks whose "inlets" or "outlets" attribute is not empty (these are old Airflow concepts for lineage tracking, and that concept is reused here).
The inconsistency arises because this check for the presence of inlets or outlets is not repeated when the tasks start (at the point where the task run entry is created in ODD). In other words, we never created the task, yet we try to create a run for it.
In general, we need to change this logic a bit, and we will. In the meantime, a workaround is to set inlets or outlets to a list with an empty string on the task:
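The mismatch can be sketched in plain Python — a hypothetical illustration of the described behaviour, not the actual adapter code:

```python
# Hypothetical sketch of the inconsistency described above -- not the real
# ODD Airflow integration code, just the filtering logic it describes.

def ingest_tasks(tasks):
    # Task entities are created only for tasks with non-empty inlets/outlets.
    return [t for t in tasks if t.get("inlets") or t.get("outlets")]

def emit_task_runs(tasks):
    # Run events are emitted for every task, with no inlets/outlets check.
    return [t["task_id"] for t in tasks]

tasks = [
    {"task_id": "task_with_lineage", "inlets": ["//odd/..."], "outlets": []},
    {"task_id": "plain_task", "inlets": [], "outlets": []},
]

created = {t["task_id"] for t in ingest_tasks(tasks)}
orphan_runs = [r for r in emit_task_runs(tasks) if r not in created]
print(orphan_runs)  # ['plain_task'] -- a run for a task that was never created
```

Note that the workaround below works precisely because `[''] ` is a non-empty list, so the ingestion check treats the task as having lineage.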
```python
test_task = BashOperator(
    task_id="test_task",
    bash_command=command,
    inlets=[''],
)
```
Btw, inlets in that case are designed to list the ODDRNs of Datasets that are considered inputs for the task, and outlets, in turn, are outputs. That way we can build lineage for the task. At the moment there is no automation to create inlets and outlets, so we have to specify them manually for each task. Of course, these attributes are templated, so we can utilize the full power of templating.
We'll also add more information about inlets and outlets to the readme file to reduce confusion!
@ValeriyWorld could I please ask you to check whether inlets/outlets are in the list of template_fields? I suspect they are no longer templated by default...
@RamanDamayeu
As of now (tested with both the older apache-airflow==2.5.3 and the newer apache-airflow==2.7.3), inlets and outlets are not templated by default. Moreover, if we try to inherit from BaseOperator and create a custom operator with inlets/outlets included in template_fields, that does not work either.
For example, I created a simple custom operator:
```python
class CustomPythonOperator(BaseOperator):
    template_fields = ("inlets", "outlets",)

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def execute(self, context):
        message = "Hello World!"
        print(message)
        return message
```
Afterwards I tried to create a task using CustomPythonOperator, but in the Airflow UI I got an error:
Broken DAG: [/opt/airflow/dags/test_dags/test_issue.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 829, in serialize_operator
return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 860, in _serialize_node
raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
airflow.exceptions.AirflowException: Cannot template BaseOperator fields: inlets
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1401, in to_dict
json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1316, in serialize_dag
raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'test_issue': Cannot template BaseOperator fields: inlets
The conclusion: inlets and outlets cannot be included in template_fields.
I guess that certain fields, including inlets and outlets, cannot be templated directly in Apache Airflow because templating happens during the rendering phase, which occurs just before the task is executed, whereas inlets and outlets are special attributes that Airflow processes during task instantiation, not during execution.
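The serializer check can be sketched in isolation — a simplified, hypothetical reconstruction of the behaviour seen in the traceback, not the actual Airflow source:

```python
# Simplified, hypothetical sketch of the check that produces
# "Cannot template BaseOperator fields: inlets". In real Airflow the check
# lives in airflow/serialization/serialized_objects.py.

# Illustrative subset of plain BaseOperator attributes that the serializer
# refuses to treat as template fields.
NON_TEMPLATABLE_BASE_FIELDS = {"inlets", "outlets"}

def serialize_operator(template_fields):
    for field in template_fields:
        if field in NON_TEMPLATABLE_BASE_FIELDS:
            raise Exception(f"Cannot template BaseOperator fields: {field}")
    return {"template_fields": list(template_fields)}

try:
    serialize_operator(("inlets", "outlets"))
except Exception as e:
    print(e)  # Cannot template BaseOperator fields: inlets
```

The check runs at DAG-parsing (serialization) time, which is why the error shows up as a Broken DAG in the UI rather than at task execution.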
I configured my Airflow to use the ODD Platform, setting up the "odd http" connection with the URL, port and password (taken from the collector token).
After running the DAG, the ODD Platform raised some errors in the log: odd_plataform.log
DAG code: dag_example.txt
Airflow version: 2.6.3