instaclustr / sample-KafkaSparkCassandra

Introductory sample Scala app using Apache Spark Streaming to accept data from Kafka and write a summary to Cassandra.

Guava incompatibility issues #1

Closed alecinvan closed 8 years ago

alecinvan commented 8 years ago

Hello,

We installed Cloudera CDH5 and Cassandra DataStax Community Distribution v3.3. Now when we run this sample-KafkaSparkCassandra on top of them, we get the following error:

Caused by: java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use. This introduces codec resolution issues and potentially other incompatibility issues in the driver. Please upgrade to Guava 16.01 or later.

Should I manually change Guava to 16.01 or later?

thanks

slater-ben commented 8 years ago

Hi Alec,

It seems that others have had a similar issue: https://groups.google.com/a/lists.datastax.com/forum/#!msg/spark-connector-user/HnTsWJkI5jo/8ZY1BbzEDAAJ

I would try changing the last line of build.sbt to:

libraryDependencies += ("org.apache.spark" %% "spark-streaming-kafka" % "1.4.1").exclude("org.spark-project.spark", "unused").exclude("com.google", "guava")

I think that should stop the older version of Guava from coming into the build from the Spark streaming library. Let me know how you go.
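
If it helps with debugging, you can also check which jar Guava is actually being loaded from at runtime. A rough sketch (any Guava class works; here I'm using ImmutableList):

println(classOf[com.google.common.collect.ImmutableList[_]].getProtectionDomain.getCodeSource.getLocation)

If that prints a jar containing a Guava older than 16.01, that's the one the exclusion needs to keep off the classpath.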

alecinvan commented 8 years ago

Hello, Slater-ben

Thank you very much for the reply. I ran with your suggestion and got the following error:

Exception in thread "main" java.lang.ExceptionInInitializerError
  at com.simumind.streaming.KafkaSparkCassandra$.main(KafkaSparkCassandra.scala:58)
  at com.simumind.streaming.KafkaSparkCassandra.main(KafkaSparkCassandra.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use. This introduces codec resolution issues and potentially other incompatibility issues in the driver. Please upgrade to Guava 16.01 or later.
  at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
  at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)
  at com.datastax.driver.core.Cluster.<init>(Cluster.java:67)
  ... 11 more

thanks

slater-ben commented 8 years ago

OK. That was a bit of a guess that didn't work out.

I'll see if I can find some time in the next few days to reproduce this and come up with a fix. I know some of my colleagues have had some success with shading the Guava jar in Maven, but I'll need to see if I can figure that out in sbt.

Cheers Ben

alecinvan commented 8 years ago

Great! thanks a lot

slater-ben commented 8 years ago

Hi Alec,

I played around a bit but couldn't reproduce this on our normal setup (and didn't really want to get into learning to set up CDH5). From a little research I'd suggest trying a slightly different version of that modified sbt line:

libraryDependencies += ("org.apache.spark" %% "spark-streaming-kafka" % "1.4.1").exclude("org.spark-project.spark", "unused").exclude("com.google.guava","guava-jdk5")
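
Another thing worth trying (just a sketch, I haven't verified it on CDH5) is forcing the Guava version in build.sbt instead of excluding it:

dependencyOverrides += "com.google.guava" % "guava" % "16.0.1"

Note that dependencyOverrides only pins the version sbt resolves; it won't help if an older Guava is injected by the cluster's own classpath at runtime.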

If that doesn't work, can you provide some more detail on how you built the sample and the Cassandra connector? Did you follow the tutorial linked from the readme, or did you take a different approach?

Cheers Ben

alecinvan commented 8 years ago

thanks Ben

CaptainDylan commented 8 years ago

Is there a resolution for this? I am running into the same error, using Spark 1.5.1 and Cassandra connector 1.5.0. The above suggestions did not help.

slater-ben commented 8 years ago

Hi, we didn't resolve this particular one, although we've been working through some similar issues as we update our Spark offering to 1.6. One of the guys working on that pointed me at this solution: http://arjon.es/2015/10/12/making-hadoop-2-dot-6-plus-spark-cassandra-driver-play-nice-together/

I would say there is a fair chance adding the shade rule mentioned there will work:

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll
)
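
For context, that rule goes in build.sbt and relies on the sbt-assembly plugin. A minimal sketch (the plugin version here is an assumption; use whatever your project already pins):

// project/plugins.sbt -- version is an assumption, use your project's own
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt -- shade every com.google class in the fat jar
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll
)

After running sbt assembly, Guava's classes end up under the shadeio package inside the fat jar, and the Cassandra driver's references are rewritten to match, so they no longer collide with the older Guava that Hadoop/Spark puts on the cluster classpath.
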
slater-ben commented 8 years ago

We've tested this with the Instaclustr Managed Spark 1.6 offering now and didn't have any issues, so I will close this issue. I suspect the suggestion above will work if anyone runs into the issue in other environments.

ishumishra commented 8 years ago

The Spark Cassandra Connector (both 1.5 and 1.6) is unstable and throws similar errors when you try to use it. Building the correct connector is itself a difficult task: before building it, you may not know which version of Spark works with which version of Cassandra, which version of Scala you have (or should have), and which version of the connector to get from GitHub.

After a lot of fighting, when you finally manage to build the connector, it complains about an incorrect Guava.

I am not providing any solution, because after wasting a lot of time I have still not resolved it and cannot run a simple hello-world from Spark to Cassandra. The suggestion I can give is to save your time and spend it on something else until the issues on GitHub are resolved and the build passes without errors. A very unfortunate and unstable project, with no support.

slater-ben commented 8 years ago

Hi ishumishra,

I agree that with Spark generally, and the Cassandra connector in particular, getting all the dependencies lined up can be a lot of work.

Are you using the Instaclustr managed service? If so, these samples have all been tested to build and run with the versions we provide there and our support team is available to help if you do have issues.

If you are not currently using Instaclustr, you can get a 14-day free trial via this page if you are interested: http://www.instaclustr.com/solutions/spark/

Cheers Ben

ishumishra commented 8 years ago

Less than an hour ago the following steps were performed:

[info] Packaging /home/cloudera/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.5.0.jar ...
[info] Done packaging.
[info] ScalaTest
[info] Run completed in 9 seconds, 391 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] No tests were executed.
[info] Passed: Total 44, Failed 0, Errors 0, Passed 44
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[warn] Merging 'META-INF/INDEX.LIST' with strategy 'last'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'last'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF/NOTICE.txt' with strategy 'last'
[warn] Merging 'META-INF/io.netty.versions.properties' with strategy 'last'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'last' was applied to 4 files
[info] SHA-1: bb6e4b6f390182d31edb1e4de9fec8490d18c57a
[info] Packaging /home/cloudera/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.5.0.jar ...
[info] Done packaging.
[success] Total time: 790 s, completed Apr 4, 2016 2:04:32 PM
...

[cloudera@quickstart spark-cassandra-connector]$ spark-shell --jars /home/cloudera/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.5.0.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/04/04 14:19:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/04 14:19:14 INFO spark.SecurityManager: Changing view acls to: cloudera
16/04/04 14:19:14 INFO spark.SecurityManager: Changing modify acls to: cloudera
16/04/04 14:19:14 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
16/04/04 14:19:15 INFO spark.HttpServer: Starting HTTP Server
16/04/04 14:19:15 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/04/04 14:19:15 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:35937
16/04/04 14:19:15 INFO util.Utils: Successfully started service 'HTTP class server' on port 35937.
...
16/04/04 14:20:21 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr.
16/04/04 14:20:21 INFO repl.SparkILoop: Created sql context (with Hive support).
SQL context available as sqlContext.

scala> sc.stop

16/04/04 14:21:50 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}


16/04/04 14:21:50 INFO spark.SparkContext: Successfully stopped SparkContext

scala> 16/04/04 14:21:50 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/04 14:21:50 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/04 14:21:50 INFO Remoting: Remoting shut down
16/04/04 14:21:50 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf

scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@186b141d

scala> val sc = new SparkContext(conf)

16/04/04 14:24:39 INFO spark.SparkContext: Running Spark version 1.5.0-cdh5.5.0


16/04/04 14:24:43 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41978.
16/04/04 14:24:43 INFO netty.NettyBlockTransferService: Server created on 41978
16/04/04 14:24:43 INFO storage.BlockManagerMaster: Trying to register BlockManager
16/04/04 14:24:43 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:41978 with 534.5 MB RAM, BlockManagerId(driver, localhost, 41978)
16/04/04 14:24:43 INFO storage.BlockManagerMaster: Registered BlockManager
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@39b2831f

scala> val test_rdd = sc.cassandraTable("satish", "chele")
test_rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> test_rdd.first
java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use. This introduces codec resolution issues and potentially other incompatibility issues in the driver. Please upgrade to Guava 16.01 or later.
  at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
  at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)

ishumishra commented 8 years ago

And here is the outcome of all that hard work, congratulations:

scala> test_rdd.first
java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use. This introduces codec resolution issues and potentially other incompatibility issues in the driver. Please upgrade to Guava 16.01 or later.
  at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
  at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)
  at com.datastax.driver.core.Cluster.<init>(Cluster.java:67)
  at com.datastax.spark.connector.cql.DefaultConnectionFactory$.clusterBuilder(CassandraConnectionFactory.scala:35)
  at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:87)
  at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:153)
  at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
  at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
  at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
  at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
  at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
  at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
  at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
  at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:254)
  at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
  at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:146)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1277)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
  at com.datastax.spark.connector.rdd.CassandraRDD.take(CassandraRDD.scala:121)
  at com.datastax.spark.connector.rdd.CassandraRDD.take(CassandraRDD.scala:122)
  at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1312)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
  at org.apache.spark.rdd.RDD.first(RDD.scala:1311)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
  at $iwC$$iwC$$iwC.<init>(<console>:55)
  at $iwC$$iwC.<init>(<console>:57)
  at $iwC.<init>(<console>:59)
  at <init>(<console>:61)
  at .<init>(<console>:65)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

slater-ben commented 8 years ago

Yes - there seems to be an incompatibility between the Cloudera distribution and the Cassandra Spark Connector. If the shading suggestion above doesn't work for you then I'd try logging an issue with the Cassandra connector project or posting to their mailing list (see https://github.com/datastax/spark-cassandra-connector).