snowflakedb / spark-snowflake

Snowflake Data Source for Apache Spark.
http://www.snowflake.net
Apache License 2.0

Spark Connector executes query twice on failure #561

Open Leonti opened 3 months ago

Leonti commented 3 months ago

We have observed that when a SELECT query fails, it is executed twice. This is undesirable because once a warehouse timeout is reached (let's say it's 3 hours), a second, very similar query is run, which times out again, and we end up paying double for failed jobs.
Here is a minimal AWS Glue 4.0 (Spark 3.3, spark-snowflake_2.12-2.15.0-spark_3.3.jar + snowflake-jdbc-3.15.1.jar) PySpark job that reproduces it:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from py4j.java_gateway import java_import

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

args = getResolvedOptions(
    sys.argv,
    [
        "sf-connection",
        "sf-database",
        "sf-warehouse",
        "sf-username",
        "sf-password"
    ],
)

sc = SparkContext.getOrCreate()
sc.setLogLevel('DEBUG')
glueContext = GlueContext(sc)

spark = glueContext.spark_session
job = Job(glueContext)

# Enable query pushdown for the session; this is what makes the connector's
# SnowflakeStrategy participate in Spark's planning.
java_import(spark._jvm, SNOWFLAKE_SOURCE_NAME)
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

sfOptions = {
    "sfURL": "https://<snowflake-url-here>",
    "sfUser": args["sf_username"],
    "sfPassword": args["sf_password"],
    "sfDatabase": args["sf_database"],
    "sfSchema": "PUBLIC",
    "sfWarehouse": args["sf_warehouse"],
    "ABORT_DETACHED_QUERY": "false",
    # Deliberately low timeout to force the query to fail.
    "STATEMENT_TIMEOUT_IN_SECONDS": "20",
    "query": "select data from <table_here>",
}

df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).load().cache()

job.commit()

Please note the "STATEMENT_TIMEOUT_IN_SECONDS": "20", which forces the query to time out; the table has enough data that it does not return results within 20 seconds. What happens is that we see 3 failed queries run in Snowflake. Here is the full list of queries in Snowflake after this job is run:

'alter session set timezone = 'UTC'...'
'SELECT * FROM ( ( select data from...' <- 9:56:42 AM - 9:57:03 AM, timed out after 21 seconds
'alter session set timezone = 'UTC'...'
'SELECT * FROM ( ( select data from...' <- 9:57:07 AM - 9:57:28 AM, timed out after 21 seconds
'alter session set timezone = 'UTC'...'
'SELECT "DATA" FROM (select data from...' <- 9:57:32 AM - 9:57:53 AM, timed out after 21 seconds
'select SYSTEM$CANCEL_QUERY(...'
'select SYSTEM$CANCEL_QUERY(...'
'select SYSTEM$CANCEL_QUERY(...'

Please note that 3 queries are run, and the last one is SELECT "DATA" instead of SELECT *.

When the exact same job succeeds (with "STATEMENT_TIMEOUT_IN_SECONDS": "20" removed), we only see:

'alter session set timezone = 'UTC'...'
'SELECT * FROM ( ( select data...'
'select * from table(result_scan(...'

Which is what you'd expect.

Upon further investigation, I believe I have found the root cause of the issue.

  1. Spark has a list of strategies and iterates over them to get a list of plans.
    For Snowflake there appear to be 2 strategies: net.snowflake.spark.snowflake.pushdowns.SnowflakeStrategy from this connector and org.apache.spark.sql.execution.datasources.DataSourceStrategy from Spark itself.
  2. Each strategy is supposed to return a plan from its apply method. Plans are generated lazily and only materialise when the planner iterates over the strategies' results (QueryPlanner.plan in the stack traces below).
  3. The first plan to be generated comes from SnowflakeStrategy. I'm not sure this is what Spark expects (I've never implemented a Spark strategy), but generating the plan actually involves running the whole query! So it can spend 3 hours there, time out, and then return no plan. Unfortunately, exceptions are swallowed inside SnowflakeStrategy (see the Try-Catch comment quoted below) and an empty plan list is returned instead of the exception being propagated. We've seen this multiple times on timeouts and once with no_error_code_from_server. The outcome is the log message "Pushdown failed :Status of query associated with resultSet is FAILED_WITH_ERROR. Statement reached its statement or warehouse timeout of 20 second(s) and was canceled. Results not generated." after 3 hours (our default timeout), or after 20 seconds in this case. It also happened to us once after 1 hour with no_error_code_from_server (a separate issue that we are still investigating). In both cases the outcome is that the strategy just returns Nil as its response.
  4. Since the first strategy returns Nil (an empty plan list), org.apache.spark.sql.execution.datasources.DataSourceStrategy kicks in, which invokes buildScan, which again needs to run a very similar query (the one with SELECT "DATA", because it's a different strategy now). A simplified sketch of this fall-through follows the list.
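
To make the fall-through concrete, here is a minimal sketch with stand-in types (not the real Spark classes) of how a planner that consults strategies lazily ends up running the query twice when the first strategy swallows its failure and returns an empty plan list:

// A minimal sketch with stand-in types (not the real Spark classes) of how a
// planner that consults strategies lazily falls through to the next one.
object PlannerFallThroughSketch {

  type LogicalPlan  = String
  type PhysicalPlan = String
  type Strategy     = LogicalPlan => Seq[PhysicalPlan]

  // Strategies are applied lazily, one after another; an empty result means
  // "I produced no plan", so the next strategy's candidates are consumed.
  def plan(logical: LogicalPlan, strategies: Seq[Strategy]): Iterator[PhysicalPlan] =
    strategies.iterator.flatMap(strategy => strategy(logical))

  def main(args: Array[String]): Unit = {
    // Stand-in for the pushdown strategy: it runs the query, the query fails,
    // the failure is swallowed and Nil is returned.
    val pushdownLike: Strategy = _ => {
      println("pushdown strategy: ran the query, it timed out, returning Nil")
      Nil
    }
    // Stand-in for DataSourceStrategy: it runs a very similar query again.
    val dataSourceLike: Strategy = lp => {
      println("fallback strategy: running the query again")
      Seq(s"scan($lp)")
    }

    // Only the first materialised plan is used, but because the first strategy
    // produced nothing, the second one is consulted -- so the query runs twice.
    println(plan("select data from t", Seq(pushdownLike, dataSourceLike)).next())
  }
}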

I believe the ultimate problem is this:

/** Clean up the plan, then try to generate a query from it for Snowflake.
  * The Try-Catch is unnecessary and may obfuscate underlying problems,
  * but in beta mode we'll do it for safety and let Spark use other strategies
  * in case of unexpected failure.
  */

If it had just thrown the exception, Spark wouldn't have attempted to use another strategy.
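
In other words, the failure mode boils down to this shape (a simplified sketch of the pattern the quoted comment describes, not the actual connector source; the parameter is a stand-in):

object SwallowingSketch {
  // Simplified shape of the current pattern (not the actual connector source):
  // the pushdown attempt, which has already executed the Snowflake query, is
  // wrapped in a catch-all, and any failure becomes an empty plan list.
  def applySketch(buildQueryRDD: => Option[Seq[String]]): Seq[String] =
    try {
      buildQueryRDD.getOrElse(Nil) // evaluating this is what runs the query
    } catch {
      case _: Exception =>
        Nil // the timeout is swallowed; Spark moves on to the next strategy
    }
}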

Because I had enabled debug logs, I could see DEBUG SnowflakeTelemetryMessageSender: Send Telemetry entries, which thankfully include stack traces:

"SELECT * FROM ( ( select data from":
java.sql.SQLException: Status of query associated with resultSet is FAILED_WITH_ERROR. Statement reached its statement or warehouse timeout of 20 second(s) and was canceled. Results not generated.
at net.snowflake.client.jdbc.SFAsyncResultSet.getRealResults(SFAsyncResultSet.java:159)
at net.snowflake.client.jdbc.SFAsyncResultSet.getResultSetSerializables(SFAsyncResultSet.java:396)
at net.snowflake.spark.snowflake.SnowflakeRelation.liftedTree1$1(SnowflakeRelation.scala:222)
at net.snowflake.spark.snowflake.SnowflakeRelation.getSnowflakeResultSetRDD(SnowflakeRelation.scala:202)
at net.snowflake.spark.snowflake.SnowflakeRelation.getRDD(SnowflakeRelation.scala:167)
at net.snowflake.spark.snowflake.SnowflakeRelation.buildScanFromSQL(SnowflakeRelation.scala:107)
at net.snowflake.spark.snowflake.pushdowns.querygeneration.QueryBuilder.toRDD(QueryBuilder.scala:122)
at net.snowflake.spark.snowflake.pushdowns.querygeneration.QueryBuilder.rdd$lzycompute(QueryBuilder.scala:46)
at net.snowflake.spark.snowflake.pushdowns.querygeneration.QueryBuilder.rdd(QueryBuilder.scala:46)
at net.snowflake.spark.snowflake.pushdowns.querygeneration.QueryBuilder$.$anonfun$getRDDFromPlan$1(QueryBuilder.scala:307)
at scala.Option.map(Option.scala:230)
at net.snowflake.spark.snowflake.pushdowns.querygeneration.QueryBuilder$.getRDDFromPlan(QueryBuilder.scala:306)
at net.snowflake.spark.snowflake.pushdowns.SnowflakeStrategy.buildQueryRDD(SnowflakeStrategy.scala:42)
at net.snowflake.spark.snowflake.pushdowns.SnowflakeStrategy.apply(SnowflakeStrategy.scala:21)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)

"SELECT "DATA" FROM (select data from" query:
java.sql.SQLException: Status of query associated with resultSet is FAILED_WITH_ERROR. Statement reached its statement or warehouse timeout of 20 second(s) and was canceled. Results not generated.
at net.snowflake.client.jdbc.SFAsyncResultSet.getRealResults(SFAsyncResultSet.java:159)
at net.snowflake.client.jdbc.SFAsyncResultSet.getResultSetSerializables(SFAsyncResultSet.java:396)
at net.snowflake.spark.snowflake.SnowflakeRelation.liftedTree1$1(SnowflakeRelation.scala:222)
at net.snowflake.spark.snowflake.SnowflakeRelation.getSnowflakeResultSetRDD(SnowflakeRelation.scala:202)
at net.snowflake.spark.snowflake.SnowflakeRelation.getRDD(SnowflakeRelation.scala:167)
at net.snowflake.spark.snowflake.SnowflakeRelation.buildScan(SnowflakeRelation.scala:147)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:364)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:398)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:454)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:397)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:364)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)

This shows that the 2 strategies are used one after another. I'm attaching the full stack traces to the issue.

I believe not catching this exception would fix the issue: in the case of a timeout or another unknown query error, the Spark job would fail instead of trying another strategy until that one also fails with a timeout.
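
As a sketch of what that would look like (not a patch against the actual source; the signature is the same stand-in as above), dropping the catch-all lets the original SQLException propagate, so the job fails once instead of re-running the query through DataSourceStrategy:

object NoSwallowSketch {
  // Sketch of the proposed behaviour (stand-in signature, not a real patch):
  // don't catch; if the already-executed pushdown query failed, let the
  // SQLException propagate and fail the job instead of planning a second scan.
  def applySketch(buildQueryRDD: => Option[Seq[String]]): Seq[String] =
    buildQueryRDD.getOrElse(Nil)
}

If a fallback is still wanted for plans the pushdown genuinely cannot translate, it would presumably need to be limited to those cases rather than applied to every exception, including failures of a query that has already run.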

select_column_error.txt select_star_error.txt