OHDSI / WebAPI

OHDSI WebAPI contains all OHDSI services that can be called from OHDSI applications
Apache License 2.0

Non-fatal error in SQL construction for Pathway analysis on Spark (e.g. Databricks) #2275

Closed TomWhite-MedStar closed 1 year ago

TomWhite-MedStar commented 1 year ago

Expected behavior

No generated queries throw errors

Actual behavior

This query is sent to Spark (e.g. Databricks) without substituting @target_database_schema: show columns in @target_database_schema.pathway_analysis_paths. Note that it is a non-fatal error -- the Pathway analyses still complete. However, the generated SQL does not include the specified list of columns, which may make the process brittle.

Steps to reproduce behavior

If Atlas is pointing to a Databricks SQL Warehouse, the warehouse logs all queries. This query occurs immediately before an insert into pathway_analysis_paths. However, sometimes the failed query is followed by a successful one.

The show columns command appears to be generated by the getMetaFields() function in BigQuerySparkTranslate.java, which is called from sparkInsert(). getMetaFields() is called within a try/catch statement, so it does not cause the Pathway Analysis to fail when it throws. For some reason, the target_table parameter is sometimes not substituted.
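
For illustration only, a minimal sketch of the kind of JDBC metadata probe being described (this is not the actual getMetaFields() source; the class and method below are hypothetical). It shows why an unrendered @target_database_schema produces only a swallowed error and an empty column list:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the kind of probe getMetaFields() performs.
class ColumnProbe {
    static List<String> probeColumns(Connection conn, String qualifiedTable) {
        List<String> columns = new ArrayList<>();
        // If qualifiedTable still contains the literal text
        // "@target_database_schema.pathway_analysis_paths", Spark rejects the
        // statement and control falls into the catch block below.
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("show columns in " + qualifiedTable)) {
            while (rs.next()) {
                columns.add(rs.getString(1));
            }
        } catch (SQLException e) {
            // Swallowed: pathway generation continues, but the rewritten INSERT
            // is then built without an explicit column list.
        }
        return columns;
    }
}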

chrisknoll commented 1 year ago

Hmmm... I'll need more details, i.e. which query is being constructed, or why it is calling show columns in .... Sounds like it's trying to do a table upload? I'm not exactly sure, but let me do some digging. This analysis works like many of the other CDM analysis queries, where we just construct SQL and execute it on the source.

TomWhite-MedStar commented 1 year ago

@chrisknoll, I believe this may be called from savePaths.sql. However, pasting that query into SqlDeveloper, one does not see the show columns command. It appears that it is generated internally by SqlRender.

chrisknoll commented 1 year ago

So, the save-paths query is a very standard insert into ... select from, a pattern used in different analysis SQL around the application (e.g., cohort definitions insert the final cohort episodes into the results.cohort table).

What I should do is look at where savePaths.sql is invoked; it's possible that there is some pre-work being done that would lead to that show columns command.

chrisknoll commented 1 year ago

So, I found this line, and it's doing a Spark-specific call to sparkHandleInsert, which I think does those extra steps. I don't understand it, because it should just be translating SQL, and I don't know why there are special steps for inserts.

@anton-abushkevich and @ssuvorov-fls, the git history shows you added work to support Spark. Can you let us know what's going on in this code and help us understand why it has to touch the database while translating an insert statement?

chrisknoll commented 1 year ago

Just to follow up on my own question: the call over to sparkHandleInsert takes a connection string and then passes a connection over to the other sparkHandleInsert call, but never does anything with the connection. I think we should remove that from the call and not open the connection while parsing the SQL.

@anton-abushkevich and @ssuvorov-fls can you please provide your input?

ssuvorov-fls commented 1 year ago

@chrisknoll the connection is used to get information about the table structure in order to create the "with ... as ... insert" form.

chrisknoll commented 1 year ago

Ok, I thought it wasn't used, but I see now where it calls out to sparkInsert. It feels very strange that we can't do a SqlTranslate on a SQL string without having a connection to the database in Spark, but I think I understand that there's some limitation with Spark where we need to construct an insert statement as:

with cte_wrapper(col1, col2, col3) as (
... some select query to fetch insert data ...
)
insert into @targetTable
select col1, col2, col3

I believe the limitation was that we can't use the insert into table (col1, col2, col3) select col1, col2, col3 ... syntax (i.e. we can't specify the columns we are inserting into the table) in Databricks. @TomWhite-MedStar, is that still a limitation of Google BigQuery or Spark? The lack of support for that leads us to this (imo) convoluted operation: we get an insert statement with 3 columns, but the table has 4 columns, and I think this piece of code has to 'fill in' the blanks or select columns in the correct order to work around the fact that we can't specify which columns in the table are assigned to which values from the select statement.
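
To make that "fill in the blanks" step concrete, here is a made-up before/after of the kind of rewrite being described (the table and columns are invented for illustration and are not taken from the WebAPI source or from actual BigQuerySparkTranslate output):

// The SELECT supplies 2 of the table's 3 columns, so the rewritten form pads the
// missing column with NULL and drops the explicit column list on the INSERT,
// which Spark/Databricks does not accept.
String original =
        "INSERT INTO results.example_paths (generation_id, path)\n"
      + "SELECT generation_id, path FROM #raw_paths";

String rewritten =
        "WITH insertion_temp (generation_id, path, path_count) AS (\n"
      + "  SELECT generation_id, path, CAST(NULL AS BIGINT) AS path_count FROM #raw_paths\n"
      + ")\n"
      + "INSERT INTO results.example_paths\n"
      + "SELECT generation_id, path, path_count FROM insertion_temp";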

With all due respect to Spark and Google BigQuery, I feel this limitation should disqualify the platform from use in the OHDSI stack... not being able to do an `insert into T (col1, col2) select col1, col2 from Y` is a severe limitation... we insert data into tables all over the place.

chrisknoll commented 1 year ago

@TomWhite-MedStar, the answer to the question about where 'show columns' comes from is here. While it's trying to process an insert statement to rewrite it into the CTE form (described above), it needs to get the columns of the table so that the INSERT INTO T (col1, col2) can be re-written to include the necessary columns for Spark.

I see now where the problem is in PathwayStatisticsTasklet:

        String sql = SAVE_PATHS_SQL;
        if (source.getSourceDialect().equals("spark")) {
            sql = BigQuerySparkTranslate.sparkHandleInsert(sql, source.getSourceConnection());
        }

        PreparedStatementRenderer pathwayEventsPsr = new PreparedStatementRenderer(
                source,
                sql,
                new String[]{"target_database_schema"},
                new String[]{source.getTableQualifier(SourceDaimon.DaimonType.Results)},
                new String[]{GENERATION_ID},
                new Object[]{generationId}
        );

The params aren't being replaced in the Spark case, so we need to add a step that renders the SQL with the correct params before the Spark-specific handling.
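
As a hedged sketch of one way that could look (the actual PR may differ; this assumes SqlRender.renderSql can be used at this point to resolve the schema before the Spark-specific rewrite runs):

        String sql = SAVE_PATHS_SQL;
        if (source.getSourceDialect().equals("spark")) {
            // Resolve @target_database_schema first so the "show columns in ..."
            // probe inside sparkHandleInsert sees a real schema-qualified table.
            String resultsSchema = source.getTableQualifier(SourceDaimon.DaimonType.Results);
            String renderedSql = SqlRender.renderSql(
                    sql,
                    new String[]{"target_database_schema"},
                    new String[]{resultsSchema});
            sql = BigQuerySparkTranslate.sparkHandleInsert(renderedSql, source.getSourceConnection());
        }
        // The PreparedStatementRenderer call below can stay as-is; for the Spark
        // path the schema parameter is simply already resolved.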

chrisknoll commented 1 year ago

@TomWhite-MedStar can you try this PR and see if that addresses your issue?