astronomer / astro-sdk

Astro SDK allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
https://astro-sdk-python.rtfd.io/
Apache License 2.0
348 stars 43 forks source link

Open lineage bug workaround #1336

Open sunank200 opened 1 year ago

sunank200 commented 1 year ago

Describe the bug Message from @conorbev So while waiting for the release I tried it out from main and I do see the TransformOperator producing Inputs and Outputs now, I also did not need to set the custom extractor env var. It seemed to work fine with the default extractor. Very cool!

(https://astronomer.slack.com/archives/D044NMSCBTR/p1669860239092409) I think I've found the reason why the SQL that the TransformOperator was supplying in the facet did not show up. It's because the SQL facet is only being supplied in the COMPLETE OpenLineage event and you are running into this bug: https://github.com/MarquezProject/marquez/issues/2230 ( I confirmed this by manually copying your SQL facet into the START event that you send and then the SQL does show up as expected: https://cloud.astronomer.io/lineage/graph/job/conor-astrosdk4/calculate_popular_movies.top_five_animations ) #2230 Only first job context is taken into consideration.

So I think we certainly need to fix that bug in Marquez either way, but in the meantime, I wanted to check with you: Is it possible for you to set the facets during the START event or you only know it at the end ?

sunank200 commented 1 year ago

Currently task_instance is not available on get_openlineage_facets_on_start() on DefaultExtractor. sql is accessible using task_instance object as its template field is rendered in execute method.

    def get_openlineage_facets_on_start(self):
        from astro.lineage import (
            BaseFacet,
            OperatorLineage,
            SqlJobFacet,
        )
        base_sql_query = task_instance.xcom_pull(task_ids=task_instance.task_id, key="base_sql_query")
        job_facets: dict[str, BaseFacet] = {"sql": SqlJobFacet(query=base_sql_query)}

        return OperatorLineage(
            inputs=[], outputs=[], run_facets={}, job_facets=job_facets
        )

So I think fix on Default extractor would be required for this suggested workaround to work

cc: @conorbev

sunank200 commented 1 year ago

This is still blocked on OL side.

sunank200 commented 1 year ago

This issue is closed now on OL side so we should test this out now.