coderxio / sagerx

Open drug data pipelines curated by pharmacists.
https://coderx.io/sagerx

RxNorm Historical throws error because DROP TABLE is not CASCADE #258

Closed jrlegrand closed 1 month ago

jrlegrand commented 4 months ago

Problem Statement

The RxNorm Historical DAG runs fine on its first run, but on a second run it throws an error when writing to the database, because other views/tables depend on the existing table. We are using if_exists="replace" in our DAG, which makes pandas issue a plain DROP TABLE before recreating the table, and Postgres rejects that drop while dependent objects exist.

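        # current DAG code: with if_exists="replace", pandas first drops the
        # table and then recreates it before inserting the extracted rows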
        ndc_df.to_sql(
            "rxnorm_historical",
            con=engine,
            schema="datasource",
            if_exists="replace",
            dtype={"ndcs": sqlalchemy.dialects.postgresql.JSONB},
            index=False
        )

I'm guessing DROP ... CASCADE is not the answer. More likely we want to TRUNCATE the old table and append into it, so the downstream views stay intact.
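For reference, here is a minimal sketch of that truncate-and-append alternative, assuming the same engine, schema, and table as the snippet above; truncate_then_append is a hypothetical helper for illustration, not code from this repo:

    import sqlalchemy
    from sqlalchemy.dialects.postgresql import JSONB

    def truncate_then_append(ndc_df, engine):
        # Hypothetical sketch: empty the table instead of dropping it,
        # so the staging views that depend on it are left untouched.
        if sqlalchemy.inspect(engine).has_table("rxnorm_historical", schema="datasource"):
            with engine.begin() as conn:
                conn.execute(sqlalchemy.text("TRUNCATE TABLE datasource.rxnorm_historical"))
        ndc_df.to_sql(
            "rxnorm_historical",
            con=engine,
            schema="datasource",
            if_exists="append",  # table is empty (or brand new) at this point
            dtype={"ndcs": JSONB},
            index=False,
        )

The trade-off is that TRUNCATE keeps the existing column definitions, so a schema change in the extracted data would still require a manual migration.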

Criteria for Success

Run the RxNorm Historical DAG multiple times without hitting this error, while appropriately preserving downstream views and marts.

Additional Information

Error message:

*** Reading local file: /opt/airflow/logs/dag_id=rxnorm_historical/run_id=manual__2024-02-20T14:26:36.508805+00:00/task_id=extract_load/attempt=1.log
[2024-02-20, 14:26:41 UTC] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: rxnorm_historical.extract_load manual__2024-02-20T14:26:36.508805+00:00 [queued]>
[2024-02-20, 14:26:41 UTC] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: rxnorm_historical.extract_load manual__2024-02-20T14:26:36.508805+00:00 [queued]>
[2024-02-20, 14:26:41 UTC] {taskinstance.py:1279} INFO - 
--------------------------------------------------------------------------------
[2024-02-20, 14:26:41 UTC] {taskinstance.py:1280} INFO - Starting attempt 1 of 1
[2024-02-20, 14:26:41 UTC] {taskinstance.py:1281} INFO - 
--------------------------------------------------------------------------------
[2024-02-20, 14:26:41 UTC] {taskinstance.py:1300} INFO - Executing <Task(_PythonDecoratedOperator): extract_load> on 2024-02-20 14:26:36.508805+00:00
[2024-02-20, 14:26:41 UTC] {standard_task_runner.py:55} INFO - Started process 1160469 to run task
[2024-02-20, 14:26:41 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'rxnorm_historical', 'extract_load', 'manual__2024-02-20T14:26:36.508805+00:00', '--job-id', '123', '--raw', '--subdir', 'DAGS_FOLDER/rxnorm_historical/rxnorm_historical_dag.py', '--cfg-path', '/tmp/tmpxaw_xfz2']
[2024-02-20, 14:26:41 UTC] {standard_task_runner.py:83} INFO - Job 123: Subtask extract_load
[2024-02-20, 14:26:41 UTC] {task_command.py:388} INFO - Running <TaskInstance: rxnorm_historical.extract_load manual__2024-02-20T14:26:36.508805+00:00 [running]> on host 7e7adb0cb55d
[2024-02-20, 14:26:41 UTC] {taskinstance.py:1509} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=rxnorm_historical
AIRFLOW_CTX_TASK_ID=extract_load
AIRFLOW_CTX_EXECUTION_DATE=2024-02-20T14:26:36.508805+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2024-02-20T14:26:36.508805+00:00
[2024-02-20, 14:31:21 UTC] {logging_mixin.py:137} INFO - MILESTONE 1000
[2024-02-20, 14:36:05 UTC] {logging_mixin.py:137} INFO - MILESTONE 2000
[2024-02-20, 14:40:47 UTC] {logging_mixin.py:137} INFO - MILESTONE 3000
[2024-02-20, 14:45:31 UTC] {logging_mixin.py:137} INFO - MILESTONE 4000
[2024-02-20, 14:50:11 UTC] {logging_mixin.py:137} INFO - MILESTONE 5000
[2024-02-20, 14:54:49 UTC] {logging_mixin.py:137} INFO - MILESTONE 6000
[2024-02-20, 14:59:27 UTC] {logging_mixin.py:137} INFO - MILESTONE 7000
[2024-02-20, 15:04:07 UTC] {logging_mixin.py:137} INFO - MILESTONE 8000
[2024-02-20, 15:08:45 UTC] {logging_mixin.py:137} INFO - MILESTONE 9000
[2024-02-20, 15:13:22 UTC] {logging_mixin.py:137} INFO - MILESTONE 10000
[2024-02-20, 15:18:03 UTC] {logging_mixin.py:137} INFO - MILESTONE 11000
[2024-02-20, 15:22:42 UTC] {logging_mixin.py:137} INFO - MILESTONE 12000
[2024-02-20, 15:27:21 UTC] {logging_mixin.py:137} INFO - MILESTONE 13000
[2024-02-20, 15:32:02 UTC] {logging_mixin.py:137} INFO - MILESTONE 14000
[2024-02-20, 15:36:41 UTC] {logging_mixin.py:137} INFO - MILESTONE 15000
[2024-02-20, 15:41:21 UTC] {logging_mixin.py:137} INFO - MILESTONE 16000
[2024-02-20, 15:46:00 UTC] {logging_mixin.py:137} INFO - MILESTONE 17000
[2024-02-20, 15:50:40 UTC] {logging_mixin.py:137} INFO - MILESTONE 18000
[2024-02-20, 15:55:18 UTC] {logging_mixin.py:137} INFO - MILESTONE 19000
[2024-02-20, 15:59:59 UTC] {logging_mixin.py:137} INFO - MILESTONE 20000
[2024-02-20, 16:04:40 UTC] {logging_mixin.py:137} INFO - MILESTONE 21000
[2024-02-20, 16:09:20 UTC] {logging_mixin.py:137} INFO - MILESTONE 22000
[2024-02-20, 16:14:01 UTC] {logging_mixin.py:137} INFO - MILESTONE 23000
[2024-02-20, 16:18:39 UTC] {logging_mixin.py:137} INFO - MILESTONE 24000
[2024-02-20, 16:23:17 UTC] {logging_mixin.py:137} INFO - MILESTONE 25000
[2024-02-20, 16:27:55 UTC] {logging_mixin.py:137} INFO - MILESTONE 26000
[2024-02-20, 16:32:37 UTC] {logging_mixin.py:137} INFO - MILESTONE 27000
[2024-02-20, 16:37:17 UTC] {logging_mixin.py:137} INFO - MILESTONE 28000
[2024-02-20, 16:41:58 UTC] {logging_mixin.py:137} INFO - MILESTONE 29000
[2024-02-20, 16:46:38 UTC] {logging_mixin.py:137} INFO - MILESTONE 30000
[2024-02-20, 16:51:16 UTC] {logging_mixin.py:137} INFO - MILESTONE 31000
[2024-02-20, 16:55:55 UTC] {logging_mixin.py:137} INFO - MILESTONE 32000
[2024-02-20, 17:00:34 UTC] {logging_mixin.py:137} INFO - MILESTONE 33000
[2024-02-20, 17:05:12 UTC] {logging_mixin.py:137} INFO - MILESTONE 34000
[2024-02-20, 17:09:51 UTC] {logging_mixin.py:137} INFO - MILESTONE 35000
[2024-02-20, 17:14:31 UTC] {logging_mixin.py:137} INFO - MILESTONE 36000
[2024-02-20, 17:19:09 UTC] {logging_mixin.py:137} INFO - MILESTONE 37000
[2024-02-20, 17:23:48 UTC] {logging_mixin.py:137} INFO - MILESTONE 38000
[2024-02-20, 17:28:27 UTC] {logging_mixin.py:137} INFO - MILESTONE 39000
[2024-02-20, 17:33:12 UTC] {logging_mixin.py:137} INFO - MILESTONE 40000
[2024-02-20, 17:37:51 UTC] {logging_mixin.py:137} INFO - MILESTONE 41000
[2024-02-20, 17:42:33 UTC] {logging_mixin.py:137} INFO - MILESTONE 42000
[2024-02-20, 17:50:44 UTC] {connectionpool.py:813} WARNING - Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLZeroReturnError(6, 'TLS/SSL connection has been closed (EOF) (_ssl.c:1091)'))': /REST/rxcui/104845/allhistoricalndcs.json
[2024-02-20, 17:54:05 UTC] {logging_mixin.py:137} INFO - MILESTONE 43000
[2024-02-20, 18:01:49 UTC] {logging_mixin.py:137} INFO - MILESTONE 44000
[2024-02-20, 18:09:13 UTC] {logging_mixin.py:137} INFO - MILESTONE 45000
[2024-02-20, 18:18:44 UTC] {logging_mixin.py:137} INFO - MILESTONE 46000
[2024-02-20, 18:23:25 UTC] {logging_mixin.py:137} INFO - MILESTONE 47000
[2024-02-20, 18:28:05 UTC] {logging_mixin.py:137} INFO - MILESTONE 48000
[2024-02-20, 18:32:45 UTC] {logging_mixin.py:137} INFO - MILESTONE 49000
[2024-02-20, 18:37:25 UTC] {logging_mixin.py:137} INFO - MILESTONE 50000
[2024-02-20, 18:42:04 UTC] {logging_mixin.py:137} INFO - MILESTONE 51000
[2024-02-20, 18:46:44 UTC] {logging_mixin.py:137} INFO - MILESTONE 52000
[2024-02-20, 18:51:24 UTC] {logging_mixin.py:137} INFO - MILESTONE 53000
[2024-02-20, 18:56:04 UTC] {logging_mixin.py:137} INFO - MILESTONE 54000
[2024-02-20, 19:00:47 UTC] {logging_mixin.py:137} INFO - MILESTONE 55000
[2024-02-20, 19:05:28 UTC] {logging_mixin.py:137} INFO - MILESTONE 56000
[2024-02-20, 19:10:11 UTC] {logging_mixin.py:137} INFO - MILESTONE 57000
[2024-02-20, 19:14:51 UTC] {logging_mixin.py:137} INFO - MILESTONE 58000
[2024-02-20, 19:19:31 UTC] {logging_mixin.py:137} INFO - MILESTONE 59000
[2024-02-20, 19:24:11 UTC] {logging_mixin.py:137} INFO - MILESTONE 60000
[2024-02-20, 19:28:51 UTC] {logging_mixin.py:137} INFO - MILESTONE 61000
[2024-02-20, 19:33:30 UTC] {logging_mixin.py:137} INFO - MILESTONE 62000
[2024-02-20, 19:38:10 UTC] {logging_mixin.py:137} INFO - MILESTONE 63000
[2024-02-20, 19:39:51 UTC] {logging_mixin.py:137} INFO -    rxcui                                               ndcs
0  91348  {'historicalNdcTime': [{'status': 'direct', 'r...
1  91349  {'historicalNdcTime': [{'status': 'direct', 'r...
2  91668  {'historicalNdcTime': [{'status': 'direct', 'r...
3  91691  {'historicalNdcTime': [{'status': 'direct', 'r...
4  91692  {'historicalNdcTime': [{'status': 'direct', 'r...
5  91792  {'historicalNdcTime': [{'status': 'direct', 'r...
6  91805  {'historicalNdcTime': [{'status': 'direct', 'r...
7  92582  {'historicalNdcTime': [{'status': 'direct', 'r...
8  92583  {'historicalNdcTime': [{'status': 'direct', 'r...
9  92584  {'historicalNdcTime': [{'status': 'direct', 'r...
[2024-02-20, 19:39:51 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2024-02-20, 19:39:51 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
[2024-02-20, 19:39:51 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1901, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DependentObjectsStillExist: cannot drop table datasource.rxnorm_historical because other objects depend on it
DETAIL:  view staging.stg_rxnorm_historical__ndcs depends on table datasource.rxnorm_historical
view staging.stg_rxnorm_historical__most_recent_ndcs depends on view staging.stg_rxnorm_historical__ndcs
HINT:  Use DROP ... CASCADE to drop the dependent objects too.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/decorators/base.py", line 217, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/rxnorm_historical/rxnorm_historical_dag.py", line 100, in extract_load
    index=False
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/core/generic.py", line 2882, in to_sql
    method=method,
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/sql.py", line 728, in to_sql
    **engine_kwargs,
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/sql.py", line 1758, in to_sql
    dtype=dtype,
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/sql.py", line 1650, in prep_table
    table.create()
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/sql.py", line 860, in create
    self.pd_sql.drop_table(self.name, self.schema)
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/sql.py", line 1810, in drop_table
    self.get_table(table_name, schema).drop()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/schema.py", line 979, in drop
    bind._run_ddl_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3228, in _run_ddl_visitor
    conn._run_ddl_visitor(visitorcallable, element, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2211, in _run_ddl_visitor
    visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
    return meth(obj, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/ddl.py", line 1102, in visit_table
    self.connection.execute(DropTable(table))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1380, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/ddl.py", line 81, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1478, in _execute_ddl
    compiled,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1944, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2125, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1901, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.InternalError: (psycopg2.errors.DependentObjectsStillExist) cannot drop table datasource.rxnorm_historical because other objects depend on it
DETAIL:  view staging.stg_rxnorm_historical__ndcs depends on table datasource.rxnorm_historical
view staging.stg_rxnorm_historical__most_recent_ndcs depends on view staging.stg_rxnorm_historical__ndcs
HINT:  Use DROP ... CASCADE to drop the dependent objects too.

[SQL: 
DROP TABLE datasource.rxnorm_historical]
(Background on this error at: https://sqlalche.me/e/14/2j85)
[2024-02-20, 19:39:51 UTC] {taskinstance.py:1323} INFO - Marking task as FAILED. dag_id=rxnorm_historical, task_id=extract_load, execution_date=20240220T142636, start_date=20240220T142641, end_date=20240220T193951
[2024-02-20, 19:39:51 UTC] {standard_task_runner.py:105} ERROR - Failed to execute job 123 for task extract_load ((psycopg2.errors.DependentObjectsStillExist) cannot drop table datasource.rxnorm_historical because other objects depend on it
DETAIL:  view staging.stg_rxnorm_historical__ndcs depends on table datasource.rxnorm_historical
view staging.stg_rxnorm_historical__most_recent_ndcs depends on view staging.stg_rxnorm_historical__ndcs
HINT:  Use DROP ... CASCADE to drop the dependent objects too.

[SQL: 
DROP TABLE datasource.rxnorm_historical]
(Background on this error at: https://sqlalche.me/e/14/2j85); 1160469)
[2024-02-20, 19:39:51 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2024-02-20, 19:39:51 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check
jrlegrand commented 1 month ago

I believe this was resolved in sagerx.py, so I'm closing this issue.

    if if_exists == "replace":
        engine.execute(f'DROP TABLE IF EXISTS {schema_name}.{table_name} cascade')
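For context, here is a sketch of how that fix might slot into a shared load helper; load_df_to_pg and its signature are illustrative assumptions, not necessarily the exact code in sagerx.py:

    import sqlalchemy

    def load_df_to_pg(df, schema_name, table_name, if_exists, engine, dtype=None):
        # Illustrative sketch, not the repo's actual helper.
        if if_exists == "replace":
            # CASCADE drops the dependent staging views along with the table;
            # the dbt run that follows is expected to rebuild them.
            with engine.begin() as conn:
                conn.execute(sqlalchemy.text(
                    f"DROP TABLE IF EXISTS {schema_name}.{table_name} CASCADE"
                ))
            if_exists = "append"  # table is gone, so to_sql simply recreates it
        df.to_sql(
            table_name,
            con=engine,
            schema=schema_name,
            if_exists=if_exists,
            dtype=dtype,
            index=False,
        )

The key design point is that DROP ... CASCADE removes the dependent views too; downstream consumers keep working only because the subsequent dbt run presumably recreates the staging views and marts.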