yaooqinn / itachi

A library that brings useful functions from various modern database management systems to Apache Spark
https://itachi.readthedocs.io/
Apache License 2.0

Using the library in Databricks environment #8

Closed: MrPowers closed this issue 3 years ago

MrPowers commented 3 years ago

I did a bit of experimentation and it looks like it's tricky to use this lib in Databricks.

Any way we can provide an interface that doesn't require the user to set a configuration option?

Perhaps we can let the user run an import statement like import org.apache.spark.sql.itachi.postgres._ to get all the functions? The function registration process is still a little fuzzy for me. Let me know if you think this would be possible!

yaooqinn commented 3 years ago

SGTM to have Scala/Java API support for Itachi.

MrPowers commented 3 years ago

@yaooqinn - yea, we should consider Scala/Java APIs, but I'm specifically referring to how we can get the current SQL functions working in a Databricks notebook. For example, how can I get select array_cat(arr1, arr2) as both_arrays from some_data working in a Databricks notebook?

yaooqinn commented 3 years ago

I have no idea how Databricks notebooks work.

But I guess it is the same as adding Delta Lake, since we both use SparkSessionExtensions. FYI, https://docs.delta.io/latest/delta-update.html#configure-sparksession&language-scala.

Also cc @cloud-fan @HyukjinKwon

MrPowers commented 3 years ago

@yaooqinn - I'll try to figure it out. You can sign up for a Databricks Community account if you'd like to do experimentation on your end (it's free).

Will need to make sure to spell out all the steps clearly for Databricks users cause that'll be the most common runtime for this lib. Thanks!

yaooqinn commented 3 years ago

Looks nice! I will try it later

HyukjinKwon commented 3 years ago

Hm, it's just jars, right? I think it should be the same as regular Spark. For example, setting spark.jars.packages to a standard Maven coordinate like com.github.yaooqinn:itachi_2.12:0.1.0.

Or I believe Databricks provides a UI for that: [Screenshot]

With Spark 3.2 (DBR 9), users should be able to add them at runtime via ADD JAR with the Ivy scheme; see also https://github.com/apache/spark/pull/29966

MrPowers commented 3 years ago

@yaooqinn @HyukjinKwon - thanks for the responses. This post provides more context.

I started up a Databricks community cluster with the spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions configuration option set.

[Screenshot]

Then I attached the library.

[Screenshot]

The array_append function that's defined in itachi isn't accessible like I expected it to be:

[Screenshot]

new org.apache.spark.sql.extra.PostgreSQLExtensions().apply(spark.extensions) didn't work either.

[Screenshot]

Here's how we're currently injecting the functions:

class PostgreSQLExtensions extends Extensions {
  override def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectFunction(Age.fd)
    ext.injectFunction(ArrayAppend.fd)
    ext.injectFunction(ArrayLength.fd)
    ext.injectFunction(FunctionAliases.array_cat)
    ext.injectFunction(IntervalJustifyLike.justifyDays)
    ext.injectFunction(IntervalJustifyLike.justifyHours)
    ext.injectFunction(IntervalJustifyLike.justifyInterval)
    ext.injectFunction(Scale.fd)
    ext.injectFunction(SplitPart.fd)
    ext.injectFunction(StringToArray.fd)
    ext.injectFunction(UnNest.fd)
  }
}

Per the SparkSessionExtensions docs, perhaps we need to do something like this?

class MyExtensions extends Function1[SparkSessionExtensions, Unit] {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectParser { (session, parser) =>
      ...
    }
  }
}

Thanks for the help!

yaooqinn commented 3 years ago

Can we run set spark.sql.extensions to check whether Itachi is listed in it?
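
A minimal sketch of that check from a Scala notebook cell, assuming `spark` is the session the cluster already created. `ExtensionCheck` and `report` are hypothetical names used only for illustration; they are not part of Itachi.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper (not part of itachi): reports the extensions setting
// and whether a given SQL function name resolves in this session.
object ExtensionCheck {
  def report(spark: SparkSession, functionName: String): String = {
    // Read the configured value; the fallback covers a lookup that yields nothing.
    val extensions = spark.conf.getOption("spark.sql.extensions").getOrElse("<not set>")
    // True if the name is a known function (built-in, temporary, or in the current database).
    val registered = spark.catalog.functionExists(functionName)
    s"spark.sql.extensions=$extensions; $functionName registered=$registered"
  }
}

// In a notebook:
// println(ExtensionCheck.report(spark, "age"))
```

If the config lists the extension class but the function is still not registered, that suggests the extension was not applied when the session was built, rather than a typo in the setting.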

yaooqinn commented 3 years ago

I tested it locally with the age function, and it seems fine.

bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.1.0 --conf spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions
spark-sql> select age(timestamp '2000', timestamp'1990')
         > ;
10 years
Time taken: 0.043 seconds, Fetched 1 row(s)
spark-sql>

yaooqinn commented 3 years ago

spark-shell also works fine

scala> spark.sql("select age(timestamp '2000', timestamp'1990')").show
+---------------------------------------------------------------------+
|age(TIMESTAMP '2000-01-01 00:00:00', TIMESTAMP '1990-01-01 00:00:00')|
+---------------------------------------------------------------------+
|                                                             10 years|
+---------------------------------------------------------------------+

scala> spark.sql("select array_append(array(1, 2, 3), 4)").show
+-------------------------------+
|array_append(array(1, 2, 3), 4)|
+-------------------------------+
|                   [1, 2, 3, 4]|
+-------------------------------+
scala>

MrPowers commented 3 years ago

@yaooqinn - Ran set spark.sql.extensions to confirm that the extensions are added (it's still not working):

[Screenshot]

Databricks requires that you set the configuration options before starting the cluster and then attach the JAR file once the cluster is running. That's probably what's causing the error.

Sim created another way of registering Spark SQL functions that works in Databricks. We might have to use that or another approach.

We'll have to get this figured out cause a lot of the ppl that will want to use Itachi are on Databricks!

yaooqinn commented 3 years ago

> Databricks requires that you set the configuration options before starting the cluster and then attach the JAR file once the cluster is running. That's probably what's causing the error.

This is the same as what I did in the local test above. What does "the cluster" actually mean here: the standalone deployment with master/workers, or the app with driver/executors? Either way, I would guess Databricks supports adding external jars and confs together before startup, whatever the cluster is.

MrPowers commented 3 years ago

@yaooqinn - I asked this question on StackOverflow and found two approaches that work on Databricks.

Option 1

import org.apache.spark.sql.catalyst.expressions.postgresql.{Age, ArrayAppend, ArrayLength, IntervalJustifyLike, Scale, SplitPart, StringToArray, UnNest}
import org.apache.spark.sql.extra.FunctionAliases

spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)
spark.sessionState.functionRegistry.registerFunction(FunctionAliases.array_cat._1, FunctionAliases.array_cat._2, FunctionAliases.array_cat._3)
spark.sessionState.functionRegistry.registerFunction(ArrayAppend.fd._1, ArrayAppend.fd._2, ArrayAppend.fd._3)
spark.sessionState.functionRegistry.registerFunction(ArrayLength.fd._1, ArrayLength.fd._2, ArrayLength.fd._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyDays._1, IntervalJustifyLike.justifyDays._2, IntervalJustifyLike.justifyDays._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyHours._1, IntervalJustifyLike.justifyHours._2, IntervalJustifyLike.justifyHours._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyInterval._1, IntervalJustifyLike.justifyInterval._2, IntervalJustifyLike.justifyInterval._3)
spark.sessionState.functionRegistry.registerFunction(Scale.fd._1, Scale.fd._2, Scale.fd._3)
spark.sessionState.functionRegistry.registerFunction(SplitPart.fd._1, SplitPart.fd._2, SplitPart.fd._3)
spark.sessionState.functionRegistry.registerFunction(StringToArray.fd._1, StringToArray.fd._2, StringToArray.fd._3)
spark.sessionState.functionRegistry.registerFunction(UnNest.fd._1, UnNest.fd._2, UnNest.fd._3)

Option 2

Cluster node initialization scripts

I haven't gotten this one working yet, but will keep you posted on my progress. Not sure what needs to be included in the cluster init script.

It's possible Option 1 will be better and we can just expose a wrapper function for itachi users to run that code. We should probably add a new function in Spark that makes it easier to register functions. This seems overly complicated.
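
A rough sketch of what such a wrapper could look like, assuming each `fd` value is the same (FunctionIdentifier, ExpressionInfo, FunctionBuilder) tuple that injectFunction accepts. The object name `PostgresFunctions` and the method `registerAll` are hypothetical; Itachi does not necessarily expose them.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.expressions.postgresql.{Age, ArrayAppend, ArrayLength, IntervalJustifyLike, Scale, SplitPart, StringToArray, UnNest}
import org.apache.spark.sql.extra.FunctionAliases

// Hypothetical wrapper (an assumption for illustration, not an existing itachi API).
object PostgresFunctions {
  // Same shape as the tuples passed to SparkSessionExtensions.injectFunction.
  type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder)

  // The same eleven functions that PostgreSQLExtensions injects.
  val all: Seq[FunctionDescription] = Seq(
    Age.fd,
    ArrayAppend.fd,
    ArrayLength.fd,
    FunctionAliases.array_cat,
    IntervalJustifyLike.justifyDays,
    IntervalJustifyLike.justifyHours,
    IntervalJustifyLike.justifyInterval,
    Scale.fd,
    SplitPart.fd,
    StringToArray.fd,
    UnNest.fd
  )

  // Registers every function with an already-running session.
  def registerAll(spark: SparkSession): Unit =
    all.foreach { case (name, info, builder) =>
      spark.sessionState.functionRegistry.registerFunction(name, info, builder)
    }
}

// In a notebook, after attaching the JAR:
// PostgresFunctions.registerAll(spark)
```

Keeping the list in one place means the extension class and the manual path could share it, so the two registration routes would not drift apart.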

yaooqinn commented 3 years ago

It looks like a limitation of the Databricks runtime product to me 😁. It makes things complicated for pure SQL users with third-party libs. IIUC, spark.sql.extensions basically cannot work with Databricks runtime CE, no matter what the extension is: FunctionInterface, ParserInterface, or AnalyzerInterface.

Maybe there is another option if it supports:

  1. Create an instance of SparkContext - sc
  2. Add the jar with itachi, sc.addJar("...")
  3. Create a SparkSession with extensions and the existing sc.
  4. Run queries with spark

cloud-fan commented 3 years ago

> IIUC, the spark.sql.extensions basically can not work with Databricks runtime CE

@hvanhovell Is this true? Doesn't seem like a reasonable limitation if ACL is not enabled. Also cc @liancheng as well.

hvanhovell commented 3 years ago

@cloud-fan this is correct. We initialize the session slightly differently, and as a result we load jars after the session has been created. We have an internal ticket to get this fixed (no ETA). The preferred way of doing this is to use init scripts (option 2).

cloud-fan commented 3 years ago

One idea is to extend the spark.sql.extensions config with resource files, similar to how we load DataSourceRegister. Then we can include the resource file in the jar, with no need to set the config. @yaooqinn are you interested in working on it?
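
For context, DataSourceRegister implementations are discovered through java.util.ServiceLoader, which reads a resource file under META-INF/services named after the interface. The sketch below shows only that discovery pattern; `ExtensionsProvider` is a hypothetical stand-in for whatever interface Spark would define, and nothing here is an existing Spark API.

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Hypothetical provider interface, standing in for whatever Spark would define.
trait ExtensionsProvider {
  def name: String
}

object ProviderDiscovery {
  // Finds every implementation listed in a classpath resource named
  // META-INF/services/<fully qualified name of ExtensionsProvider>.
  // Each line of that file is the class name of one implementation.
  def discover(): Seq[ExtensionsProvider] =
    ServiceLoader.load(classOf[ExtensionsProvider]).asScala.toSeq
}

// With no such resource file on the classpath, discovery finds nothing:
// ProviderDiscovery.discover().isEmpty
```

A library jar would ship the resource file itself, so users would get the extension by adding the jar alone. Discovery still happens when the session is built, so the jar has to be on the classpath by then.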

MrPowers commented 3 years ago

@cloud-fan - Thanks for commenting. It'd be even better if we could register functions after a cluster is already started. The spark-alchemy project shows how this is possible. Here's the syntax.

import com.swoop.alchemy.spark.expressions.NativeFunctionRegistration
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.expressions.postgresql.Age // from itachi

object ItachiFunctionRegistration extends NativeFunctionRegistration {
  // Maps each SQL function name to its metadata and builder.
  val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
    expression[Age]("age")
  )
}

// Register against the SparkSession that is already running.
ItachiFunctionRegistration.registerFunctions(spark)

This is a bit more elegant than the syntax @alexott proposed here:

import org.apache.spark.sql.catalyst.expressions.postgresql.Age
import org.apache.spark.sql.extra.FunctionAliases

spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)

Itachi is an amazing project and I think it'll be really important for bringing more users to Spark. I am going to suggest to the Databricks product team that they add an "Enable Presto Syntax" button that'd give users the ability to load the relevant Itachi functions with the click of a button. That'd let users copy queries from AWS Athena / Postgres and run them in Databricks - I know a lot of users want this functionality.

I'm confident that the @yaooqinn + @cloud-fan Spark dream team can come up with an elegant solution that'll enable this great user experience for customers!

cloud-fan commented 3 years ago

Yea we can register functions after SparkSession is instantiated, as a workaround if the extension API can't be used. This means the library should provide an extra API for users to manually register functions with an existing SparkSession instance.

I think this is orthogonal to my proposal for making spark extension API easier to use. We can do both.

MrPowers commented 3 years ago

@cloud-fan - yep, I agree that registering functions with an existing SparkSession & making the spark extension API easier to work with are orthogonal chunks of work. Like your suggestion of doing both 😄

I will try to figure this out and send you a pull request. Might need your help 😉 Stay tuned!

alexott commented 3 years ago

@yaooqinn this is not a limitation of the Databricks runtime itself - this is a limitation of Databricks Community Edition.

yaooqinn commented 3 years ago

Sorry for the late reply. I was on holiday at that time.

> One idea is to extend the spark.sql.extensions config with resource files, similar to how we load DataSourceRegister. Then we can include the resource file in the jar and no need to set the config.

Hi @cloud-fan, according to the instructions that @MrPowers provides here, isn't the problem that DCE does not download the jars when it starts the Spark Standalone cluster manager, rather than a loading issue on the Spark app side?

Thanks, @MrPowers for driving this. I am looking forward to your PR and willing to help.

cloud-fan commented 3 years ago

> Databricks requires that you set the configuration options before starting the cluster and then attach the JAR file once the cluster is running. That's probably what's causing the error.

Ah, so the problem is DCE users can only attach jars after the cluster is running, so the solution should be to register itachi functions after the session is instantiated.

alexott commented 3 years ago

@cloud-fan exactly...

MrPowers commented 3 years ago

I wrote a blog post to recap what I learned from this conversation and share the knowledge with the Spark community. Thank you all for teaching me this valuable information.