open-metadata / OpenMetadata

OpenMetadata is a unified metadata platform for data discovery, data observability, and data governance powered by a central metadata repository, in-depth column level lineage, and seamless team collaboration.
https://open-metadata.org
Apache License 2.0

Custom Pipeline Status #18252

Open benoitcharmettant opened 1 week ago

benoitcharmettant commented 1 week ago

Is your feature request related to a problem? Please describe.
It is not currently possible to specify a pipeline status from the API.

In my case, I am building a custom connector for my orchestrator service (Mage), and I would like to track the pipeline status (say, of the last 10 runs). However, CreatePipelineRequest does not allow specifying a pipeline run status.

Describe the solution you'd like
I'm not sure how pipeline statuses are handled in the official OpenMetadata connectors (like Airflow), but in my case something like:

CreatePipelineRequest(
    name=pipe.get("uuid"),
    displayName=pipe.get("name"),
    service=service_entity.fullyQualifiedName,
    description=pipe.get("description"),
    sourceUrl=f"{self.base_url}/pipelines/{pipe.get('uuid')}/triggers",
    tasks=pipe.tasks,
    # proposed field: CreatePipelineRequest does not accept this today
    pipelineStatus=[
        PipelineStatus(
            timestamp=statuses[0].timestamp,
            executionStatus=StatusType.Successful,
            taskStatus=[t.status for t in pipe.runs[-1].tasks],
        ),
        PipelineStatus(
            timestamp=statuses[1].timestamp,
            executionStatus=StatusType.Failed,
            taskStatus=[t.status for t in pipe.runs[-2].tasks],
        ),
        ....
    ],
)

This would either set the last 10 runs or append them to the runs recorded by previous ingestions.
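To make the proposed behaviour concrete, here is a minimal sketch of turning orchestrator run records into status payloads that keep only the most recent runs. The `MageRun` record, its field names, and the Mage state strings are hypothetical (not Mage's actual API), and the state-to-status mapping is an assumption. The output follows the JSON shape of OpenMetadata's `PipelineStatus` (`timestamp` in epoch milliseconds, `executionStatus`, `taskStatus`), so each dict could presumably be turned into an SDK object with `PipelineStatus(**payload)`.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Dict, List

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Hypothetical mapping from orchestrator run states to OpenMetadata
# StatusType values ("Successful", "Failed", "Pending", "Skipped").
STATUS_MAP: Dict[str, str] = {
    "completed": "Successful",
    "failed": "Failed",
    "running": "Pending",
    "cancelled": "Skipped",
}


@dataclass
class MageRun:
    """Hypothetical run record; not Mage's real API schema."""
    started_at: datetime  # must be timezone-aware
    status: str  # orchestrator run state, e.g. "completed"
    task_states: Dict[str, str] = field(default_factory=dict)  # task name -> state


def to_status_payloads(runs: List[MageRun], keep_last: int = 10) -> List[dict]:
    """Build PipelineStatus-shaped dicts for the `keep_last` most recent runs, oldest first."""
    if keep_last <= 0:
        return []
    recent = sorted(runs, key=lambda r: r.started_at)[-keep_last:]
    return [
        {
            # Epoch milliseconds, computed exactly from the aware datetime.
            "timestamp": (run.started_at - EPOCH) // timedelta(milliseconds=1),
            # Unknown states fall back to "Pending" (an arbitrary choice).
            "executionStatus": STATUS_MAP.get(run.status, "Pending"),
            "taskStatus": [
                {"name": name, "executionStatus": STATUS_MAP.get(state, "Pending")}
                for name, state in run.task_states.items()
            ],
        }
        for run in recent
    ]
```

Sorting before slicing means the "last 10" are chosen by start time rather than by the order the orchestrator happens to return runs in.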

Describe alternatives you've considered
If you know of any other way I could achieve this, it would be welcome!
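One possible workaround with the existing SDK, assuming `add_pipeline_status(fqn, status)` records a single run per call: push only runs newer than the last one already sent, oldest first, and keep a watermark between ingestions so that runs are appended rather than re-sent. This is a sketch, not an official pattern. The client is duck-typed (any object with that method works), `timestamp_of` is a hypothetical caller-supplied function returning a run's epoch milliseconds, and persisting the watermark is left to the caller.

```python
from typing import Any, Callable, Iterable, Optional


def push_new_statuses(
    client: Any,
    fqn: str,
    statuses: Iterable[Any],
    timestamp_of: Callable[[Any], int],
    last_pushed_ms: Optional[int] = None,
) -> Optional[int]:
    """Send statuses newer than `last_pushed_ms` (oldest first) and return the new watermark."""
    fresh = [
        s for s in statuses
        if last_pushed_ms is None or timestamp_of(s) > last_pushed_ms
    ]
    fresh.sort(key=timestamp_of)
    for status in fresh:
        # One call per run (assumed to record one execution each).
        client.add_pipeline_status(fqn, status)
    # Unchanged watermark when there was nothing new to send.
    return timestamp_of(fresh[-1]) if fresh else last_pushed_ms
```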

benoitcharmettant commented 1 week ago

I've noticed the SDK method add_pipeline_status:

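from datetime import datetime

from metadata.generated.schema.entity.data.pipeline import (
    PipelineStatus, StatusType, TaskStatus,
)

# `metadata` is an OpenMetadata client; 'fqn' stands for the pipeline's fully qualified name
pipeline = metadata.add_pipeline_status(
    'fqn',
    PipelineStatus(
        # note: datetime.timestamp() returns seconds
        timestamp=int(datetime(2024, 10, 12, 12, 0, 0).timestamp()),
        executionStatus=StatusType.Successful,
        taskStatus=[
            TaskStatus(name="name", executionStatus=StatusType.Failed),
            TaskStatus(name="name", executionStatus=StatusType.Successful),
        ],
    ),
)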
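One thing worth double-checking in the snippet above: OpenMetadata's `timestamp` fields are documented as Unix epoch milliseconds, whereas `datetime.timestamp()` returns seconds, so the value sent would be read as a date in January 1970. That may explain why no run appears in the Execution tab, though this is a guess rather than a confirmed diagnosis. A minimal helper that produces an exact millisecond value from a timezone-aware datetime (the helper name is hypothetical):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer Unix epoch milliseconds."""
    if dt.tzinfo is None:
        # A naive datetime's .timestamp() assumes the machine's local time zone.
        raise ValueError("expected a timezone-aware datetime")
    return (dt - EPOCH) // timedelta(milliseconds=1)


run_start = datetime(2024, 10, 12, 12, 0, 0, tzinfo=timezone.utc)
seconds = int(run_start.timestamp())  # what the snippet above sends
millis = to_epoch_millis(run_start)  # what a millisecond field expects
misread = EPOCH + timedelta(milliseconds=seconds)  # seconds read as milliseconds
```

With this helper, `timestamp=to_epoch_millis(run_start)` would take the place of the seconds-based value in the `PipelineStatus` call.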

However, it doesn't seem to update the pipeline, and the UI behaves inconsistently: the color of each task in the DAG view reflects the task statuses I defined, but no execution appears in the Execution tab.

Moreover, if I query the same pipeline after the update using:

from metadata.generated.schema.entity.data.pipeline import Pipeline

pipeline_entity = metadata.get_by_name(entity=Pipeline, fqn='fqn')

I don't get anything back: pipeline_entity is None.
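When `get_by_name` returns None, the underlying API error is hidden. A minimal debugging sketch, assuming the SDK's `get_by_name` accepts `fields` and `nullable` keyword arguments (as in the `metadata-ingestion` Python SDK; worth verifying against a 1.5.5 install): with `nullable=False` the error should be raised instead of swallowed, and requesting the `pipelineStatus` field (assumed to be a valid field name for pipelines) should return the latest run alongside the entity. The helper name is hypothetical, and the client is passed in so the function can be exercised with a stand-in object.

```python
from typing import Any, List, Optional


def fetch_pipeline_strict(
    metadata: Any,
    pipeline_cls: type,
    fqn: str,
    fields: Optional[List[str]] = None,
) -> Any:
    """Fetch a pipeline by FQN, raising on API errors instead of returning None."""
    return metadata.get_by_name(
        entity=pipeline_cls,
        fqn=fqn,
        fields=fields if fields is not None else ["tasks", "pipelineStatus"],
        nullable=False,  # surface the underlying error rather than None
    )
```

Usage would look like `fetch_pipeline_strict(metadata, Pipeline, 'fqn')`, with `Pipeline` imported from `metadata.generated.schema.entity.data.pipeline`.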

I'm using OpenMetadata version 1.5.5.