h2oai / sparkling-water

Sparkling Water provides H2O functionality inside Spark cluster
https://docs.h2o.ai/sparkling-water/3.3/latest-stable/doc/index.html
Apache License 2.0
967 stars 359 forks source link

Error connecting rsparkling with sparkling-water in aws #2095

Closed joscani closed 4 years ago

joscani commented 4 years ago

Hi. I'm using sparkling-water 3.30.0.1 in AWS using spark-scala, but I have a problem connecting with R. The R version in AWS is 3.4.1. My code is

# Load the Spark/H2O R bindings.
# NOTE(review): rsparkling typically expects its options() to be set BEFORE
# library(rsparkling) is called — confirm the required order for this version.
library(tidyverse)
library(rsparkling)
library(sparklyr)
library(h2o)

# Pin the Sparkling Water assembly to the exact build matching Spark 2.3
# (version string format: <SW version>-<patch>-<Spark major.minor>).
options(rsparkling.sparklingwater.version = "3.30.0.1-1-2.3")
# Use a locally downloaded assembly jar instead of fetching from Maven.
options(rsparkling.sparklingwater.location = "/home/jcanadar/sparkling-water-3.30.0.1-1-2.3/jars/sparkling-water-assembly_2.11-3.30.0.1-1-2.3-all.jar")

# Point sparklyr at the cluster's Spark installation.
spark_path <- "/usr/lib/spark"
Sys.setenv(SPARK_HOME = spark_path)

# Build the Spark configuration: Hive catalog enabled, dynamic allocation
# disabled in favor of a fixed 10 executors x 5 cores.
conf <- sparklyr::spark_config()
conf$spark.sql.catalogImplementation <- "hive"
conf$spark.dynamicAllocation.enabled <- "false"
conf$spark.executor.instances <- 10

conf$spark.executor.cores <- 5
conf$spark.executor.memory <-  "10G"
conf$spark.driver.memory <- "2g"
# Fraction of heap reserved for execution/storage; 0.95 leaves little
# headroom for user data structures.
conf$spark.memory.fraction <- 0.95

# Connect to the YARN cluster with the configuration above.
spark <- spark_connect(master = "yarn",
                    version = "2.3.2",
                    config = conf)

# Start the H2O context on top of the Spark session; this is where the
# AWS SDK NoSuchMethodError below is triggered.
hc <- H2OContext.getOrCreate() 
Error: org.apache.spark.sql.AnalysisException: java.lang.NoSuchMethodError: com.amazonaws.transform.JsonErrorUnmarshaller.<init>(Ljava/lang/Class;Ljava/lang/String;)V;
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
    at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:194)
    at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:114)
    at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:102)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anon$1.<init>(HiveSessionStateBuilder.scala:69)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder.analyzer(HiveSessionStateBuilder.scala:69)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
    at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
    at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:147)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
    at sparklyr.StreamHandler.read(stream.scala:61)
    at sparklyr.BackendHandler$$anonfun$channelRead0$1.apply$mcV$sp(handler.scala:58)
    at scala.util.control.Breaks.breakable(Breaks.scala:38)
    at sparklyr.BackendHandler.channelRead0(handler.scala:38)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: com.amazonaws.transform.JsonErrorUnmarshaller.<init>(Ljava/lang/Class;Ljava/lang/String;)V
    at com.amazonaws.protocol.json.SdkJsonProtocolFactory.createErrorUnmarshallers(SdkJsonProtocolFactory.java:105)
    at com.amazonaws.protocol.json.SdkJsonProtocolFactory.<init>(SdkJsonProtocolFactory.java:49)
    at com.amazonaws.services.glue.AWSGlueClient.<clinit>(AWSGlueClient.java:140)
    at com.amazonaws.services.glue.AWSGlueClientBuilder.build(AWSGlueClientBuilder.java:61)
    at com.amazonaws.services.glue.AWSGlueClientBuilder.build(AWSGlueClientBuilder.java:27)
    at com.amazonaws.client.builder.AwsSyncClientBuilder.build(AwsSyncClientBuilder.java:46)
    at com.amazonaws.glue.catalog.metastore.AWSGlueClientFactory.newClient(AWSGlueClientFactory.java:70)
    at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.<init>(AWSCatalogMetastoreClient.java:146)
    at com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory.createMetaStoreClient(AWSGlueDataCatalogHiveClientFactory.java:16)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3007)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3042)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1235)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:175)
    at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:167)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:180)
    at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:114)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264)
    at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:385)
    at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:287)
    at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:66)
    at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:65)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
    ... 53 more

