snowflakedb / spark-snowflake

Snowflake Data Source for Apache Spark.
http://www.snowflake.net
Apache License 2.0

Connection caching does not work. #485

Open michellemay opened 1 year ago

michellemay commented 1 year ago

When connecting to Snowflake using the Spark connector, connection caching does not occur.

It is completely unusable if MFA/Duo Push is used on Snowflake. This piece of code creates three connections to Snowflake and requires three MFA authorizations!

    import net.snowflake.spark.snowflake.Parameters
    import org.apache.spark.sql.DataFrame

    val options = Map(
      Parameters.PARAM_SF_DATABASE -> "db",
      Parameters.PARAM_SF_SCHEMA -> "schema",
      Parameters.PARAM_SF_QUERY -> """SELECT * FROM table""",
      Parameters.PARAM_SF_URL -> fqdn,
      Parameters.PARAM_SF_USER -> username,
      Parameters.PARAM_SF_PASSWORD -> password,
      Parameters.PARAM_SF_ROLE -> role,
      Parameters.PARAM_SF_WAREHOUSE -> wh,
      "ALLOW_CLIENT_MFA_CACHING" -> "true",
      "CLIENT_SESSION_KEEP_ALIVE" -> "true"
    )

    val df: DataFrame = spark.read
      .format("snowflake")
      .options(options)
      .load()

    df.count()

While looking at the code and setting some breakpoints, I found that none of JDBCWrapper, DriverManager, SnowflakeDriver, SnowflakeConnectionV1, or DefaultSFConnectionHandler caches or reuses connections.

Versions:

So, unless I missed some piece of documentation showing how to cache connections, I cannot use the Spark connector for Snowflake in my environment.

I thought about subclassing DefaultSource and providing a different JDBCWrapper than DefaultJDBCWrapper in the constructor. However, it looks like DefaultJDBCWrapper is hardcoded in multiple places and is also a private class.

sfc-gh-wfateem commented 1 year ago

Hi @michellemay, Thanks for reaching out. I saw you opened a support ticket with us and I just sent you a response. I'll plan to post the results of our findings here once we conclude that ticket to benefit the rest of the GitHub community.

michellemay commented 1 year ago

If I may add some context: MFA is only an issue when developing/debugging. Production code runs under a service account.

However, by not reusing sessions, the Spark client makes it impossible to create and use temporary tables. We must instead use transient tables and manage their lifetime ourselves.

sfc-gh-wfateem commented 1 year ago

@michellemay I'm assuming you're running this on a Linux environment, right? One of the issues here is that MFA token caching doesn't seem to be supported in Linux.

I find it a bit odd that this would create three different connections though. Can you give me a bit more detail on how you're running your application? You're saying this is in the development phase, so are you just running that code from something like spark-shell and in local mode for example?

michellemay commented 1 year ago

Hi Waleed,

Yes, I'm running this in a unit test locally on my laptop, but the same thing happens when running it on EC2 in our production AWS account.

michellemay commented 1 year ago

That said, connection reuse must work. Otherwise, we get a new connection and session for each call we make to Snowflake. That is inefficient and also makes it impossible to have temporary tables for the duration of the session.

sfc-gh-wfateem commented 1 year ago

@michellemay is your laptop running MacOS or Linux? I'm just trying to get a better understanding of your environment to keep that in mind while I try to reproduce your issue.

michellemay commented 1 year ago

Ubuntu

sfc-gh-wfateem commented 1 year ago

@michellemay so I ran a really quick test and I'm not seeing multiple connections. The query history in Snowflake shows that all the queries submitted by the JDBC driver were part of the same session:

scala> var sfOptions = Map(
  "sfUrl" -> "my_account.snowflakecomputing.com",
  "sfUser" -> "spark_user",
  "sfPassword" -> "***",
  "sfDatabase" -> "my_db",
  "sfSchema" -> "my_schema",
  "sfWarehouse" -> "X_SMALL",
  "sfRole" -> "SPARK_ROLE")

I then did the following which kicked off the queries:

scala> val df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(sfOptions).option("dbtable", "SOURCE_TABLE").load()
df: org.apache.spark.sql.DataFrame = [KEY: decimal(38,0), VALUE: string]

scala> df.show()
+---+------+
|KEY| VALUE|
+---+------+
|  1|value1|
|  2|value2|
|  3|value3|
+---+------+

Here's what I then see on Snowflake's query history page; notice the session ID is the same for all the queries submitted as a result of executing the code I shared above:

[Screenshot (2022-12-14): Snowflake query history showing the same session ID for all submitted queries]

I'm not sure why in your case you're creating three separate connections. Could it be that you have multiple partitions in that DataFrame? Do you see the same outcome if you do something like the following first?

    val df: DataFrame = spark.read
      .format("snowflake")
      .options(options)
      .load()
      .coalesce(1)

    df.count()

Now, if at this point you start running more actions, each one results in a new DataFrame (DataFrames are immutable in Spark), and we end up creating a new session (new connection) to submit a new set of queries:

scala> df.select("column_name").show()
+---+
|KEY|
+---+
|  1|
|  2|
|  3|
+---+

That to me seems reasonable, since we close the connection object once we finish fetching the results for that first DataFrame. I don't think leaving a Connection object open indefinitely is a good idea. You would need some kind of connection pooling framework so that the connection is returned to the pool and reused on a later getConnection call. However, the JDBCWrapper leverages java.sql.DriverManager, so there really isn't any opportunity to use connection pooling there.

So there are two things here:

1.) Constantly getting MFA challenges because you end up creating a new connection with every action on a DataFrame.

For that, you would need to set the parameter ALLOW_CLIENT_MFA_CACHING to true on the Snowflake account level, not in your properties map like you're doing in your code. The issue here is that this isn't supported on Linux, so that doesn't help you unless you're running your tests on a macOS or Windows machine.

Making that supportable on Linux or making any changes to that is going to be outside of the Snowflake Spark connector's scope.

2.) The issue of creating new connections rather than reusing them.

That would be considered an enhancement and would require a change to allow someone to use a connection pooling framework of their choice (Hikari for example). That's something we can look into and discuss with the Spark connector's maintainers.
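Just to illustrate the kind of thing I mean by a pooling framework, here's a rough sketch of HikariCP wrapped around the plain Snowflake JDBC driver. This is not something the Spark connector supports today; the account URL, credentials, and pool settings are placeholders.

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Sketch only: a small HikariCP pool around the Snowflake JDBC driver.
val config = new HikariConfig()
config.setJdbcUrl("jdbc:snowflake://my_account.snowflakecomputing.com/")
config.setUsername("spark_user")
config.setPassword("***")
config.setMaximumPoolSize(4)
config.addDataSourceProperty("db", "my_db")
config.addDataSourceProperty("schema", "my_schema")
config.addDataSourceProperty("warehouse", "X_SMALL")

val pool = new HikariDataSource(config)

// Closing a borrowed connection returns it to the pool instead of
// tearing down the Snowflake session.
val conn: Connection = pool.getConnection()
try {
  val rs = conn.createStatement().executeQuery("SELECT CURRENT_SESSION()")
  while (rs.next()) println(rs.getString(1))
} finally {
  conn.close()
}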

michellemay commented 1 year ago

Hi Waleed, thanks for your time investigating this.

Just to be sure, which version of Spark and the connector are you using? I've been stuck on 3.0.1 for some time because we encountered a critical issue while upgrading one of our applications.

I noticed some differences in terms of connection creation when running some tests on a more recent version of Spark. It seems that 3.0.1 is a bit worse in that respect compared to 3.1.1. However, I still get multiple connections for a single statement.

From what I see in the code, the JDBC connection is created inside net.snowflake.spark.snowflake.JDBCWrapper#getConnector, and that function is called from several places.

For instance, lazy schema resolution of the SnowflakeRelation and fetching the result set trigger two connections. For some operations they do get to share the same session; preactions, for example, are executed on the same session as the load query (see the sketch below).
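As a small illustration of that last point (reusing the options map from my original snippet; the query tag is made up):

    // Sketch: "preactions" statements run on the same Snowflake session as the
    // load query itself, so a session-level setting like a query tag applies
    // to that query.
    val optionsWithPreactions = options ++ Map(
      "preactions" -> "ALTER SESSION SET QUERY_TAG = 'spark-load'"
    )

    val df = spark.read
      .format("snowflake")
      .options(optionsWithPreactions)
      .load()

    df.count()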

My problem with a new connection each time (other than MFA) is that it makes it impossible to use temporary tables: in Snowflake, temporary tables are destroyed when the session closes. We can work around this limitation by creating transient tables and managing their lifetime ourselves, as well as implementing a cleanup process for unforeseen failures (such as our app being terminated by an external entity). However, it's not practical.
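Roughly, that workaround looks like the sketch below (the staging table name is made up, filteredDf stands for whatever DataFrame we want to stage, and Utils.runQuery comes from net.snowflake.spark.snowflake):

    import net.snowflake.spark.snowflake.{Parameters, Utils}

    // Connection-only options (drop the "query" key from the original map).
    val connOptions = options - Parameters.PARAM_SF_QUERY

    // A TEMPORARY table would vanish with each new session, so create the
    // staging table as TRANSIENT and manage its lifetime ourselves.
    val stagingTable = "filtered_rows_staging" // made-up name

    Utils.runQuery(connOptions, s"CREATE TRANSIENT TABLE IF NOT EXISTS $stagingTable (id NUMBER)")

    filteredDf.write
      .format("snowflake")
      .options(connOptions)
      .option("dbtable", stagingTable)
      .mode("append")
      .save()

    // ... run the queries that need the staging table ...

    // Explicit cleanup; a separate janitor job is still needed for the case
    // where the application is killed before reaching this point.
    Utils.runQuery(connOptions, s"DROP TABLE IF EXISTS $stagingTable")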

On a side note, the Snowflake connector for Python (which we also use in SageMaker) does not suffer from this issue: connecting and authenticating to Snowflake happens once, and the session is then reused as expected.

sfc-gh-wfateem commented 1 year ago

@michellemay These are the maven coordinates I used when starting up spark-shell: net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.2

The Apache Spark environment is 3.2.0 running on Scala 2.12.15.

There's quite a bit of a difference between the Snowflake Python connector and the Spark connector, but I understand the issue you're pointing out here. How are you going about creating your tables from the Spark connector? Are you just using the Utils.runQuery() method?

Let me look into this a bit more and see what we can do here.

michellemay commented 1 year ago

I'm currently using the plain DataFrame writer:

    df.write
      .format("snowflake")
      .option(Parameters.PARAM_SF_DBTABLE, table)
      .options(options)
      .mode(mode)
      .save()

I'm not sure if it's the best way though.

How would you go about creating a temp table?

Ideally, I would have a 'temporary' flag on the DataFrame writer and be able to reuse the session ;)

sfc-gh-wfateem commented 1 year ago

The method you're using here wouldn't help you create a temporary table. There isn't a 'temporary' option; this is useful when you want to move data from Spark to Snowflake (persisting it). But I think I'm starting to get a clearer picture of what you're trying to achieve.

I'm guessing the purpose of the temporary table in your case is to be able to perform some other work later on using that same data within the same Spark application, right? If that's the case, I don't think you necessarily need to create a temporary table in Snowflake; you can address that in Spark by doing something like:

val df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(sfOptions).option("dbtable", "SOURCE_TABLE").load()

df.createOrReplaceTempView("my_view"); 

val df2 = spark.sql("select * from my_view");

df2.show()

In this scenario, you're not bound to the Snowflake session anymore. That temporary view is tied to your SparkSession instead. Unless you're destroying and recreating the SparkSession frequently, this should be good enough. Otherwise, you would want to use the createOrReplaceGlobalTempView() method instead; that view is tied to the SparkContext (i.e. it lives until the application terminates and SparkContext's stop method is called).
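For completeness, the global variant would look something like this (just a sketch with a made-up view name; global temp views are registered under the reserved global_temp database):

df.createOrReplaceGlobalTempView("my_global_view")

// Any SparkSession in the same application can read it until the
// SparkContext is stopped.
val df3 = spark.sql("select * from global_temp.my_global_view")
df3.show()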

Wouldn't this essentially help you achieve what you're ultimately trying to do?

michellemay commented 1 year ago

The goal of sending data to Snowflake is to be able to join/filter remotely. Let's say I have a table A from which I select some rows based on a complex condition and bring them back to the Spark cluster for other processing. Then I send the result to Snowflake and inner join table B against it.

In my setup, table A has hundreds of millions of rows, which are processed and then filtered down to a few million (it's not possible to do that filtering purely in SQL). I upload that to Snowflake and join it with table B, which contains billions of rows. If I let Spark do the join locally, Snowflake has to materialize and transfer billions of rows just for them to be discarded on the Spark cluster. Table B is a view over some other data, so I'd rather let Snowflake do the filtering efficiently.
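Concretely, the flow looks something like the sketch below (names are made up; filteredDf stands for the post-processing result on the Spark side, and the join condition is just for illustration):

    import net.snowflake.spark.snowflake.Parameters

    // Connection-only options (drop the "query" key from my original map).
    val connOptions = options - Parameters.PARAM_SF_QUERY

    // 1. Push the filtered rows (a few million) up to a staging table.
    filteredDf.write
      .format("snowflake")
      .options(connOptions)
      .option("dbtable", "filtered_a_staging") // made-up table name
      .mode("overwrite")
      .save()

    // 2. Let Snowflake perform the join against the billion-row table B and
    //    only send the joined result back to Spark.
    val joined = spark.read
      .format("snowflake")
      .options(connOptions)
      .option("query",
        """SELECT b.*
          |FROM table_b b
          |JOIN filtered_a_staging f ON b.id = f.id""".stripMargin)
      .load()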

sfc-gh-wfateem commented 1 year ago

Ok, that makes sense. Thanks for clarifying @michellemay.

sfc-gh-wfateem commented 1 year ago

@michellemay Happy New Year! I reached out to one of the maintainers, and the current design of the Spark connector doesn't make it possible to reuse a connection, so it pretty much needs to be redesigned. We've opened a ticket internally to track that request.

michellemay commented 1 year ago

Hi Waleed,

This seems reasonable. Thanks for the update.

sfc-gh-mrui commented 1 year ago

@michellemay @sfc-gh-wfateem The Spark connector supports sharing a connection as of version 2.11.2, which should help your use case. https://github.com/snowflakedb/spark-snowflake/releases/tag/v2.11.2-spark_3.3
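For reference, pulling that release in from sbt would look roughly like this (the JDBC driver version here is just the one mentioned earlier in the thread; check the release notes for the recommended pairing):

    // Adjust the Spark/Scala suffixes to match your environment.
    libraryDependencies ++= Seq(
      "net.snowflake" %% "spark-snowflake" % "2.11.2-spark_3.3",
      "net.snowflake" % "snowflake-jdbc" % "3.13.22"
    )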