Open vivekrathiave opened 4 months ago
@vivekrathiave I'm happy that the library is helping you! Can you provide 1 or 2 examples of stored procedures please? I'll use them as unit tests to guide the implementation.
Sure, I can not quote original sp, but here is a pseudo query
Insert into A (col1, col2,col3) Select * from B;
MERGE INTO C USING A on C.col1 = A.col1 WHEN NOT MATCHED BY TARGET THEN INSERT (col1,col2,col3) VALUES (A.col1, A.col2,A.col3) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET C.col2=A.col2, C.col3=A.col3
The Lineage of A is tracked by sp from table b, but not to table c where records are getting merged by this sp.
I hope this helps.
Awesome! I have a working solution. Tomorrow I'll merge and deploy the new version of the library. I'll leave a comment when it will be done.
@vivekrathiave I released the version 1.12.0 You can test it and let me know if it works for your stored procedures. Thank you very much for your feedback, merge statements are an important part that I missed.
Awesome, Thank you. Will try it out.
Hi @ivan-ostymchuk , Just tested and it works, though during the test I noticed one specific scenario
With s AS ( Select col1, col2, col3 from A ) MERGE INTO C USING s on C.col1 = s.col1 WHEN NOT MATCHED BY TARGET THEN INSERT (col1,col2,col3) VALUES (s.col1, s.col2,s.col3) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET C.col2=s.col2, C.col3=s.col3
The lineage shows up from s to C instead of A to C. Just a minor issue, but wanted to make you aware of the scenario.
Hi @vivekrathiave, thank you, I'll work on it and fix it soon.
@vivekrathiave I released the version 1.13.0 with the fixes, If you'll test it please let me know if it works for your stored procedures.
Thank you @ivan-ostymchuk Will give it a try
Hi @ivan-ostymchuk , Just tested the new version, There seems to be a problem with new version There is a warning generated:
Parsing the stored procedures WARNING: unable to parse stored procedure name
And also after this , it is going into infinite loop somewhere, with full cpu usage and never ending process.
@vivekrathiave do you know on which stored procedure it goes into infinite loop? Could you provide a snippet of it please? Meanwhile I'll try to figure it out
this may be due to the unnestTempTables function? // Code
func unnestTempTables(tempTablesLineage map[string]spLineage) map[string]spLineage {
fmt.Println("Starting unnestTempTables")
// Create a new map to store the results
result := make(map[string]spLineage)
// Copy all temporary tables to the result
for key, lineage := range tempTablesLineage {
result[key] = lineage
}
// Expanding the Temporary Tables
for sink, lineage := range result {
fmt.Printf("Processing lineage: %s\n", sink)
for index := 0; index < len(lineage.sources); index++ {
source := lineage.sources[index]
if tmp, ok := result[source]; ok {
fmt.Printf("Unnesting source: %s\n", source)
lineage.sources = append(lineage.sources[:index], lineage.sources[index+1:]...)
lineage.sources = append(lineage.sources, tmp.sources...)
index = -1 // Restarting the loop for updated sources
}
}
cleanedSources := removeDuplicatesAndEmpty(lineage.sources)
lineage.sources = cleanedSources
result[lineage.sink] = lineage
}
fmt.Println("Ending unnestTempTables")
return result
}
func removeDuplicatesAndEmpty(sources []string) []string {
cleanedSources := make([]string, 0)
seen := make(map[string]bool)
for _, source := range sources {
if len(source) > 0 && !seen[source] {
cleanedSources = append(cleanedSources, source)
seen[source] = true
}
}
return cleanedSources
}
Debug: Unnesting source: #wtis_supplier Unnesting source: #unique_suppliers Unnesting source: #vd Unnesting source: #wtis_supplier Unnesting source: #unique_suppliers Unnesting source: #vd Unnesting source: all_suppliers_w_min_date Unnesting source: #wtis_supplier Unnesting source: #unique_suppliers Unnesting source: #vd Unnesting source: #wtis_supplier Unnesting source: #unique_suppliers Unnesting source: #vd Unnesting source: all_suppliers_w_min_date Unnesting source: #wtis_supplier Unnesting source: #unique_suppliers Unnesting source: #vd Unnesting source: #wtis_supplier Unnesting source: #unique_suppliers Unnesting source: #vd Unnesting source: all_suppliers_w_min_date Unnesting source: #wtis_supplier Unnesting source: #unique_suppliers Unnesting source: #vd
This fix work?
func unnestTempTables(tempTablesLineage map[string]spLineage) map[string]spLineage {
fmt.Println("Starting unnestTempTables")
// Create a new map to store the results
result := make(map[string]spLineage)
// Copy all temporary tables to the result
for key, lineage := range tempTablesLineage {
result[key] = lineage
}
// Tracking Processed Sources
processedSources := make(map[string]bool)
// Expanding the Temporary Tables
for sink, lineage := range result {
fmt.Printf("Processing lineage: %s\n", sink)
for index := 0; index < len(lineage.sources); index++ {
source := lineage.sources[index]
if _, alreadyProcessed := processedSources[source]; alreadyProcessed {
continue // Skip already processed sources
}
if tmp, ok := result[source]; ok {
fmt.Printf("Unnesting source: %s\n", source)
lineage.sources = append(lineage.sources[:index], lineage.sources[index+1:]...)
lineage.sources = append(lineage.sources, tmp.sources...)
processedSources[source] = true
index = -1 // Restarting the loop for updated sources
}
}
cleanedSources := removeDuplicatesAndEmpty(lineage.sources)
lineage.sources = cleanedSources
result[lineage.sink] = lineage
}
fmt.Println("Ending unnestTempTables")
return result
}
func removeDuplicatesAndEmpty(sources []string) []string {
cleanedSources := make([]string, 0)
seen := make(map[string]bool)
for _, source := range sources {
if len(source) > 0 && !seen[source] {
cleanedSources = append(cleanedSources, source)
seen[source] = true
}
}
return cleanedSources
}
I just tested it , and it works amazingly well in generating the lineage. Great work. One observation, When Using Merge in stored procedure., target table is not covered in the lineage.