microsoft / Purview-ADB-Lineage-Solution-Accelerator

A connector to ingest Azure Databricks lineage into Microsoft Purview
MIT License

Lineage missing in purview #194

Open · Kishor-Radhakrishnan opened this issue 1 year ago

Kishor-Radhakrishnan commented 1 year ago

We have many cases where lineage is missing in Purview. We will keep this issue open and keep updating it with logs to investigate.

```
23/04/05 08:27:02 ERROR EventEmitter: Could not emit lineage w/ exception
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
    at sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
    at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
    at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1369)
    at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
    at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:978)
    at io.openlineage.spark.shaded.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
    at io.openlineage.spark.shaded.org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
    at io.openlineage.spark.shaded.org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
    at io.openlineage.spark.shaded.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
    at io.openlineage.spark.shaded.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
    at io.openlineage.spark.shaded.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.jav
```

[log4j-active (7).txt](https://github.com/microsoft/Purview-ADB-Lineage-Solution-Accelerator/files/11166751/log4j-active.7.txt)

log4j-active (8).txt

Attached are two logs from Databricks with these errors. Please investigate.

Kishor-Radhakrishnan commented 1 year ago

log4j-active (7).txt

hmoazam commented 1 year ago

Hi @Kishor-Radhakrishnan, was this also fixed by the changes you made to fix #193, or do you still need us to look into it?

Kishor-Radhakrishnan commented 1 year ago

@hmoazam Yes, we still need you to look into this one. This error is different, and we are seeing it frequently on DBR 10.4 LTS:

```
23/04/05 08:27:02 ERROR EventEmitter: Could not emit lineage w/ exception java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at
```

hmoazam commented 1 year ago

Hi Kishor, could you please share what data sources you're reading from and writing to when you're getting this error?

wjohnson commented 1 year ago

@Kishor-Radhakrishnan would you please try this updated branch to help us collect more information on this issue?

https://github.com/microsoft/Purview-ADB-Lineage-Solution-Accelerator/tree/hotfix/maxQueryPlanOLIn

Could you build and install this version and then run a notebook that is still failing to send lineage even after our last fix?

If you then monitor the logs in the OpenLineageIn function, we should see:

  1. The OpenLineage payload being sent to Event Hubs
  2. An error message indicating that the event is too large.

If you can provide the full OpenLineage payload and the error message, we can troubleshoot further and find a fix for this issue.
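
For context, the failure mode being probed here is the serialized OpenLineage event exceeding the Event Hubs message size limit. Below is a minimal sketch of that size check (assuming the 1 MB Standard-tier Event Hubs limit; `payload_too_large` is an illustrative helper, not part of the accelerator):

```python
import json

# Assumption: 1 MB, the Event Hubs Standard-tier maximum message size.
MAX_EVENT_BYTES = 1_048_576

def payload_too_large(event: dict) -> bool:
    """Return True if the serialized OpenLineage event would exceed the limit."""
    size = len(json.dumps(event).encode("utf-8"))
    print("OpenLineage payload size: {0} bytes".format(size))
    return size > MAX_EVENT_BYTES
```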

Kishor-Radhakrishnan commented 1 year ago

@wjohnson Attaching a log containing this error from the monitor: openlineage.txt.zip

wjohnson commented 1 year ago

@Kishor-Radhakrishnan thank you so much for this! Would you be able to share any snippet of code on how you're reading the /mnt/lob_shippingcycle/active/gcss/value_added_service_booking/ path?

In this case, OpenLineage does not appear to recognize the partitions under the date_part column as a single input table, which results in a large number of inputs and a large amount of column lineage metadata.

I think there are two approaches to solving this.

  1. (Long Term) Understanding why OpenLineage isn't recognizing the date_part partitions as a single input table.
  2. (Short Term) Adding a feature to the Databricks to Purview Solution Accelerator that limits the size of the column lineage metadata.
    • We could add an app setting named maxColumnLineageSize that would behave similarly to maxQueryPlanSize (see the sketch below).
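
To make the proposal concrete, here is a rough Python sketch of the intended behavior (illustrative only: `trim_column_lineage` and the exact facet handling are assumptions, and the accelerator's ingestion function is not actually written in Python). If the serialized columnLineage facet on an output dataset exceeds the cap, only that facet is dropped, so the event still fits and dataset-level lineage survives:

```python
import json

def trim_column_lineage(event: dict, max_column_lineage_size: int) -> dict:
    """Drop the columnLineage facet from any output whose serialized facet exceeds the cap."""
    for output in event.get("outputs", []):
        facets = output.get("facets", {})
        col_lineage = facets.get("columnLineage")
        if col_lineage is None:
            continue
        size = len(json.dumps(col_lineage).encode("utf-8"))
        if size > max_column_lineage_size:
            # Sacrifice column-level detail; keep dataset-level lineage intact.
            del facets["columnLineage"]
    return event
```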

Kishor-Radhakrishnan commented 1 year ago

@wjohnson I am adding a snippet of the code where the write occurs.

```python
def write_dataframe(df, tgt_path, part_col):
    try:
        if part_col != "NA":
            (df.write
               .format("delta")
               .mode("overwrite")
               .partitionBy(part_col)
               .option("mergeSchema", "true")
               .option("overwriteSchema", "true")
               .save(tgt_path))
            print("Data successfully written as partitioned path")
        else:
            (df.write
               .mode("overwrite")
               .format("delta")
               .option("overwriteSchema", "true")
               .save(tgt_path))
            print("Data successfully written as non partitioned path")
    except Exception as exception:
        logger.error("Error in write_dataframe function")
        raise Exception('Exception while calling function write_dataframe : ', exception)


def full_load(tgt_tbl, src_data_path, cntrl_tbl, tgt_delta_path, table_type, delete_flag, part_col, opt_flg):
    # get_source_data_format, get_source_data_full, rownumber, logger, and spark
    # are defined elsewhere in the notebook.
    try:
        src_format = get_source_data_format(src_data_path)
        print("Source data format is {0} ".format(src_format))
        src_incremental_df = get_source_data_full(src_data_path, rownumber, delete_flag, src_format)
        print("source data created")
        write_dataframe(src_incremental_df, tgt_delta_path, part_col)
        # Spark SQL needs backticks around a path-based Delta table, i.e.
        # OPTIMIZE delta.`/path`; they were likely stripped by the markdown paste.
        spark.sql("OPTIMIZE delta.`{target_data_path}`".format(target_data_path=tgt_delta_path))
        print("optimize command completed")
        spark.sql("""insert into {control_table} PARTITION (TABLE_NAME)
                     select '{table_typ}', '{target_table}', cast(current_date as timestamp),
                            cast(current_date as string), 'INITIAL LOAD'
                  """.format(control_table=cntrl_tbl, target_table=tgt_tbl, table_typ=table_type))
        print("Successfully completed the full load for table {0}".format(tgt_tbl))
    except Exception as exception:
        logger.error("Error in full_load function")
        raise Exception('Exception while calling function full_load : ', exception)
```

Kishor-Radhakrishnan commented 1 year ago

Also, we face this issue in many different notebooks, so we need to understand why the partitioning is not being identified properly.

Maybe we can try the short-term solution of skipping column lineage, to see whether the whole dataset-level lineage is still captured.
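
For reference, here is a minimal PySpark sketch of the two reading patterns that could explain the behavior (the base path comes from the log above; the partition value and the suggestion that the job may be globbing partition subdirectories are assumptions to verify):

```python
# Databricks notebook context: `spark` is the preconfigured SparkSession.
base_path = "/mnt/lob_shippingcycle/active/gcss/value_added_service_booking/"

# Reading the Delta table at its base path: one logical input dataset;
# partition pruning on date_part happens through the filter.
df_single_input = (
    spark.read.format("delta")
    .load(base_path)
    .filter("date_part = '2023-04-05'")  # hypothetical partition value
)

# Reading a partition subdirectory (or a glob of them) directly bypasses the
# Delta log and can surface each directory as a separate input to the listener.
df_many_inputs = spark.read.parquet(base_path + "date_part=2023-04-05/")
```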

wjohnson commented 9 months ago

We are adding the column lineage removal as part of the next release.