ogirardot / spark-riak-example

Spark and Riak connector example in Scala
GNU General Public License v2.0

Exception in thread "main" java.util.concurrent.ExecutionException when submit to Spark #1

Open tuanho27 opened 8 years ago

tuanho27 commented 8 years ago

Hello, at the beginning I wrote my own simple project, but I got stuck connecting to Riak to save and query data, so I tried your example. However, I ran into the problem below.
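For context, what I am trying to do in my own project looks roughly like this. It is a minimal sketch based on my reading of the spark-riak-connector docs; the app name, bucket name and host are placeholders, not this repo's actual code:

```scala
import org.apache.spark.{SparkConf, SparkContext}
// brings saveToRiak / riakBucket into scope on RDDs and SparkContext
import com.basho.riak.spark._

object SaveAndQuerySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("riak-sketch")
      // protobuf endpoint of my Riak node (placeholder host:port)
      .set("spark.riak.connection.host", "127.0.0.1:8087")
    val sc = new SparkContext(conf)

    // save a few (key, value) pairs into a bucket
    sc.parallelize(Seq("key1" -> "value1", "key2" -> "value2"))
      .saveToRiak("test-bucket")

    // read the whole bucket back (a full-bucket read is what goes through
    // the coverage-plan-based partitioner seen in the stack trace below)
    val values = sc.riakBucket[String]("test-bucket").queryAll()
    values.collect().foreach(println)

    sc.stop()
  }
}
```

Below is my setup: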

ENV:
- Spark 1.6.2
- Riak KV 2.1.4
- Spark-riak-connector 1.5.1

RUN: sbt assembly, then submit to Spark:

    spark-submit --class "com.lateralthoughts.spark.QueryAll" \
      --master local[4] \
      target/scala-2.10/spark-riak-example-assembly-1.0.jar
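For reference, the dependency setup I am assuming in build.sbt for these versions is roughly the following; the group/artifact coordinates and Scala patch version are my guesses, and the repo's actual build.sbt may differ:

```scala
// build.sbt (sketch): pinning the versions listed under ENV above
name := "spark-riak-example"
version := "1.0"
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // Spark itself is provided by the cluster / spark-submit
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  // Riak connector version under test
  "com.basho.riak" %% "spark-riak-connector" % "1.5.1"
)
```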

===> Issues:

    Exception in thread "main" java.util.concurrent.ExecutionException: com.basho.riak.client.core.netty.RiakResponseException: Unknown message code: 70
        at com.basho.riak.client.core.FutureOperation.get(FutureOperation.java:314)
        at com.basho.riak.client.api.commands.CoreFutureAdapter.get(CoreFutureAdapter.java:52)
        at com.basho.riak.client.api.RiakCommand.execute(RiakCommand.java:89)
        at com.basho.riak.client.api.RiakClient.execute(RiakClient.java:328)
        at com.basho.riak.spark.rdd.connector.CachedSession.execute(SessionCache.scala:233)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.basho.riak.spark.rdd.connector.SessionProxy.invoke(SessionCache.scala:196)
        at com.sun.proxy.$Proxy13.execute(Unknown Source)
        at com.basho.riak.spark.rdd.partitioner.RiakCoveragePlanBasedPartitioner$$anonfun$partitions$1.apply(RiakCoveragePlanBasedPartitioner.scala:73)
        at com.basho.riak.spark.rdd.partitioner.RiakCoveragePlanBasedPartitioner$$anonfun$partitions$1.apply(RiakCoveragePlanBasedPartitioner.scala:64)
        at com.basho.riak.spark.rdd.connector.RiakConnector$$anonfun$withSessionDo$1.apply(RiakConnector.scala:58)
        at com.basho.riak.spark.rdd.connector.RiakConnector$$anonfun$withSessionDo$1.apply(RiakConnector.scala:58)
        at com.basho.riak.spark.rdd.connector.RiakConnector.closeSessionAfterUse(RiakConnector.scala:63)
        at com.basho.riak.spark.rdd.connector.RiakConnector.withSessionDo(RiakConnector.scala:58)
        at com.basho.riak.spark.rdd.connector.RiakConnector.withSessionDo(RiakConnector.scala:60)
        at com.basho.riak.spark.rdd.partitioner.RiakCoveragePlanBasedPartitioner$.partitions(RiakCoveragePlanBasedPartitioner.scala:64)
        at com.basho.riak.spark.rdd.RiakRDD.getPartitions(RiakRDD.scala:53)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
        at com.basho.riak.spark.rdd.RDDFunctions.saveToRiak(RDDFunctions.scala:43)
        at com.lateralthoughts.spark.QueryAll$delayedInit$body.apply(QueryAll.scala:20)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
        at scala.App$class.main(App.scala:71)
        at com.lateralthoughts.spark.QueryAll$.main(QueryAll.scala:6)
        at com.lateralthoughts.spark.QueryAll.main(QueryAll.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: com.basho.riak.client.core.netty.RiakResponseException: Unknown message code: 70
        at com.basho.riak.client.core.netty.RiakResponseHandler.channelRead(RiakResponseHandler.java:52)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.handler.codec.ByteToMessageCodec.channelRead(ByteToMessageCodec.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
    16/09/07 22:00:38 WARN SessionCache: Session Cleaner was interrunpted
    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at com.basho.riak.spark.rdd.connector.SessionCache$$anon$1.run(SessionCache.scala:166)

I am a newbie, but I see that your Scala code does not have a "main" function in the class. Could that be what is causing this issue?

ogirardot commented 8 years ago

Hi, I think you're just using incompatible versions of Riak and the connector. Regarding the main, there is one (otherwise it would not launch) and clearly defined in the stacktrace, scala.App$class.main(App.scala:71) at com.lateralthoughts.spark.QueryAll$.main(QueryAll.scala:6); the `extends App` creates the main.
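For example, these two forms are equivalent as far as spark-submit is concerned (an illustrative sketch, not the actual QueryAll.scala):

```scala
// 1) Explicit main method
object WithExplicitMain {
  def main(args: Array[String]): Unit = {
    println("job body goes here")
  }
}

// 2) extends App: the App trait supplies def main(args: Array[String]),
// and the object's body runs inside it. That is why the stack trace shows
// scala.App$class.main(...) and QueryAll$delayedInit$body.apply(...).
object WithApp extends App {
  println("job body goes here")
}
```

Either form can be passed to spark-submit via --class.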

Can you test with matching versions of the connector and the storage?

tuanho27 commented 8 years ago

@ogirardot, thanks for the reply.

> Regarding the main, there is one (otherwise it would not launch) and clearly defined in the stacktrace

Yes, I see.

> Can you test with matching versions of the connector and the storage?

Below is the compatibility information from the connector's release docs for version 1.6.0 (released this week):

Compatibility:
- Riak TS 1.3.1+
- Apache Spark 1.6+
- Scala 2.10 and 2.11
- Java 8

Coming Soon: Support for Riak KV 2.3 and later

(https://github.com/basho/spark-riak-connector)

The connector's release docs do not say anything about Riak KV. My app currently uses Riak KV 2.1.4, so it is not supported yet, right? Could you tell me which versions are compatible, or just share the version setup / environment that works well with this example for you?