astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

Cosmos 1.6.0 does not emit Datasets on Airflow 2.9.3 with athena-dbt #1203

Closed moritzsanne closed 1 month ago

moritzsanne commented 1 month ago

Hey everyone, I would like to set up data-aware scheduling and would like Cosmos to automatically emit Airflow Datasets.

My Airflow environment looks like this:

Here's the relevant part of the log of a cosmos run.

[2024-09-11, 16:06:41 UTC] {logging_mixin.py:190} INFO - 16:06:41
[2024-09-11, 16:06:43 UTC] {logging_mixin.py:190} INFO - 16:06:43  dbt.adapters.athena.constants adapter: Table 'mtv_vertriebs_rohdaten_download_bestandskundenpotential' has no TableType attribute - Ignoring
[2024-09-11, 16:06:48 UTC] {logging_mixin.py:190} INFO - 16:06:48  Concurrency: 1 threads (target='db__prod_general_prepared__bucket__prepared_non_pii')
[2024-09-11, 16:06:48 UTC] {logging_mixin.py:190} INFO - 16:06:48
[2024-09-11, 16:06:48 UTC] {logging_mixin.py:190} INFO - 16:06:48  1 of 1 START sql table model prod_general_prepared.data_glue_jobs_metadata ..... [RUN]
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58  1 of 1 OK created sql table model prod_general_prepared.data_glue_jobs_metadata  [OK 100 in 9.38s]
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58  Finished running 1 table model in 0 hours 0 minutes and 16.36 seconds (16.36s).
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58  Completed successfully
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
[2024-09-11, 16:06:58 UTC] {local.py:196} WARNING - Artifact schema version: https://schemas.getdbt.com/dbt/manifest/v12.json is above dbt-ol supported version 7. This might cause errors.
[2024-09-11, 16:06:58 UTC] {local.py:389} INFO - Inlets: []
[2024-09-11, 16:06:58 UTC] {local.py:390} INFO - Outlets: []
[2024-09-11, 16:06:58 UTC] {dag.py:3229} INFO - Sync 1 DAGs

Can anyone help identify the problem? Is athena-dbt incompatible with OpenLineage? I'd like to understand where I would need to contribute to make this work.

Many thanks!

dosubot[bot] commented 1 month ago

Hey @moritzsanne! I'm here to help you with your Cosmos issue. I can assist with bugs, questions, and contributions while you wait for a human maintainer. Let's tackle this together!

I found two similar issues that might be relevant to your problem:

  1. Bug when using Airflow 2.9.1 and trying to emit Datasets - This issue was closed by a pull request that gracefully errors if users try to emit datasets with Airflow 2.9.0 or 2.9.1 [1].
  2. No datasets emitted in DbtLocalBaseOperator after Cosmos upgrade to 1.3.0 - This issue was closed by a pull request that fixed the OpenLineage integration documentation [2].

Cosmos 1.6.0 does not emit Datasets on Airflow 2.9.3 with athena-dbt due to a breaking change in Dataset URIs introduced in Apache Airflow 2.9.0 and 2.9.1. This issue is documented in the code:

except ValueError:
    raise AirflowCompatibilityError(
        """
        Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions:
        https://github.com/apache/airflow/issues/39486

        If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets:
        By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html.
        """
    )
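The guard above can be mimicked with plain Python to show the failure mode it protects against. This is a minimal sketch: `StrictDataset` and its space-based validation are illustrative stand-ins, not Airflow's actual URI rules, and a plain `RuntimeError` stands in for Cosmos's `AirflowCompatibilityError`:

```python
class StrictDataset:
    """Illustrative stand-in for airflow.datasets.Dataset.

    Loosely imitates the stricter URI validation that Airflow 2.9.0/2.9.1
    applied (the real validation rules differ).
    """

    def __init__(self, uri: str):
        if not uri or " " in uri:
            raise ValueError(f"invalid dataset URI: {uri!r}")
        self.uri = uri


def build_datasets(uris):
    """Turn URIs into dataset objects, surfacing a Cosmos-style error on failure."""
    try:
        return [StrictDataset(uri) for uri in uris]
    except ValueError as exc:
        # Cosmos raises AirflowCompatibilityError here; RuntimeError keeps
        # this sketch dependency-free.
        raise RuntimeError(
            "Dataset URI rejected; on Airflow 2.9.0/2.9.1, set "
            "emit_datasets=False in RenderConfig as a workaround"
        ) from exc


print([d.uri for d in build_datasets(["awsathena://example/db.table"])])
```

A rejected URI anywhere in the batch aborts the whole conversion, which matches the all-or-nothing behavior of the list comprehension in the real code.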

Regarding the compatibility of athena-dbt with OpenLineage, there is no explicit mention of incompatibility. However, the code does handle scenarios where OpenLineage is not available:

try:
    from airflow.datasets import Dataset
    from openlineage.common.provider.dbt.local import DbtLocalArtifactProcessor
except ModuleNotFoundError:
    is_openlineage_available = False
    DbtLocalArtifactProcessor = None
else:
    is_openlineage_available = True

And it logs a warning if OpenLineage is not available:

logger.warning(
    "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]."
)

To enable OpenLineage events in Cosmos when using Airflow 2.10.1, follow these steps:

  1. Install the necessary dependencies:

    pip install "astronomer-cosmos[openlineage]"
  2. Configure the OpenLineage processor:

    import jinja2

    from openlineage.common.provider.dbt.local import DbtLocalArtifactProcessor
    from cosmos.constants import OPENLINEAGE_PRODUCER, LINEAGE_NAMESPACE

    openlineage_processor = DbtLocalArtifactProcessor(
        producer=OPENLINEAGE_PRODUCER,
        job_namespace=LINEAGE_NAMESPACE,
        project_dir=project_dir,
        profile_name=self.profile_config.profile_name,
        target=self.profile_config.target_name,
    )
    openlineage_processor.should_raise_on_unsupported_command = False
    try:
        events = openlineage_processor.parse()
        self.openlineage_events_completes = events.completes
    except (FileNotFoundError, NotImplementedError, ValueError, KeyError, jinja2.exceptions.UndefinedError):
        self.log.debug("Unable to parse OpenLineage events", stack_info=True)
  3. Handle datasets and register them:

    from typing import Literal

    from airflow.datasets import Dataset
    from airflow.models.dag import DAG
    from airflow.utils.session import create_session
    from cosmos.exceptions import AirflowCompatibilityError

    def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
        uris = []
        for completed in self.openlineage_events_completes:
            for output in getattr(completed, source):
                dataset_uri = output.namespace + "/" + output.name
                uris.append(dataset_uri)
        self.log.debug("URIs to be converted to Dataset: %s", uris)

        datasets = []
        try:
            datasets = [Dataset(uri) for uri in uris]
        except ValueError:
            raise AirflowCompatibilityError(
                """
                Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions:
                https://github.com/apache/airflow/issues/39486

                If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets:
                By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html.
                """
            )
        return datasets

    def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None:
        with create_session() as session:
            self.outlets.extend(new_outlets)
            self.inlets.extend(new_inlets)
            for task in self.dag.tasks:
                if task.task_id == self.task_id:
                    task.outlets.extend(new_outlets)
                    task.inlets.extend(new_inlets)
            DAG.bulk_write_to_db([self.dag], session=session)
            session.commit()
  4. Collect OpenLineage facets on task completion:

    from typing import Any

    from airflow.models.taskinstance import TaskInstance
    from airflow.providers.openlineage.extractors.base import OperatorLineage

    def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage:
        inputs = []
        outputs = []
        run_facets: dict[str, Any] = {}
        job_facets: dict[str, Any] = {}
        # Populate inputs, outputs, run_facets, and job_facets as needed
        return OperatorLineage(inputs=inputs, outputs=outputs, run_facets=run_facets, job_facets=job_facets)

By following these steps, you can enable OpenLineage events in Cosmos when using Airflow 2.10.1 [3].
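The URI-construction part of step 3 can be exercised in isolation with plain Python. In this sketch, `OLDataset` and `CompletedEvent` are hypothetical stand-ins for the objects returned by `openlineage_processor.parse()`, and the Athena namespace string is an assumption based on the log above:

```python
from dataclasses import dataclass, field


@dataclass
class OLDataset:
    # Stand-in for an OpenLineage dataset reference.
    namespace: str
    name: str


@dataclass
class CompletedEvent:
    # Stand-in for a completed OpenLineage run event.
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)


def uris_from_events(events, source):
    """Mirror get_datasets(): join namespace and name into one dataset URI."""
    uris = []
    for completed in events:
        for ds in getattr(completed, source):
            uris.append(ds.namespace + "/" + ds.name)
    return uris


events = [
    CompletedEvent(
        outputs=[
            OLDataset(
                namespace="awsathena://athena.eu-central-1.amazonaws.com",
                name="prod_general_prepared.data_glue_jobs_metadata",
            )
        ]
    )
]
print(uris_from_events(events, "outputs"))
```

If `openlineage_events_completes` stays empty, as in the log above, this loop produces no URIs, so no Datasets are registered, regardless of the Airflow version.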

To continue talking to Dosu, mention @dosu.

moritzsanne commented 1 month ago

I think I identified the problem and will close this issue because it is not a Cosmos problem. OpenLineage currently does not support the Athena connector. Here's the PR to add support for it.