OHDSI / WebAPI

OHDSI WebAPI contains all OHDSI services that can be called from OHDSI applications
Apache License 2.0

Generating Cohort Pathways fails on Databricks jdbc endpoint #2281

Closed: guybartal closed this issue 8 months ago

guybartal commented 1 year ago

Expected behavior

Generating a Cohort Pathway should work against a Databricks JDBC endpoint (the same generation works on MS SQL Server and Azure Synapse).

Actual behavior

Fails with the following error:

java.util.concurrent.CompletionException: org.springframework.jdbc.UncategorizedSQLException: StatementCallback; uncategorized SQLException for SQL [WITH insertion_temp AS (
(SELECT 979617051 as design_hash, person_id, start_date, end_date 
FROM temp.ljy97wl2final_cohort CO
) UNION ALL (SELECT design_hash, subject_id, cohort_start_date, cohort_end_date FROM results.cohort_cache ))
INSERT OVERWRITE TABLE results.cohort_cache SELECT * FROM insertion_temp]; SQL state [HY000]; error code [500051]; [Simba][SparkJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.hive.service.cli.HiveSQLException: Error running query: io.delta.exceptions.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1684764324215,"operation":"WRITE","operationParameters":{"mode":Overwrite,"partitionBy":[]},"readVersion":383,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numFiles":"44","numOutputRows":"102839223","numOutputBytes":"692159545"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"6f0b5601-2a84-4435-a6fc-3948e4f54971"}
Refer to https://docs.microsoft.com/azure/databricks/delta/concurrency-control for more details.
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$execute$1(SparkExecuteStatementOperation.scala:609)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:124)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:501)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:361)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
    at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties(ThriftLocalProperties.scala:156)
    at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties$(ThriftLocalProperties.scala:51)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.wi

Steps to reproduce behavior

konstjar commented 1 year ago

Can you please try a version from the master branch? There were some fixes related to the Databricks dialect submitted. Maybe they will fix your problem.

chrisknoll commented 1 year ago

I think @konstjar is correct, but I will call out something we saw with certain JDBC drivers for Redshift (there's an open issue about it somewhere): the problem was related to writing to cohort_cache (which is our caching mechanism for cohort generation). I would definitely go with @konstjar's suggestion first, but there may also be a 'leaking transaction' or some sort of cache-insert conflict happening, where the cohort is being generated twice and the cache inserts lead to this conflict.
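
For illustration, a minimal sketch of that conflict scenario, assuming two cohort generations write to the shared cache table at the same time. The table name and statement shape are taken from the error in the original report; the insertion_temp_a / insertion_temp_b names are hypothetical stand-ins for each session's staged rows.

-- Session A (generating cohort 1) rewrites the entire cache table:
INSERT OVERWRITE TABLE results.cohort_cache SELECT * FROM insertion_temp_a;

-- Session B (generating cohort 2), started before A committed:
INSERT OVERWRITE TABLE results.cohort_cache SELECT * FROM insertion_temp_b;
-- Delta Lake detects that A added files to the table B's snapshot was read from,
-- so B fails with io.delta.exceptions.ConcurrentAppendException.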

TomWhite-MedStar commented 9 months ago

@chrisknoll and @konstjar , we're seeing the same issue when we try to generate multiple different cohorts concurrently (on the latest release: WebAPI 2.13.0 and Atlas 2.13.0).

The failing queries look like this:

INSERT
  OVERWRITE TABLE omop_results.cohort_inclusion_stats_cache
SELECT
  *
FROM
  omop_results.cohort_inclusion_stats_cache
WHERE
  NOT (
    design_hash = -1114425973
    and mode_id = 1
  )

and

WITH insertion_temp AS (
  (
    SELECT
      662387645 as design_hash,
      person_id,
      start_date,
      end_date
    FROM
      tmp_v0907.pel5lbbbfinal_cohort CO
  )
  UNION ALL
    (
      SELECT
        design_hash,
        subject_id,
        cohort_start_date,
        cohort_end_date
      FROM
        omop_results.cohort_cache
    )
)
INSERT
  OVERWRITE TABLE omop_results.cohort_cache (
    design_hash,
    subject_id,
    cohort_start_date,
    cohort_end_date
  )
SELECT
  *
FROM
  insertion_temp

In both cases, it appears that the use of INSERT OVERWRITE TABLE, rather than a DELETE statement followed by an INSERT, may be causing the issue.
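
For comparison, a sketch of that delete-then-insert alternative, using the table and values from the second failing query above. This is an assumption about the intended shape, not what WebAPI currently emits.

-- Targeted delete of any previously cached rows for this cohort design:
DELETE FROM omop_results.cohort_cache
WHERE design_hash = 662387645;

-- Insert only the newly generated rows instead of rewriting the whole table:
INSERT INTO omop_results.cohort_cache (design_hash, subject_id, cohort_start_date, cohort_end_date)
SELECT 662387645 AS design_hash, person_id, start_date, end_date
FROM tmp_v0907.pel5lbbbfinal_cohort;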

chrisknoll commented 9 months ago

Ok, this behavior goes back to a very, very old discussion about DBMS support... (and I did object to this approach at the time):

There were some DBMS platforms that did not support the DELETE operator on a table; instead, you would re-create the table by inserting everything EXCEPT the rows you meant to delete. My original objection is exactly what you raised here: what if there are 2 sessions doing the same thing? Which delete wins?

So, this is buried in the bowels of SqlRender, but this was done at least 5 years ago if I'm not mistaken, and maybe those platforms have added support for DELETE FROM operations since then? @TomWhite-MedStar, are you saying that Databricks does allow the DELETE FROM {table} WHERE ... syntax? If so, we can resolve this in SqlRender.

Edit: here's my post! I bet that sand milkshake sounds pretty good about now.

TomWhite-MedStar commented 9 months ago

@chrisknoll , if you can give me examples of DELETE FROM {table} syntax that needs to be supported, I'm happy to test them.

I just did the following and it worked fine on Databricks:

create table tmp.cohort_copy as select * from results.cohort where cohort_id in (305, 309, 581, 582);
delete from tmp.cohort_copy where cohort_id in (581, 582);

chrisknoll commented 9 months ago

That would be it. Usually it's just a simple 'delete from table where col = value' to drop prior results.

If you can do DELETE FROM, then we need to figure out the SqlRender modification that will not substitute the DELETE FROM with INSERT OVERWRITE.

Edit: I think there is a case of a dual-column delete in the first example you gave:

WHERE
  NOT (
    design_hash = -1114425973
    and mode_id = 1
  )

This was originally 'DELETE FROM .... WHERE design_hash = xxx and mode_id = 1'.

TomWhite-MedStar commented 9 months ago

I confirmed that DELETE FROM .... WHERE design_hash = xxx and mode_id = 1 works on Databricks.
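
For reference, a sketch of the statements that shape implies, with values taken from the failing queries earlier in this thread (not the exact test that was run):

DELETE FROM omop_results.cohort_inclusion_stats_cache
WHERE design_hash = -1114425973 AND mode_id = 1;

DELETE FROM omop_results.cohort_cache
WHERE design_hash = 662387645;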

chrisknoll commented 9 months ago

Ok, I'd suggest opening an issue on SqlRender to make the change there; then the fix to this issue will be to update the WebAPI dependency on SqlRender to the new version that fixes it.
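
As a sketch of the requested change, the Spark translation would need to stop rewriting a DELETE into a full-table INSERT OVERWRITE. The before/after shapes below are taken from the queries in this thread; the actual SqlRender translation rule is not shown here.

-- OHDSI SQL as issued by WebAPI:
DELETE FROM omop_results.cohort_inclusion_stats_cache
WHERE design_hash = -1114425973 AND mode_id = 1;

-- Current Spark translation (full-table rewrite, prone to Delta write conflicts):
INSERT OVERWRITE TABLE omop_results.cohort_inclusion_stats_cache
SELECT * FROM omop_results.cohort_inclusion_stats_cache
WHERE NOT (design_hash = -1114425973 AND mode_id = 1);

-- Desired Spark translation (pass the DELETE through unchanged):
DELETE FROM omop_results.cohort_inclusion_stats_cache
WHERE design_hash = -1114425973 AND mode_id = 1;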