microsoft / Mobius

C# and F# language binding and extensions to Apache Spark
MIT License

Getting error with Spark Streaming from Kafka #654

Open rmattos opened 7 years ago

rmattos commented 7 years ago

Hi,

I'm trying to run an application that receives a stream from Kafka, but I'm facing the following error:

```
D:\Spark\spark-clr_2.11-2.0.200\runtime\scripts>sparkclr-submit.cmd debug --jars D:\Spark\spark-clr_2.11-2.0.200\runtime\dependencies\spark-streaming-kafka-0-10-assembly_2.11-2.0.2.jar --exe SocialMiner.Alexandria.Assistant.exe D:\Projects\socialminer-alexandria\Workers\SocialMiner.Alexandria.Assistant\bin\Debug
[sparkclr-submit.cmd] SPARKCLR_JAR=spark-clr_2.11-2.0.200.jar
[sparkclr-submit.cmd] LAUNCH_CLASSPATH="D:\Spark\spark-clr_2.11-2.0.200\runtime\lib\spark-clr_2.11-2.0.200.jar;D:\Spark\spark-2.0.2-bin-hadoop2.4\jars*"
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/04/16 23:43:05 INFO CSharpRunner: Starting CSharpBackend!
17/04/16 23:43:05 INFO CSharpRunner: Port number used by CSharpBackend is 5567
```


More error details:

```
[2017-04-17T03:03:12.2648354Z] [ROGER] [Error] [JvmBridge] JVM method execution failed: Constructor failed for class org.apache.spark.streaming.api.java.JavaStreamingContext when called with 1 parameters ([Index=1, Type=String, Value=s3n://path/to/checkpoint], )
[2017-04-17T03:03:12.2653351Z] [ROGER] [Error] [JvmBridge] java.lang.IllegalArgumentException: requirement failed: Spark Streaming cannot be initialized with both SparkContext and checkpoint as null
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:131)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:111)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:144)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.api.csharp.CSharpBackendHandler.handleMethodCall(CSharpBackendHandler.scala:167)
	at org.apache.spark.api.csharp.CSharpBackendHandler.handleBackendRequest(CSharpBackendHandler.scala:106)
	at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:32)
	at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:28)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:745)
```

```
[2017-04-17T03:03:12.2693363Z] [ROGER] [Exception] [JvmBridge] JVM method execution failed: Constructor failed for class org.apache.spark.streaming.api.java.JavaStreamingContext when called with 1 parameters ([Index=1, Type=String, Value=s3n://path/to/checkpoint], )
	at Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] parameters)
```
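For context, the `requirement failed` message is raised by the Scala-side `StreamingContext` constructor when it receives neither a live `SparkContext` nor a readable checkpoint. One way to narrow this down (a sketch only, not a fix) is to build the context directly, without checkpoint recovery, and see whether the job starts at all:

```csharp
// Sketch: construct the streaming context directly instead of via
// StreamingContext.GetOrCreate, so no checkpoint restore is attempted.
// If this starts cleanly, the failure is confined to reading the
// checkpoint location (here, the s3n:// path).
var conf = new SparkConf();
var sc = new SparkContext(conf);
var ssc = new StreamingContext(sc, 2000L); // 2-second batch interval
```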

Do you have any idea what it could be? I tried different .jar versions of spark-streaming-kafka, but I'm still getting the same error.

skaarthik commented 7 years ago

There is a bug in the code, I guess, based on the error message "java.lang.IllegalArgumentException: requirement failed: Spark Streaming cannot be initialized with both SparkContext and checkpoint as null"

rmattos commented 7 years ago

I'm trying to execute the example code:

```csharp
const string topic = "test";
const string checkpointPath = "s3n://path/to/checkpoint";

StreamingContext ssc = StreamingContext.GetOrCreate(checkpointPath, () =>
{
    var conf = new SparkConf();
    SparkContext sc = new SparkContext(conf);
    StreamingContext context = new StreamingContext(sc, 2000L);
    context.Checkpoint(checkpointPath);

    var perTopicPartitionKafkaOffsets = new Dictionary<string, long>();
    var kafkaParams = new Dictionary<string, string> {
        { "bootstrap.servers", "localhost:9092" },
        { "auto.offset.reset", "latest" }
    };

    var dstream = KafkaUtils.CreateDirectStream(
        context,
        new List<string> { topic },
        kafkaParams.Select(v => new Tuple<string, string>(v.Key, v.Value)),
        perTopicPartitionKafkaOffsets.Select(v => new Tuple<string, long>(v.Key, v.Value)));

    dstream.ForeachRDD((time, rdd) =>
    {
        long batchCount = rdd.Count();
        int numPartitions = rdd.GetNumPartitions();

        Console.WriteLine("-------------------------------------------");
        Console.WriteLine("Time: {0}", time);
        Console.WriteLine("-------------------------------------------");
        Console.WriteLine("Count: " + batchCount);
        Console.WriteLine("Partitions: " + numPartitions);
    });

    return context;
});

ssc.Start();
ssc.AwaitTermination();
```
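One isolation step worth trying (a sketch under the assumption that everything except the checkpoint location is fine): point `checkpointPath` at local disk instead of `s3n://`. If the same code then runs, the problem is S3-specific rather than in the streaming setup. The path below is a placeholder:

```csharp
// Hypothetical local checkpoint path, to rule out S3 as the cause.
const string checkpointPath = @"D:\tmp\spark-checkpoint";

StreamingContext ssc = StreamingContext.GetOrCreate(checkpointPath, () =>
{
    var sc = new SparkContext(new SparkConf());
    var context = new StreamingContext(sc, 2000L);
    context.Checkpoint(checkpointPath); // checkpoints land on local disk
    return context;
});
```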
skaarthik commented 7 years ago

I have never tried using S3 for checkpoints, so I have no idea why context initialization fails for you. You may want to try running a Spark Streaming app that uses S3, implemented in Scala, before trying out S3 with Mobius.
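A quicker check than a full separate app, if it helps: verify S3 access from the same Mobius driver with a plain batch read before involving streaming or checkpoints. The bucket and object name below are placeholders, not anything from this thread:

```csharp
// Sketch: if this read throws, S3 connectivity or credentials are the
// problem, independent of Spark Streaming and checkpoint recovery.
var sc = new SparkContext(new SparkConf());
long lines = sc.TextFile("s3n://your-bucket/some-object.txt").Count();
Console.WriteLine("Read {0} lines from S3", lines);
```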

skaarthik commented 7 years ago

I guess you may need to include the path to the S3 jar in the Mobius classpath (set in sparkclr-submit.cmd) and tweak the Hadoop configuration.
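For the Hadoop-configuration part, one option (a sketch only; whether it applies depends on your Hadoop build) is to forward the standard s3n credential properties through `SparkConf` — Spark passes `spark.hadoop.*` settings into the Hadoop configuration. The key names are the stock s3n properties; the values are placeholders:

```csharp
// Sketch: supply s3n credentials via SparkConf so the driver and executors
// can resolve s3n:// paths. Replace the placeholder values with real keys.
var conf = new SparkConf();
conf.Set("spark.hadoop.fs.s3n.awsAccessKeyId", "<your-access-key>");
conf.Set("spark.hadoop.fs.s3n.awsSecretAccessKey", "<your-secret-key>");
var sc = new SparkContext(conf);
```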