databricks / spark-redshift

Redshift data source for Apache Spark
Apache License 2.0

Overwrite using staging table fails when table has dependencies #213

Open khaledh opened 8 years ago

khaledh commented 8 years ago

When using overwrite mode to save data to a table, with usestagingtable left at its default value of true, the operation fails with the following error if the target table already has dependencies (e.g. a view that depends on it):

java.sql.SQLException: [Amazon](500310) Invalid operation: current transaction is aborted, commands ignored until end of transaction block;
    at com.amazon.redshift.client.messages.inbound.ErrorResponse.toErrorException(ErrorResponse.java:1830)
    at com.amazon.redshift.client.PGMessagingContext.handleErrorResponse(PGMessagingContext.java:804)
    at com.amazon.redshift.client.PGMessagingContext.handleMessage(PGMessagingContext.java:642)
    at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(InboundMessagesPipeline.java:312)
    at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(PGMessagingContext.java:1062)
    at com.amazon.redshift.client.PGMessagingContext.getParameterDescription(PGMessagingContext.java:978)
    at com.amazon.redshift.client.PGClient.prepareStatement(PGClient.java:1844)
    at com.amazon.redshift.dataengine.PGQueryExecutor.<init>(PGQueryExecutor.java:106)
    at com.amazon.redshift.dataengine.PGDataEngine.prepare(PGDataEngine.java:211)
    at com.amazon.jdbc.common.SPreparedStatement.<init>(Unknown Source)
    at com.amazon.jdbc.jdbc41.S41PreparedStatement.<init>(Unknown Source)
    at com.amazon.redshift.core.jdbc41.PGJDBC41PreparedStatement.<init>(PGJDBC41PreparedStatement.java:49)
    at com.amazon.redshift.core.jdbc41.PGJDBC41ObjectFactory.createPreparedStatement(PGJDBC41ObjectFactory.java:119)
    at com.amazon.jdbc.common.SConnection.prepareStatement(Unknown Source)
    at com.databricks.spark.redshift.RedshiftWriter.withStagingTable(RedshiftWriter.scala:137)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:369)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
Caused by: com.amazon.support.exceptions.ErrorException: [Amazon](500310) Invalid operation: current transaction is aborted, commands ignored until end of transaction block;
    ... 29 more

I tracked this error down to the following code in RedshiftWriter.scala:

    try {
      action(tempTable.toString)

      if (jdbcWrapper.tableExists(conn, table.toString)) {
        jdbcWrapper.executeInterruptibly(conn.prepareStatement(
          s"""
             | BEGIN;
             | ALTER TABLE $table RENAME TO ${backupTable.escapedTableName};
             | ALTER TABLE $tempTable RENAME TO ${table.escapedTableName};
             | DROP TABLE $backupTable;
             | END;
           """.stripMargin.trim))
      } else {
        jdbcWrapper.executeInterruptibly(conn.prepareStatement(
          s"ALTER TABLE $tempTable RENAME TO ${table.escapedTableName}"))
      }
    } finally {
      jdbcWrapper.executeInterruptibly(conn.prepareStatement(s"DROP TABLE IF EXISTS $tempTable"))
    }

When trying this transaction manually in SQL Workbench, I get the following error:

[Amazon](500310) Invalid operation: cannot drop table myschema.mytable because other objects depend on it;

I was hoping that spark-redshift would let this error (which is the actual culprit) bubble up when it happens, but instead I get the error I mentioned at the beginning. This happens because the original exception is masked by another exception thrown by the DROP TABLE IF EXISTS in the finally block, which fails because the transaction is already in a bad state at that point, producing the message Invalid operation: current transaction is aborted, commands ignored until end of transaction block.
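
To illustrate the masking, here is a minimal sketch (not spark-redshift code) of the JVM behaviour at play: when the finally block itself throws, that exception replaces the one thrown in the try body, so only the later error surfaces.

    // Minimal sketch: only the "current transaction is aborted" message is ever seen,
    // because the exception thrown in `finally` replaces the one thrown in `try`.
    try {
      throw new RuntimeException("cannot drop table ... because other objects depend on it")
    } finally {
      throw new RuntimeException("current transaction is aborted, commands ignored ...")
    }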

I'm not sure what the best solution is in this case. I'm open to suggestions.

JoshRosen commented 8 years ago

I guess we need another layer of try-finally in the finally block to log and ignore exceptions thrown there so that we don't mask the original cause / exception.
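
Roughly something like this (a sketch, not an actual patch; `log` is assumed to be the writer's existing logger, and NonFatal comes from scala.util.control):

    } finally {
      try {
        jdbcWrapper.executeInterruptibly(conn.prepareStatement(s"DROP TABLE IF EXISTS $tempTable"))
      } catch {
        case NonFatal(e) =>
          // Log and swallow: if the cleanup DROP fails (e.g. because the transaction
          // is already aborted), don't let it mask the exception that actually caused
          // the failure.
          log.warn(s"Failed to drop temporary table $tempTable", e)
      }
    }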

khaledh commented 8 years ago

It's a good idea to log the original exception for sure. I was also hoping for a solution to the main problem: how can I use spark-redshift to overwrite a table that has dependencies? I guess there's no easy way around this.

One solution would be to not overwrite, but to truncate the table then append to it. But this could lead to data loss if the append doesn't complete. It also leaves a window of time where the table would be unavailable to other queries.
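
A rough sketch of that truncate-then-append approach with spark-redshift (connection details, table name, and tempdir are placeholders), assuming preactions run before the load in append mode:

    import org.apache.spark.sql.SaveMode

    // Truncate the target as a preaction, then append: the table is never dropped,
    // so dependent views survive. The caveats above still apply: a failed append can
    // leave the table empty, and concurrent readers see an empty table for a while.
    df.write
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://host:5439/db?user=user&password=pass")
      .option("tempdir", "s3n://my-bucket/tmp/")
      .option("dbtable", "myschema.mytable")
      .option("preactions", "TRUNCATE TABLE myschema.mytable;")
      .mode(SaveMode.Append)
      .save()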

JoshRosen commented 8 years ago

Not necessarily a solution to your problem, but I think the changes in #157 should have addressed the silent exception loss issue that made this ticket harder to debug.

suryasev commented 8 years ago

Hitting the same issue here. Is it possible to pass a parameter that would automatically do a cascading delete when this situation crops up?

http://docs.aws.amazon.com/redshift/latest/dg/r_DROP_TABLE.html

suryasev commented 8 years ago

I tried using the new preactions to manually drop the table w/ cascade, but it seems that preactions happen after the table is already created.

suryasev commented 8 years ago

So the only solution I found, other than forking or managing a separate Redshift connection, was to save to a temp table, do the drop with cascade as a preaction, and rename the table as a postaction:

    .option("preactions", s"DROP TABLE IF EXISTS $dbtable CASCADE;")
    .option("dbtable", s"$dbtable$tempTableSuffix")
    .option("postactions", s"ALTER TABLE $dbtable$tempTableSuffix RENAME TO $dbtable;")

As awesome as this is, I'd love to see either preactions happen before the table is created, or a separate feature for CASCADE. Views can be created by any random person in Redshift, potentially causing this error.
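
Spelled out as a full write, the workaround above looks roughly like this (a sketch; table names, connection details, and tempdir are placeholders, and the CASCADE still drops any dependent views):

    import org.apache.spark.sql.SaveMode

    val dbtable = "mytable"            // target table (placeholder)
    val tempTableSuffix = "_staging"   // suffix for the temporary table (placeholder)

    df.write
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://host:5439/db?user=user&password=pass")
      .option("tempdir", "s3n://my-bucket/tmp/")
      // Drop the old table (and, via CASCADE, anything that depends on it) up front.
      .option("preactions", s"DROP TABLE IF EXISTS $dbtable CASCADE;")
      // Load into a temporary table...
      .option("dbtable", s"$dbtable$tempTableSuffix")
      // ...and swap it into place once the load succeeds.
      .option("postactions", s"ALTER TABLE $dbtable$tempTableSuffix RENAME TO $dbtable;")
      .mode(SaveMode.Overwrite)
      .save()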

JoshRosen commented 8 years ago

Another way to avoid this problem would be to have spark-redshift truncate the existing table and then load the new rows, rather than dropping and re-creating the table. The current behavior in spark-redshift matches how Spark's built-in JDBC data source behaves (i.e. Spark also drops and re-creates the table), but there are multiple JIRA tickets proposing to use TRUNCATE there as well:

(/cc @marmbrus @rxin @dongjoon-hyun)

dongjoon-hyun commented 8 years ago

Thank you for pinging me, @JoshRosen.

dongjoon-hyun commented 8 years ago

SPARK-16410 is trying to add a SaveMode.Truncate.

SPARK-16463 is trying to add a truncate option for SaveMode.Overwrite.

https://issues.apache.org/jira/browse/SPARK-16463 (https://github.com/apache/spark/pull/14086)

I think SPARK-16463 is the fastest way to support a TRUNCATE feature with minimal change. Also, the PR is ready.
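
For reference, the option proposed in SPARK-16463 would look roughly like this with Spark's built-in JDBC source (a sketch of the proposed API, not spark-redshift; connection details are placeholders):

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    // With the proposed `truncate` option, Overwrite mode truncates and reloads the
    // existing table instead of dropping and re-creating it, so dependent objects survive.
    df.write
      .mode(SaveMode.Overwrite)
      .option("truncate", "true")
      .jdbc("jdbc:postgresql://host:5432/db", "myschema.mytable", new Properties())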

sugix commented 7 years ago

Facing the same issue.

sugix commented 7 years ago

Everything was fine until analysts created views on top of the tables; now all Overwrite mode transfers are failing because spark-redshift cannot drop the table.

moohebat commented 2 years ago

Let's say we have this scenario: there is a materialized view that depends on the table. If I try to overwrite the table, I get an error due to the dependencies, since under the hood the table is dropped and recreated. If I drop the table with CASCADE in a preaction, the view is dropped as well, which is not acceptable because the BI tool is connected to the view and it would cause a bad user experience. I need to refresh the data every 15 minutes. Is there any way to write into the table with Spark without affecting the views?