open-metadata / OpenMetadata

OpenMetadata is a unified metadata platform for data discovery, data observability, and data governance powered by a central metadata repository, in-depth column level lineage, and seamless team collaboration.
https://open-metadata.org
Apache License 2.0
5.55k stars 1.05k forks source link

Lineage not working with MetadataWorkflow #17956

Open mvpkarthik1 opened 1 month ago

mvpkarthik1 commented 1 month ago

Affected module Ingestion Framework

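Describe the bug: Running lineage ingestion from the OpenMetadata (OM) UI using Airflow works, but running the same lineage ingestion using the SDK does not produce any lineage. The SDK workflow is created with: workflow = MetadataWorkflow.create(workflow_config_lineage)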

{
  "id": "1ba6def5-a68f-4bdb-b623-85ba83d7a37b",
  "name": "test_bigquery_lineage_UUN4TfYs",
  "displayName": "test_bigquery_lineage_UUN4TfYs",
  "description": null,
  "pipelineType": "lineage",
  "owner": {
    "id": "ba7f8cd1-24df-431d-b52d-cc5d0f7f3232",
    "type": "user",
    "name": "admin",
    "fullyQualifiedName": "admin",
    "description": null,
    "displayName": null,
    "deleted": false,
    "href": null
  },
  "fullyQualifiedName": "test_bigquery.test_bigquery_lineage_UUN4TfYs",
  "sourceConfig": {
    "config": {
      "type": "DatabaseLineage",
      "queryLogDuration": 100,
      "queryLogFilePath": null,
      "resultLimit": 1000,
      "parsingTimeoutLimit": 300,
      "filterCondition": null,
      "schemaFilterPattern": {
        "includes": [],
        "excludes": []
      },
      "tableFilterPattern": {
        "includes": [],
        "excludes": []
      },
      "databaseFilterPattern": {
        "includes": [],
        "excludes": []
      }
    }
  },
  "openMetadataServerConnection": {
    "clusterName": "openmetadata",
    "type": "OpenMetadata",
    "hostPort": "http://openmetadata-server:8585/api",
    "authProvider": "openmetadata",
    "verifySSL": "no-ssl",
    "sslConfig": null,
    "securityConfig": {
      "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MTU2NzY5NDUsImV4cCI6bnVsbH0.l4tpj-IIMr_2mFQ18HuZoIcMyOjgimeGCEirWfZpL4LXRkvM7oZMJ2DsFyF1jLJqB00oENzK2IRso67aGITmeCobxtLDIVIQUWyUJqeuCS1kJW54y-Kx29PBGAN24jphT23p6TIHjyQKPdOQbKWxe1to6F4gJr3Mvrp2rjMTB3pilctuF5JFe4Xxd6bB2daFdiUwHVGrB-p5u7eVrCOK_0v0EhSN9ACL2ZkCYnL-rOOWOsjcozPpX8crebQg_tFGSQJ1mcOobbNWRRdoTy9flnWO0lpUpkz_6k7XIxG7iuxaCYnRlgzMHECksiU0s7pNaI_rzPf_YnIvTcZmXWDIFA"
    },
    "secretsManagerProvider": "noop",
    "secretsManagerLoader": "noop",
    "apiVersion": "v1",
    "includeTopics": true,
    "includeTables": true,
    "includeDashboards": true,
    "includePipelines": true,
    "includeMlModels": true,
    "includeUsers": true,
    "includeTeams": true,
    "includeGlossaryTerms": true,
    "includeTags": true,
    "includePolicy": true,
    "includeMessagingServices": true,
    "enableVersionValidation": true,
    "includeDatabaseServices": true,
    "includePipelineServices": true,
    "limitRecords": 1000,
    "forceEntityOverwriting": false,
    "storeServiceConnection": true,
    "elasticsSearch": null,
    "supportsDataInsightExtraction": true,
    "supportsElasticSearchReindexingExtraction": true,
    "extraHeaders": null
  },
  "airflowConfig": {
    "pausePipeline": false,
    "concurrency": 1,
    "startDate": "2024-09-19T00:00:00+00:00",
    "endDate": null,
    "pipelineTimezone": "UTC",
    "retries": 0,
    "retryDelay": 300,
    "pipelineCatchup": false,
    "scheduleInterval": null,
    "maxActiveRuns": 1,
    "workflowTimeout": null,
    "workflowDefaultView": "tree",
    "workflowDefaultViewOrientation": "LR",
    "email": null
  },
  "service": {
    "id": "248d3a32-9d97-42cb-ae91-e42dba582ff9",
    "type": "databaseService",
    "name": "test_bigquery",
    "fullyQualifiedName": "test_bigquery",
    "description": null,
    "displayName": null,
    "deleted": false,
    "href": null
  },
  "pipelineStatuses": null,
  "loggerLevel": "INFO",
  "deployed": false,
  "enabled": true,
  "href": "http://XX.XX.XX.8585/api/v1/services/ingestionPipelines/1ba6def5-a68f-4bdb-b623-85ba83d7a37b",
  "version": 0.1,
  "updatedAt": 1726828387898,
  "updatedBy": "admin",
  "changeDescription": null,
  "deleted": false,
  "provider": "user"
}
{'source': {'type': 'bigquery-lineage', 'serviceName': 'Client_Region_Data', 'sourceConfig': {'config': {'queryLogDuration': 1000, 'resultLimit': 1000, 'parsingTimeoutLimit': 300, `'type':` 'DatabaseLineage'}}}, 'sink': {'type': 'metadata-rest', 'config': {}}, 'workflowConfig': {'loggerLevel': 'DEBUG', 'openMetadataServerConfig': {'hostPort': '***********************************', 'authProvider': 'openmetadata', 'securityConfig': {'jwtToken': '***********************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************'}}}}

Expected behavior: Airflow is able to get the required lineage, but the SDK is not. I do not get any error message while ingesting the data, yet after the lineage ingestion completes successfully I do not find any lineage information. I cannot see lineage when using the SDK.

workflow = MetadataWorkflow.create(workflow_config_lineage)
workflow.execute()
workflow.stop()

DEBUG logs:

[2024-09-23 15:39:55] INFO     {metadata.Ingestion:lineage_source:75} - Scanning query logs for 2024-09-08 - 2024-09-24
[2024-09-23 15:40:00] DEBUG    {metadata.Utils:sql_lineage:393} - Running lineage with query: INSERT INTO `rightdata-1376`.rddatasets.Client_Region_Data (
    Client_ID,
    Client_Name,
    Client_Address,
    Client_Phone,
    Client_Balance,
    Client_Segment,
    Nation_Name,
    Region_Name,
    Comment
)
SELECT 
    C.C_CUSTKEY AS Client_ID,
    C.C_NAME AS Client_Name,
    C.C_ADDRESS AS Client_Address,
    C.C_PHONE AS Client_Phone,
    C.C_ACCTBAL AS Client_Balance,
    C.C_MKTSEGMENT AS Client_Segment,
    N.N_NAME AS Nation_Name,
    R.R_NAME AS Region_Name,
    C.C_COMMENT AS Comment
FROM `rightdata-1376`.tpch_1.Customer C
JOIN `rightdata-1376`.tpch_1.Nation N
    ON C.C_NATIONKEY = N.N_NATIONKEY
JOIN `rightdata-1376`.tpch_1.Region R
    ON N.N_REGIONKEY = R.R_REGIONKEY
[2024-09-23 15:40:03] DEBUG    {metadata.Utils:sql_lineage:393} - Running lineage with query: INSERT INTO `rightdata-1376`.STAGE_Finance.Client (
    Client_ID,
    Client_Name,
    Client_Type,
    Client_Industry,
    Client_Region,
    Client_Segment
)
SELECT 
    C_CUSTKEY AS Client_ID,
    C_NAME AS Client_Name,
    -- Assuming you don't have exact data for the following columns, you can set them as NULL or use placeholders
    'Unknown' AS Client_Type,
    'Unknown' AS Client_Industry,
    'Unknown' AS Client_Region,
    C_MKTSEGMENT AS Client_Segment
FROM `rightdata-1376`.tpch_1.Customer
[2024-09-23 15:40:04] DEBUG    {metadata.Utils:sql_lineage:393} - Running lineage with query: INSERT INTO `rightdata-1376`.STAGE_Finance.Client (
    Client_ID,
    Client_Name,
    Client_Type,
    Client_Industry,
    Client_Region,
    Client_Segment
)
SELECT 
    C.C_CUSTKEY AS Client_ID,
    C.C_NAME AS Client_Name,
    'Unknown' AS Client_Type,  -- Placeholder, adjust as needed
    'Unknown' AS Client_Industry,  -- Placeholder, adjust as needed
    N.N_NAME AS Client_Region,  -- Using Nation name as Client_Region
    C.C_MKTSEGMENT AS Client_Segment
FROM `rightdata-1376`.tpch_1.Customer C
JOIN `rightdata-1376`.tpch_1.Nation N
    ON C.C_NATIONKEY = N.N_NATIONKEY
[2024-09-23 15:40:04] DEBUG    {metadata.Utils:sql_lineage:393} - Running lineage with query: 
INSERT INTO `rightdata-1376.rddatasets.Client_Region_Data` (
    Client_ID,
    Client_Name,
    Client_Address,
    Client_Phone,
    Client_Balance,
    Client_Segment,
    Nation_Name,
    Region_Name,
    Comment
)
SELECT 
    C.C_CUSTKEY AS Client_ID,
    C.C_NAME AS Client_Name,
    C.C_ADDRESS AS Client_Address,
    C.C_PHONE AS Client_Phone,
    C.C_ACCTBAL AS Client_Balance,
    C.C_MKTSEGMENT AS Client_Segment,
    N.N_NAME AS Nation_Name,
    R.R_NAME AS Region_Name,
    C.C_COMMENT AS Comment
FROM `rightdata-1376`.tpch_1.Customer C
JOIN `rightdata-1376`.tpch_1.Nation N
    ON C.C_NATIONKEY = N.N_NATIONKEY
JOIN `rightdata-1376`.tpch_1.Region R
    ON N.N_REGIONKEY = R.R_REGIONKEY

Version:

Additional context Add any other context about the problem here.