astronomer / dag-factory

Dynamically generate Apache Airflow DAGs from YAML configuration files
Apache License 2.0

All DAGS disappear on import failure #283

Open skiedude opened 1 week ago

skiedude commented 1 week ago
from pathlib import Path

import dagfactory

for path in Path("/usr/local/airflow/dags/yamls/").rglob('*.yaml'):
  dag_factory = dagfactory.DagFactory(path)
  dag_factory.generate_dags(globals())

Using the following code, we are importing all YAML DAG files at once. Airflow sees this in some areas as a "single" DAG, since one Python file is the entry point for all of them. This means that if one of those YAML files has an issue being loaded, all of them disappear from the UI.

Is it possible to not have that happen? Off the top of my head, we could create one Python file to import each DAG individually, but that seems very repetitive.
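For illustration, that repetitive workaround would mean one small shim per YAML file, something like the sketch below (the file name and YAML path are made up):

    # dags/load_example_dag.py -- hypothetical shim, one per YAML file.
    # Because each shim is its own entry-point .py file, a broken YAML would only
    # take down the DAGs generated from that one file instead of all of them.
    import dagfactory

    dag_factory = dagfactory.DagFactory("/usr/local/airflow/dags/yamls/example_dag.yaml")
    dag_factory.generate_dags(globals())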

tatiana commented 1 week ago

Hi @skiedude, could you please confirm which version of DAG Factory you're using?

We were hoping that 0.20.0 would solve this problem: https://github.com/astronomer/dag-factory/releases/tag/v0.20.0

In particular, this change: https://github.com/astronomer/dag-factory/pull/184

skiedude commented 1 week ago

> Hi @skiedude, could you please confirm which version of DAG Factory you're using?
>
> We were hoping that 0.20.0 would solve this problem: https://github.com/astronomer/dag-factory/releases/tag/v0.20.0
>
> In particular, this change: #184

Hey @tatiana we are using

astro@airflow-worker-7c6b6f4489-4j6xn:/usr/local/airflow$ pip freeze | grep -i factory
dag-factory==0.20.0

Which appears to be your released version with the fix. Could it be that I'm not using your method load_yaml_dags?

skiedude commented 1 week ago

OK, I tried out the following code:

from airflow import DAG
from dagfactory import load_yaml_dags

load_yaml_dags(globals_dict=globals())

That does work, and the bad DAGs disappear from the UI. However, I don't see any errors at the top showing that DAGs failed to import, and when I check

http://localhost:8080/api/v1/importErrors

{
"import_errors": [],
"total_entries": 0
}

Is there any way to know that DAGs are failing to import?

tatiana commented 1 week ago

I see your point, @skiedude. The experience would be much better if we displayed those invalid YMLs as DAG import messages in Airflow.

You should be able to identify invalid YAML files by checking the scheduler log messages. They will be similar to:

Failed to load dag from tests/fixtures/invalid_yaml.yml

https://github.com/quydx/airlfow-dagfactory/blob/a74253d1ae036b59d9fdfe25071f416b3dbc084f/dagfactory/dagfactory.py#L202
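If it helps, here is a rough sketch of scanning those logs for the failure message (assuming the default on-disk layout under $AIRFLOW_HOME/logs/scheduler; adjust the path for your deployment):

    # Sketch only: scan scheduler/DAG-processor logs for dag-factory load failures.
    import os
    from pathlib import Path

    log_root = Path(os.environ.get("AIRFLOW_HOME", "/usr/local/airflow")) / "logs" / "scheduler"

    for log_file in log_root.rglob("*.log"):
        with open(log_file, errors="replace") as handle:
            for line in handle:
                if "Failed to load dag from" in line:
                    print(f"{log_file}: {line.strip()}")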

I suggest we add two follow-up tickets out of this one:

  1. Extend the behaviour from load_yaml_dags to dag_factory.generate_dags, so it also handles invalid YAML files gracefully.
  2. Evaluate the possibility of DAG Factory raising an Airflow import error, so it can be displayed in the UI just as it would be if the same happened in a Python DAG.

skiedude commented 6 days ago

Thanks for the follow-up, @tatiana. My overall understanding of the underlying architecture is rudimentary, but in my personal testing I copied the load_yaml_dags function into my import file and did the following:

    for config_file_path in candidate_dag_files:
        config_file_abs_path = str(config_file_path.absolute())
        logging.info("Loading %s", config_file_abs_path)
        try:
            factory = DagFactory(config_file_abs_path)
            factory.generate_dags(globals_dict)
        except Exception:  # pylint: disable=broad-except
            logging.exception("Failed to load dag from %s", config_file_path)
            failed_imports.append(config_file_path)
        else:
            factory.emit_telemetry("load_yaml_dags")
            logging.info("DAG loaded: %s", config_file_path)

    if failed_imports:
        raise ImportError("Failed importing some dags")

From what I've observed, even if the raise happens after the successful imports are done, it removes all DAGs, because this file, which is seen as the entry point for all DAGs, threw an error.

So it stands to reason that an import error would need to be inserted into the database in a different way, one that doesn't raise an exception. I've poked around the Airflow codebase a bit, but haven't found a method of inserting an import error without just raising one.
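For what it's worth, writing a row into Airflow's import-error table directly is technically possible through internal models, but it isn't a supported API and the DAG processor manages (and clears) that table itself, so I wouldn't rely on it. A rough sketch, assuming Airflow 2.x internals:

    # Sketch only: airflow.models.errors.ImportError is an internal Airflow model,
    # and the DAG processor deletes import errors for files that later parse
    # cleanly, so rows written this way may not survive the next parse cycle.
    from airflow.models.errors import ImportError as AirflowImportError
    from airflow.utils import timezone
    from airflow.utils.session import create_session

    def record_import_error(filename: str, stacktrace: str) -> None:
        # Adds a row that the UI lists under "DAG Import Errors".
        with create_session() as session:
            session.add(
                AirflowImportError(
                    filename=filename,
                    timestamp=timezone.utcnow(),
                    stacktrace=stacktrace,
                )
            )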

cmarteepants commented 1 day ago

Hi @skiedude, that's a great point. I'm going to check with the Airflow team and get their thoughts on how we can log this so that the error shows up in the UI, without raising an exception and stopping the process. Stay tuned :)

tatiana commented 1 day ago

@skiedude @cmarteepants My thought was to iterate through all the YAML files, identify the ones that are problematic, and raise the import error at the end, printing out the list of YAMLs that were invalid, so the DAGs that can be loaded would be loaded, but we'd still have visibility of the errors via the Airflow UI.

ashb commented 1 day ago

> @skiedude @cmarteepants My thought was to iterate through all the YAML files, identify the ones that are problematic, and raise the import error at the end, printing out the list of YAMLs that were invalid, so the DAGs that can be loaded would be loaded, but we'd still have visibility of the errors via the Airflow UI.

Does that work? My first thought is that any exception at parse time results in no DAGs being loaded from the Python file.

kaxil commented 1 day ago

Also, all the import errors for a file are deleted if that file is parsed correctly:

https://github.com/apache/airflow/blob/440c224af5592f9007eef43d1dbe9025aa34e177/airflow/dag_processing/processor.py#L452-L457

skiedude commented 21 hours ago

> @skiedude @cmarteepants My thought was to iterate through all the YAML files, identify the ones that are problematic, and raise the import error at the end, printing out the list of YAMLs that were invalid, so the DAGs that can be loaded would be loaded, but we'd still have visibility of the errors via the Airflow UI.
>
> Does that work? My first thought is that any exception at parse time results in no DAGs being loaded from the Python file.

This is what I have observed: even if you raise the exception after the imports have taken place, they all still disappear, because that file is the entry point for all of them.