r-spark / sparklyr.flint

Sparklyr extension making Flint time series library functionalities (https://github.com/twosigma/flint) easily accessible through R
9 stars 3 forks source link

Azure Databricks java.lang.ClassNotFoundException: com.twosigma.flint.timeseries.TimeSeriesRDDBuilder #55

Open josephd000 opened 3 years ago

josephd000 commented 3 years ago

Error

# Step 2: specify how the Spark dataframe should be interpreted as a time series by Flint
ts_rdd <- fromSDF(sdf, is_sorted = TRUE, time_unit = "SECONDS", time_column = "t")
Error : java.lang.ClassNotFoundException: com.twosigma.flint.timeseries.TimeSeriesRDDBuilder

Expectation

That I can use basic sparklyr.flint functions on Azure Databricks without classpath errors by using install.packages("sparklyr.flint").

Details

I've created a "Library" with flint-0.6.0 from Maven and installed it onto my cluster, detached and reattached my notebook, called library(sparklyr.flint) before spark_connect() and it still can't find the library.

Config

Reproducible code

install.packages("sparklyr")
install.packages("sparklyr.flint")
library(sparklyr)
library(sparklyr.flint)

# Step 0: decide which Spark version to use, how to connect to Spark, etc
# spark_version <- "3.0.0"
Sys.setenv(SPARK_HOME = "~/spark/spark-3.0.1-bin-hadoop3.2")
sc <- spark_connect(method = "databricks")

example_time_series <- data.frame(
  t = c(1, 3, 4, 6, 7, 10, 15, 16, 18, 19),
  v = c(4, -2, NA, 5, NA, 1, -4, 5, NA, 3)
)

# Step 1: import example time series data into a Spark dataframe
sdf <- copy_to(sc, example_time_series, overwrite = TRUE)

# Step 2: specify how the Spark dataframe should be interpreted as a time series by Flint
ts_rdd <- fromSDF(sdf, is_sorted = TRUE, time_unit = "SECONDS", time_column = "t")
Error : java.lang.ClassNotFoundException: com.twosigma.flint.timeseries.TimeSeriesRDDBuilder Error : java.lang.ClassNotFoundException: com.twosigma.flint.timeseries.TimeSeriesRDDBuilder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:106)
    at sparklyr.StreamHandler.read(stream.scala:61)
    at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:58)
    at scala.util.control.Breaks.breakable(Breaks.scala:42)
    at sparklyr.BackendHandler.channelRead0(handler.scala:39)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
yitao-li commented 3 years ago

@josephd000 I think that is a good question for the Databricks folks.

My understanding is there are some extra levels of indirection with Spark connection when working with a Databricks cluster and also some form of jar file loading logic built into Databricks runtime which is entirely proprietary, so you will need some additional steps to make it work on a Databricks cluster.

Meanwhile if I do find something simple that make the Databricks use case work I'll let you know.

josephd000 commented 3 years ago

@yitao-li , I went digging through the sparklyr.flint code and found the non-exported function, sparklyr.flint:::spark_dependencies(). Running this, it returned:

sparklyr.flint:::spark_dependencies(spark_version = "3.0.1", scala_version = "2.12")
$jars
NULL

$packages
[1] "org.clapper:grizzled-slf4j_2.12:1.3.4"      "org.sparklyr:sparklyr-flint_3-0_2-12:0.7.0"

$initializer
NULL

$catalog
NULL

$repositories
[1] "https://github.com/org-sparklyr/sparklyr.flint/raw/maven2"

attr(,"class")
[1] "spark_dependency"

I then created those "Libraries" on Databricks by passing in the "packages" and "repositories" where the Databricks Library GUI asks for "Coordinates" and "Repository", respectively. After installing these two "Libraries" on my cluster, I was able to successfully use sparklyr.flint::from_sdf()! :)

yitao-li commented 3 years ago

@josephd000 Good to know! :+1: I guess I can look into whether those things can be streamlined a bit for Databricks clusters. In all other scenarios (e.g., working with a EMR cluster or running Spark in local mode) all dependencies are taken care of automatically based on what sparklyr.flint:::spark_dependencies() returns. I think sparklyr is trying to do the same with Databricks connection as well but probably installed the jar files to the wrong location somehow.

kehldaniel commented 3 years ago

I have the same issue with Spark 3.1.1, Scala 2.12, Sparklyr 1.7.1 and Sparklyr.flint 0.2.1. I don't think I can install libraries on the cluster, I hope there will be some smooth solution soon. Thank you for the great looking package!

yitao-li commented 3 years ago

@kehldaniel Did you also create a sparklyr connection using

sc <- spark_connect(method = "databricks")

or similar?

kehldaniel commented 3 years ago

Yes, (after trying hard with my own code that is running on my own laptop) I am running the exact same lines of code as in the original post by josephd000 and get the same error.