open-metadata / OpenMetadata

OpenMetadata is a unified metadata platform for data discovery, data observability, and data governance powered by a central metadata repository, in-depth column level lineage, and seamless team collaboration.
https://open-metadata.org
Apache License 2.0

Airflow Lineage: OpenMetadataLineageOperator only sets singular inlets #11752

Closed zach-all closed 1 year ago

zach-all commented 1 year ago

Affected module Ingestion Framework: OpenMetadataLineageOperator

Describe the bug When defining multiple inlets per outlet in Airflow tasks, only one of the defined inlets (seemingly picked at random) is shown in the lineage graph of the related outlet table.

To Reproduce

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import OpenMetadataConnection
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import OpenMetadataJWTClientConfig

with DAG(
    dag_id="lineage_test",
    start_date=datetime(2023, 5, 1),
    schedule_interval="0 0 * * *",
    # `owner` is a task-level argument, so it is passed through default_args
    default_args={"owner": "dataops"},
) as dag:
    ometa_connection = OpenMetadataConnection(
        hostPort="",
        authProvider="openmetadata",
        securityConfig=OpenMetadataJWTClientConfig(
            jwtToken=""
        ),
    )

    add_lineage = OpenMetadataLineageOperator(
        task_id='add_lineage',
        depends_on_past=False,
        server_config=ometa_connection,
        service_name="airflow_lineage_op_service",
        only_keep_dag_lineage=True,
    )

    BASE = "aws_redshift.database.base"
    FCT = "aws_redshift.database.fct"
    t1 = EmptyOperator(
        task_id="build_cancel_rate_per_studio",
        inlets={
            "t1": [f"{BASE}.customer", f"{BASE}.appointment"]
        },
        outlets={
            "t1": [f"{FCT}.cancel_rate_per_studio"]
        },
    )

    t2 = EmptyOperator(
        task_id="build_checkin_pd",
        inlets={
            "t2": [f"{BASE}.address", f"{BASE}.checkin"]
        },
        outlets={
            "t2": [f"{FCT}.checkin_pd"]
        },
    )

    [t1, t2] >> add_lineage

It's logging multiple inlets per outlet in the task...

[2023-05-24, 14:22:40 UTC] {{lineage_parser.py:126}} INFO - Found inlets {'t1': ['aws_redshift.database.base.customer', 'aws_redshift.database.base.appointment']} in task build_cancel_rate_per_studio
[2023-05-24, 14:22:40 UTC] {{lineage_parser.py:126}} INFO - Found outlets {'t1': ['aws_redshift.database.fct.cancel_rate_per_studio']} in task build_cancel_rate_per_studio
[2023-05-24, 14:22:40 UTC] {{lineage_parser.py:126}} INFO - Found inlets {'t2': ['aws_redshift.database.base.address', 'aws_redshift.database.base.checkin']} in task build_checkin_pd
[2023-05-24, 14:22:40 UTC] {{lineage_parser.py:126}} INFO - Found outlets {'t2': ['aws_redshift.database.fct.checkin_pd']} in task build_checkin_pd
[2023-05-24, 14:22:40 UTC] {{operator.py:66}} INFO - Extracted the following XLet data from the DAG: [XLets(inlets={'aws_redshift.database.base.customer', 'aws_redshift.database.base.appointment'}, outlets={'aws_redshift.database.fct.cancel_rate_per_studio'}), XLets(inlets={'aws_redshift.database.base.checkin', 'aws_redshift.database.base.address'}, outlets={'aws_redshift.database.fct.checkin_pd'})]
[2023-05-24, 14:22:40 UTC] {{runner.py:333}} INFO - Executing Airflow Lineage Runner...
[2023-05-24, 14:22:40 UTC] {{runner.py:152}} INFO - Creating or updating Pipeline Entity from DAG...
[2023-05-24, 14:22:40 UTC] {{runner.py:338}} INFO - Processing XLet data [XLets(inlets={'aws_redshift.database.base.customer', 'aws_redshift.database.base.appointment'}, outlets={'aws_redshift.database.fct.cancel_rate_per_studio'}), XLets(inlets={'aws_redshift.database.base.checkin', 'aws_redshift.database.base.address'}, outlets={'aws_redshift.database.fct.checkin_pd'})]
[2023-05-24, 14:22:40 UTC] {{runner.py:341}} INFO - Got some xlet data. Processing lineage for inlets={'aws_redshift.database.base.customer', 'aws_redshift.database.base.appointment'} outlets={'aws_redshift.database.fct.cancel_rate_per_studio'}
[2023-05-24, 14:22:40 UTC] {{runner.py:344}} INFO - `only_keep_dag_lineage` is set to True. Cleaning lineage not in inlets or outlets...
[2023-05-24, 14:22:40 UTC] {{runner.py:341}} INFO - Got some xlet data. Processing lineage for inlets={'aws_redshift.database.base.checkin', 'aws_redshift.database.base.address'} outlets={'aws_redshift.database.fct.checkin_pd'}
[2023-05-24, 14:22:40 UTC] {{runner.py:344}} INFO - `only_keep_dag_lineage` is set to True. Cleaning lineage not in inlets or outlets...
[2023-05-24, 14:22:40 UTC] {{runner.py:313}} INFO - Removing upstream edge with aws_redshift.database.base.customer
[2023-05-24, 14:22:40 UTC] {{lineage_mixin.py:146}} ERROR - Error 404 trying to DELETE linage for fromEntity=EntityReference(id=Uuid(__root__=UUID('98f0efe1-9154-4303-9d33-edf13ea735ce')), type='table', name=None, fullyQualifiedName=None, description=None, displayName=None, deleted=None, href=None) toEntity=EntityReference(id=Uuid(__root__=UUID('bc4fc16b-a611-4cb6-8341-65bf98a637d6')), type='pipeline', name=None, fullyQualifiedName=None, description=None, displayName=None, deleted=None, href=None) description=None lineageDetails=None
[2023-05-24, 14:22:40 UTC] {{runner.py:322}} INFO - Removing downstream edge with aws_redshift.database.fct.cancel_rate_per_studio
[2023-05-24, 14:22:40 UTC] {{lineage_mixin.py:146}} ERROR - Error 404 trying to DELETE linage for fromEntity=EntityReference(id=Uuid(__root__=UUID('bc4fc16b-a611-4cb6-8341-65bf98a637d6')), type='pipeline', name=None, fullyQualifiedName=None, description=None, displayName=None, deleted=None, href=None) toEntity=EntityReference(id=Uuid(__root__=UUID('82b68176-7560-4258-8b83-f8151f008fc1')), type='table', name=None, fullyQualifiedName=None, description=None, displayName=None, deleted=None, href=None) description=None lineageDetails=None
[2023-05-24, 14:22:40 UTC] {{taskinstance.py:1401}} INFO - Marking task as SUCCESS. dag_id=lineage_test, task_id=add_lineage, execution_date=20230523T000000, start_date=20230524T142239, end_date=20230524T142240

...but in the end only one inlet is shown in the lineage graph: [image]

Expected behavior It should show every inlet that builds up an outlet table in the lineage graph.
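To make the expectation concrete, here is a minimal sketch of the edges I would expect for one task. The `XLets` class below is only a hypothetical stand-in that mirrors the structure printed in the logs, and `expected_edges` is an illustrative helper, not part of the OpenMetadata or Airflow API:

```python
from dataclasses import dataclass, field
from itertools import product


@dataclass
class XLets:
    """Hypothetical stand-in for the XLets structure printed in the logs."""

    inlets: set = field(default_factory=set)
    outlets: set = field(default_factory=set)


def expected_edges(xlets):
    """Return every (inlet, outlet) pair the lineage graph should connect."""
    return list(product(sorted(xlets.inlets), sorted(xlets.outlets)))


t1 = XLets(
    inlets={
        "aws_redshift.database.base.customer",
        "aws_redshift.database.base.appointment",
    },
    outlets={"aws_redshift.database.fct.cancel_rate_per_studio"},
)

edges = expected_edges(t1)
for source, target in edges:
    print(f"{source} -> {target}")
```

With two inlets and one outlet this yields two edges, whereas the graph currently shows only one of them.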

Version:

Additional context Tried setting only_keep_dag_lineage=False, but it didn't change anything, so I don't think it's related to the "ERROR ... trying to DELETE" lineage logs.

zach-all commented 1 year ago

Tested on 1.0.2: each Airflow DAG run adds one inlet. So you could run the DAG n times to make it add all n inlets ^^

But another problem is: When changing the inlets, the previous inlets are still there.

    t2 = EmptyOperator(
        task_id="build_checkin_pd",
        inlets={
            "t2": [f"{BASE}.new_booking_entry", f"{BASE}.customer"]
        },
        outlets={
            "t2": [f"{FCT}.checkin_pd"]
        },
    ) 

After 2 runs: [image]
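As a rough sketch of what I would expect to be cleaned up after changing the inlets: the edges to remove are the ones recorded by earlier runs that the task no longer declares. `stale_inlets` is a hypothetical helper for illustration only; I have not checked how the operator actually computes this.

```python
def stale_inlets(previous, current):
    """Inlets recorded by an earlier run that the task no longer declares."""
    return set(previous) - set(current)


BASE = "aws_redshift.database.base"

# Inlets of build_checkin_pd before and after the change
previous = {f"{BASE}.address", f"{BASE}.checkin"}
current = {f"{BASE}.new_booking_entry", f"{BASE}.customer"}

to_remove = stale_inlets(previous, current)
print(sorted(to_remove))
```

Here both `address` and `checkin` would be expected to disappear from the upstream side of `checkin_pd`, but they are still shown.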

pmbrull commented 1 year ago

Thanks for the details. I'm able to reproduce it and am on it now.