And before

jakubhava commented 4 years ago

@joscani Can you please use latest version 3.30.0.2 ? On that version we have shaded the AWS dependencies so they don't collide with the environment.

joscani commented 4 years ago

I'll try the latest version next week. Thanks @jakubhava

joscani commented 4 years ago

Trying the new version, 3.30.0.2. The h2oContext is up, but I get another error.

# Load the Spark/H2O R bindings.
# Note: library(dplyr) is redundant after library(tidyverse) (tidyverse
# attaches dplyr), but it is kept for fidelity with the original script.
library(tidyverse)
library(dplyr)  # fixed typo: was `librrary(dplyr)`, which errors at source time
library(rsparkling)
library(sparklyr)
library(h2o)

# Pin the Sparkling Water assembly to the exact build matching Spark 2.3
# (version string format: <SW version>-<patch>-<Spark major.minor>).
options(rsparkling.sparklingwater.version = "3.30.0.2-1-2.3")
# Use a locally downloaded assembly jar instead of fetching from Maven.
options(rsparkling.sparklingwater.location = "/home/jcanadar/sparkling-water-3.30.0.2-1-2.3/jars/sparkling-water-assembly_2.11-3.30.0.2-1-2.3-all.jar")

# Point sparklyr at the cluster's Spark installation.
spark_path <- "/usr/lib/spark"
Sys.setenv(SPARK_HOME = spark_path)

# Build the Spark configuration: Hive catalog enabled, dynamic allocation
# disabled in favor of a fixed 15 executors x 7 cores.
conf <- sparklyr::spark_config()
conf$spark.sql.catalogImplementation <- "hive"
conf$spark.dynamicAllocation.enabled <- "false"
conf$spark.executor.instances <- 15

conf$spark.executor.cores <- 7
conf$spark.executor.memory <-  "40G"
conf$spark.driver.memory <- "2g"
# Fraction of heap reserved for execution/storage; 0.95 leaves little
# headroom for user data structures.
conf$spark.memory.fraction <- 0.95

# Connect to the YARN cluster with the configuration above.
spark <- spark_connect(master = "yarn",
                    version = "2.3.2",
                    config = conf)

# Start the H2O context on top of the Spark session.
hc <- H2OContext.getOrCreate() 

hc is up and show the ips.


Sparkling Water Context:
 * Sparkling Water Version: 3.30.0.2-1-2.3
 * H2O name: jcanadar
 * cluster size: 15
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (0,100.82.4.9,54321)
  (1,100.82.4.9,54323)
  (2,100.82.4.9,54325)
  (3,100.82.4.9,54327)
  (4,100.82.8.28,54321)
  (5,100.82.8.28,54323)
  (6,100.82.8.28,54325)
  (7,100.82.8.28,54327)
  (8,100.82.5.50,54321)
  (9,100.82.5.50,54323)
  (10,100.82.5.50,54325)
  (11,100.82.5.50,54327)
  (12,100.82.9.75,54321)
  (13,100.82.9.75,54323)
  (14,100.82.9.75,54325)
  ------------------------

But I can't convert a Spark dataframe to an H2OFrame

# Copy the local mtcars data.frame into Spark, replacing any existing
# table of the same name, then print the resulting Spark dataframe.
mtcars_tbl <- copy_to(spark, mtcars, overwrite = TRUE)
mtcars_tbl

# Convert the Spark dataframe to an H2OFrame through the H2OContext;
# this is the call that raises "Object not found 33" below.
mtcars_hf <- hc$asH2OFrame(mtcars_tbl)
mtcars_hf

> mtcars_hf <- hc$asH2OFrame(mtcars_tbl)
Error: java.lang.IllegalArgumentException: Object not found 33
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:110)
    at sparklyr.StreamHandler.read(stream.scala:61)
    at sparklyr.BackendHandler$$anonfun$channelRead0$1.apply$mcV$sp(handler.scala:58)
    at scala.util.control.Breaks.breakable(Breaks.scala:38)
    at sparklyr.BackendHandler.channelRead0(handler.scala:38)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

