datahub-project / datahub

The Metadata Platform for your Data and AI Stack
https://datahubproject.io
Apache License 2.0

Error emitting dbt Test metadata invoked from Astronomer Cosmos DbtTaskGroup #8260

Closed zhyzhkea closed 1 year ago

zhyzhkea commented 1 year ago

Describe the bug: When Airflow tasks are generated by an Astronomer Cosmos DbtTaskGroup, the acryl-datahub-airflow-plugin emits metadata for the dbt model run task, but the dbt test task fails with AttributeError: 'Dataset' object has no attribute 'urn', so no test metadata or lineage is emitted to DataHub.

To Reproduce
Steps to reproduce the behavior:

  1. Create an Airflow DAG with an Astronomer Cosmos DbtTaskGroup:
   # Imports assumed for this snippet; the DbtTaskGroup import path varies by Cosmos version.
   import os
   from cosmos.providers.dbt import DbtTaskGroup

   dbt_cosmos = DbtTaskGroup(
       dbt_project_name="********",
       conn_id="snowflake",
       select={"configs": ["tags:master"]},
       dbt_root_path="/usr/local/airflow/dags/dbt",
       profile_name_override="********",
       target_name_override="dev",
       profile_args={
           "user": "********",
           "account": "***********",
           "warehouse": "COMPUTE_WH",
           "database": "TESTER",
           "schema": "MASTER",
       },
       emit_datasets=True,
       dbt_args={
           "dbt_executable_path": f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt",
       },
   )
  2. Execute the DAG.
  3. The log for the dbt model run shows the DataFlow being emitted successfully:
[2023-06-19, 06:26:54 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=db_s3_to_snowflake, task_id=sterling.customer.customer_run, execution_date=20230619T062214, start_date=20230619T062609, end_date=20230619T062654
[2023-06-19, 06:26:54 UTC] {base.py:73} INFO - Using connection ID 'datahub_rest_default' for task execution.
[2023-06-19, 06:26:54 UTC] {base.py:73} INFO - Using connection ID 'datahub_rest_default' for task execution.

[2023-06-19, 06:26:54 UTC] {_plugin.py:147} INFO - Emitting Datahub Dataflow: 

DataFlow(
    urn=<datahub.utilities.urns.data_flow_urn.DataFlowUrn object at 0x7f88691a62f0>,
    id='db_s3_to_snowflake',
    orchestrator='airflow',
    cluster='prod',
    name=None,
    description='None\n\n',
    properties={'_access_control': 'None',
                '_default_view': "'grid'",
                'catchup': 'False',
                'fileloc': "'/usr/local/airflow/dags/db_s3_snowflake_dag.py'",
                'is_paused_upon_creation': 'None',
                'start_date': "DateTime(2023, 5, 22, 7, 0, 0, tzinfo=Timezone('UTC'))",
                'tags': "['airbyte']",
                'timezone': "Timezone('America/Los_Angeles')"},
    url='http://localhost:8080/tree?dag_id=db_s3_to_snowflake',
    tags={'airbyte'},
    owners={'airflow'})

[2023-06-19, 06:26:54 UTC] {_plugin.py:165} INFO - Emitting Datahub Datajob: DataJob(id='sterling.customer.customer_run', urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0x7f88691a61a0>, flow_urn=<datahub.utilities.urns.data_flow_urn.DataFlowUrn object at 0x7f88692fdb70>, name=None, description=None, properties={'depends_on_past': 'False', 'email': "['zhyzhkea@hotmail.com']", 'label': "'customer_run'", 'execution_timeout': 'None', 'sla': 'None', 'task_id': "'sterling.customer.customer_run'", 'trigger_rule': "<TriggerRule.ALL_SUCCESS: 'all_success'>", 'wait_for_downstream': 'False', 'downstream_task_ids': "{'sterling.customer.customer_test'}", 'inlets': '[]', 'outlets': '[]'}, url='http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=db_s3_to_snowflake&_flt_3_task_id=sterling.customer.customer_run', tags={'airbyte'}, owners={'airflow'}, group_owners=set(), inlets=[], outlets=[], upstream_urns=[<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0x7f88691a6080>])

[2023-06-19, 06:26:54 UTC] {_plugin.py:179} INFO - Emitted Start Datahub Dataprocess Instance: DataProcessInstance(id='db_s3_to_snowflake_sterling.customer.customer_run_manual__2023-06-19T06:22:14.583059+00:00', urn=<datahub.utilities.urns.data_process_instance_urn.DataProcessInstanceUrn object at 0x7f88691a79d0>, orchestrator='airflow', cluster='prod', type='BATCH_AD_HOC', template_urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0x7f88691a61a0>, parent_instance=None, properties={'run_id': 'manual__2023-06-19T06:22:14.583059+00:00', 'duration': '45.228359', 'start_date': '2023-06-19 06:26:09.299307+00:00', 'end_date': '2023-06-19 06:26:54.527666+00:00', 'execution_date': '2023-06-19 06:22:14.583059+00:00', 'try_number': '1', 'hostname': 'efec9bc53a52', 'max_tries': '0', 'external_executor_id': 'None', 'pid': '13606', 'state': 'success', 'operator': 'DbtRunLocalOperator', 'priority_weight': '5', 'unixname': 'astro', 'log_url': 'http://localhost:8080/log?execution_date=2023-06-19T06%3A22%3A14.583059%2B00%3A00&task_id=sterling.customer.customer_run&dag_id=db_s3_to_snowflake&map_index=-1'}, url='http://localhost:8080/log?execution_date=2023-06-19T06%3A22%3A14.583059%2B00%3A00&task_id=sterling.customer.customer_run&dag_id=db_s3_to_snowflake&map_index=-1', inlets=[], outlets=[], upstream_urns=[])

[2023-06-19, 06:26:54 UTC] {_plugin.py:191} INFO - Emitted Completed Data Process Instance: DataProcessInstance(id='db_s3_to_snowflake_sterling.customer.customer_run_manual__2023-06-19T06:22:14.583059+00:00', urn=<datahub.utilities.urns.data_process_instance_urn.DataProcessInstanceUrn object at 0x7f88691a7e50>, orchestrator='airflow', cluster='prod', type='BATCH_SCHEDULED', template_urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0x7f88691a61a0>, parent_instance=None, properties={}, url=None, inlets=[], outlets=[], upstream_urns=[])
[2023-06-19, 06:26:54 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-19, 06:26:55 UTC] {taskinstance.py:2651} INFO - 1 downstream tasks scheduled from follow-on schedule check
  4. The log for the dbt test task shows an error after the DataFlow is emitted:
[2023-06-19, 06:27:43 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=db_s3_to_snowflake, task_id=sterling.customer.customer_test, execution_date=20230619T062214, start_date=20230619T062657, end_date=20230619T062743
[2023-06-19, 06:27:43 UTC] {base.py:73} INFO - Using connection ID 'datahub_rest_default' for task execution.
[2023-06-19, 06:27:43 UTC] {base.py:73} INFO - Using connection ID 'datahub_rest_default' for task execution.

[2023-06-19, 06:27:43 UTC] {_plugin.py:147} INFO - Emitting Datahub Dataflow: 
DataFlow(
    urn=<datahub.utilities.urns.data_flow_urn.DataFlowUrn object at 0x7f886938ece0>,
    id='db_s3_to_snowflake',
    orchestrator='airflow',
    cluster='prod',
    name=None,
    description='None\n\n',
    properties={'_access_control': 'None',
                '_default_view': "'grid'",
                'catchup': 'False',
                'fileloc': "'/usr/local/airflow/dags/db_s3_snowflake_dag.py'",
                'is_paused_upon_creation': 'None',
                'start_date': "DateTime(2023, 5, 22, 7, 0, 0, tzinfo=Timezone('UTC'))",
                'tags': "['airbyte']",
                'timezone': "Timezone('America/Los_Angeles')"},
    url='http://localhost:8080/tree?dag_id=db_s3_to_snowflake',
    tags={'airbyte'},
    owners={'airflow'})

[2023-06-19, 06:27:43 UTC] {logging_mixin.py:149} INFO - Exception: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/datahub_provider/_plugin.py", line 288, in custom_on_success_callback
    datahub_task_status_callback(context, status=InstanceRunResult.SUCCESS)
  File "/usr/local/lib/python3.10/site-packages/datahub_provider/_plugin.py", line 163, in datahub_task_status_callback
    datajob.outlets.append(outlet.urn)
**AttributeError: 'Dataset' object has no attribute 'urn'. Did you mean: 'uri'?**
[2023-06-19, 06:27:44 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-19, 06:27:44 UTC] {taskinstance.py:2651} INFO - 1 downstream tasks scheduled from follow-on schedule check
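
For additional context, the outlets that Cosmos attaches to the generated tasks can be inspected from the DAG file. This is only a debugging sketch: `dbt_cosmos` is the task group from step 1, and TaskGroup internals may differ across Airflow versions.

```python
# Dump the types of the outlets Cosmos attached to each generated task.
# Airflow-native Dataset objects expose .uri but no .urn, which is what the
# DataHub success callback trips over in the traceback above.
for child_id, child in dbt_cosmos.children.items():
    outlets = getattr(child, "outlets", []) or []
    print(child_id, [type(o).__name__ for o in outlets])
```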

Expected behavior: There is no error in the log, and DataHub lineage shows both the run and test tasks.

Screenshots

(screenshots attached)

DataHub Dataset Lineage does not show an edge from Airflow to dbt:

(screenshots attached)


Additional context: Please guide me in the right direction to resolve this issue. Currently, I'm running "datahub ingest -c ./dbt-recipe.yml" to ingest dbt artifacts. Thanks.
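
For reference, the recipe is along these lines. This is a minimal sketch rather than the exact file: the artifact paths assume dbt's default target/ output directory under the dbt_root_path above, the sink address is a placeholder for the DataHub GMS endpoint, and the field names reflect the dbt source as documented around mid-2023 (check the docs for your CLI version).

```yaml
# dbt-recipe.yml (sketch; adjust paths and server to your environment)
source:
  type: dbt
  config:
    manifest_path: /usr/local/airflow/dags/dbt/target/manifest.json
    catalog_path: /usr/local/airflow/dags/dbt/target/catalog.json
    test_results_path: /usr/local/airflow/dags/dbt/target/run_results.json
    target_platform: snowflake

sink:
  type: datahub-rest
  config:
    server: http://datahub-gms:8080  # placeholder DataHub GMS address
```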

treff7es commented 1 year ago

@zhyzhkea is it possible you are using Airflow's Dataset rather than DataHub's Dataset in your inlets and outlets? https://github.com/datahub-project/datahub/blob/abf73b2d10b3bc00325c243fa0acaf303cb74310/metadata-ingestion/src/datahub_provider/entities.py#L17 vs https://github.com/apache/airflow/blob/79bcc2e668e648098aad6eaa87fe8823c76bc69a/airflow/datasets/__init__.py#L26

We currently only support DataHub's Dataset (the first link).
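
To make the difference concrete, here is a minimal sketch, assuming Airflow 2.4+ and the acryl-datahub Airflow provider are installed; the table name is a placeholder built from the Snowflake settings in the report.

```python
from airflow.datasets import Dataset as AirflowDataset           # what Airflow/Cosmos attach as outlets
from datahub_provider.entities import Dataset as DatahubDataset  # what the DataHub plugin expects

airflow_ds = AirflowDataset("snowflake://TESTER/MASTER/customer")
print(airflow_ds.uri)   # Airflow's Dataset only carries a URI ...
# airflow_ds.urn        # ... so accessing .urn raises the AttributeError seen in the traceback

datahub_ds = DatahubDataset("snowflake", "TESTER.MASTER.customer")
print(datahub_ds.urn)   # DataHub's Dataset exposes a urn the plugin can append to the DataJob
```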

zhyzhkea commented 1 year ago

Unfortunately, Astronomer Cosmos DbtTaskGroup does not allow specifying inlets and outlets; the DataHub plugin "acryl-datahub-airflow-plugin" emits events automatically. The dbt model run is emitted successfully and visible in the lineage graph, but the dbt test run fails to emit an event (File "/usr/local/lib/python3.10/site-packages/datahub_provider/_plugin.py", line 163, in datahub_task_status_callback: datajob.outlets.append(outlet.urn)). Am I missing some configuration?
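
One possible mitigation (untested, and assuming the failure is triggered by the Airflow-native Dataset outlets that Cosmos attaches when emit_datasets=True) would be to switch that flag off so the DataHub callback has no incompatible outlets to iterate over, and keep dbt lineage coming from the recipe-based ingestion. A sketch of the change, not a verified fix:

```python
dbt_cosmos = DbtTaskGroup(
    # ... all other arguments exactly as in the snippet at the top of this issue ...
    emit_datasets=False,  # do not attach airflow.datasets.Dataset objects as task outlets
)
```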

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

github-actions[bot] commented 1 year ago

This issue was closed because it has been inactive for 30 days since being marked as stale.