Thanks

jakubhava commented 4 years ago

Can you verify that mtcars_tbl <- copy_to(spark, mtcars, overwrite = TRUE) was successful and spark dataframe was returned?

joscani commented 4 years ago
> hc <- H2OContext.getOrCreate() 
 Connection successful!

R is connected to the H2O cluster: 
    H2O cluster uptime:         31 seconds 292 milliseconds 
    H2O cluster timezone:       UTC 
    H2O data parsing timezone:  UTC 
    H2O cluster version:        3.30.0.2 
    H2O cluster version age:    13 days  
    H2O cluster name:           sparkling-water-jcanadar_application_1589258341663_0009 
    H2O cluster total nodes:    15 
    H2O cluster total memory:   594.93 GB 
    H2O cluster total cores:    720 
    H2O cluster allowed cores:  105 
    H2O cluster healthy:        TRUE 
    H2O Connection ip:          100.82.3.4 
    H2O Connection port:        54321 
    H2O Connection proxy:       NA 
    H2O Internal Security:      FALSE 
    H2O API Extensions:         XGBoost, Algos, Amazon S3, Sparkling Water REST API Extensions, AutoML, Core V3, TargetEncoder, Core V4 
    R Version:                  R version 3.4.1 (2017-06-30) 

Reference class object of class "H2OContext"
Field "jhc":
<jobj[41]>
  org.apache.spark.h2o.H2OContext

Sparkling Water Context:
 * Sparkling Water Version: 3.30.0.2-1-2.3
 * H2O name: jcanadar
 * cluster size: 15
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (0,100.82.3.4,54321)
  (1,100.82.3.4,54323)
  (2,100.82.3.4,54325)
  (3,100.82.3.4,54327)
  (4,100.82.0.65,54321)
  (5,100.82.0.65,54323)
  (6,100.82.0.65,54325)
  (7,100.82.0.65,54327)
  (8,100.82.12.99,54321)
  (9,100.82.12.99,54323)
  (10,100.82.12.99,54325)
  (11,100.82.9.200,54321)
  (12,100.82.9.200,54323)
  (13,100.82.9.200,54325)
  (14,100.82.9.200,54327)
  ------------------------

  Open H2O Flow in browser: http://100.82.14.32:54323 (CMD + click in Mac OSX)

 * Yarn App ID of Spark application: application_1589258341663_0009

> mtcars_tbl <- copy_to(spark, mtcars, overwrite = TRUE)
> mtcars_tbl
# Source: spark<mtcars> [?? x 11]
     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
 1  21       6  160    110  3.9   2.62  16.5     0     1     4     4
 2  21       6  160    110  3.9   2.88  17.0     0     1     4     4
 3  22.8     4  108     93  3.85  2.32  18.6     1     1     4     1
 4  21.4     6  258    110  3.08  3.22  19.4     1     0     3     1
 5  18.7     8  360    175  3.15  3.44  17.0     0     0     3     2
 6  18.1     6  225    105  2.76  3.46  20.2     1     0     3     1
 7  14.3     8  360    245  3.21  3.57  15.8     0     0     3     4
 8  24.4     4  147.    62  3.69  3.19  20       1     0     4     2
 9  22.8     4  141.    95  3.92  3.15  22.9     1     0     4     2
10  19.2     6  168.   123  3.92  3.44  18.3     1     0     4     4
# … with more rows
> mtcars_hf <- hc$asH2OFrame(mtcars_tbl)
> mtcars_hf
   mpg cyl disp  hp drat    wt  qsec vs am gear carb
1 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
2 21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
3 22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
4 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
5 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
6 18.1   6  225 105 2.76 3.460 20.22  1  0    3    1

This time it works; maybe it was another problem with the AWS environment. When I try with my big dataset I get this error.

 train_hex <- hc$asH2OFrame(train_tbl, "train_hex")
Error: ai.h2o.sparkling.backend.exceptions.RestApiNotReachableException: External H2O node 100.82.3.4:54321 is not reachable.
Please verify that you are passing ip and port of existing cluster node and the cluster

It was a memory issue, so I am closing the issue. Thanks @jakubhava for all your time.