microsoft / Mobius

C# and F# language binding and extensions to Apache Spark
MIT License
941 stars 213 forks

failed to run SparkClrKafka - SerializationException in ForeachRDD #682

Open unruledboy opened 6 years ago

unruledboy commented 6 years ago

Hi,

My environment:

I run the batch word count example like this:

C:\Mobius-master\build\runtime\scripts\sparkclr-submit.cmd --verbose --jars C:\Mobius-master\build\runtime\dependencies\spark-csv_2.10-1.4.0.jar,C:\Mobius-master\build\runtime\dependencies\commons-csV-1.4.jar --exe SparkClrKafka.exe C:\Mobius-master\examples\Streaming\Kafka\bin\Debug\ localhost:9092 test C:\Hadoop\CheckPoint C:\Output\WordCount

Parameters:

  1. localhost:9092
  2. test
  3. C:\Hadoop\CheckPoint
  4. C:\DataLake\Output\WordCount

I get the output below. I read the issue at https://github.com/Microsoft/Mobius/issues/631 and followed the instructions there, but the result is still the same.

Any idea? Thanks.

Classpath elements:
file:/C:/Mobius-master/build/runtime/lib/spark-clr_2.11-2.0.200.jar
file:/C:/Mobius-master/build/runtime/dependencies/spark-csv_2.10-1.4.0.jar
file:/C:/Mobius-master/build/runtime/dependencies/commons-csV-1.4.jar

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/02/22 18:05:19 INFO CSharpRunner: Starting CSharpBackend!
18/02/22 18:05:19 INFO CSharpRunner: Port number used by CSharpBackend is 52061
18/02/22 18:05:19 INFO CSharpRunner: Adding key=spark.jars and value=file:/C:/Mobius-master/build/runtime/dependencies/spark-csv_2.10-1.4.0.jar,file:/C:/Mobius-master/build/runtime/dependencies/commons-csV-1.4.jar,file:/C:/Mobius-master/build/runtime/lib/spark-clr_2.11-2.0.200.jar to environment
18/02/22 18:05:19 INFO CSharpRunner: Adding key=spark.app.name and value=SparkClrKafka to environment
18/02/22 18:05:19 INFO CSharpRunner: Adding key=spark.submit.deployMode and value=client to environment
18/02/22 18:05:19 INFO CSharpRunner: Adding key=spark.master and value=local[*] to environment
18/02/22 18:05:19 INFO SparkContext: Running Spark version 2.0.2
18/02/22 18:05:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/02/22 18:05:20 INFO SecurityManager: Changing view acls to: Administrator
18/02/22 18:05:20 INFO SecurityManager: Changing modify acls to: Administrator
18/02/22 18:05:20 INFO SecurityManager: Changing view acls groups to:
18/02/22 18:05:20 INFO SecurityManager: Changing modify acls groups to:
18/02/22 18:05:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Administrator); groups with view permissions: Set(); users with modify permissions: Set(Administrator); groups with modify permissions: Set()
18/02/22 18:05:20 INFO Utils: Successfully started service 'sparkDriver' on port 52071.
18/02/22 18:05:20 INFO SparkEnv: Registering MapOutputTracker
18/02/22 18:05:20 INFO SparkEnv: Registering BlockManagerMaster
18/02/22 18:05:20 INFO DiskBlockManager: Created local directory at C:\Users\Administrator\AppData\Local\Temp\2\blockmgr-d34bbc66-8a0c-42c1-a56d-371742fc3bb2
18/02/22 18:05:20 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
18/02/22 18:05:21 INFO SparkEnv: Registering OutputCommitCoordinator
18/02/22 18:05:21 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/02/22 18:05:21 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.61.195.229:4040
18/02/22 18:05:21 INFO SparkContext: Added JAR file:/C:/Mobius-master/build/runtime/dependencies/spark-csv_2.10-1.4.0.jar at spark://10.61.195.229:52071/jars/spark-csv_2.10-1.4.0.jar with timestamp 1519283121402
18/02/22 18:05:21 INFO SparkContext: Added JAR file:/C:/Mobius-master/build/runtime/dependencies/commons-csV-1.4.jar at spark://10.61.195.229:52071/jars/commons-csV-1.4.jar with timestamp 1519283121402
18/02/22 18:05:21 INFO SparkContext: Added JAR file:/C:/Mobius-master/build/runtime/lib/spark-clr_2.11-2.0.200.jar at spark://10.61.195.229:52071/jars/spark-clr_2.11-2.0.200.jar with timestamp 1519283121402
18/02/22 18:05:21 INFO Executor: Starting executor ID driver on host localhost
18/02/22 18:05:21 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52082.
18/02/22 18:05:21 INFO NettyBlockTransferService: Server created on 10.61.195.229:52082
18/02/22 18:05:21 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.61.195.229, 52082)
18/02/22 18:05:21 INFO BlockManagerMasterEndpoint: Registering block manager 10.61.195.229:52082 with 366.3 MB RAM, BlockManagerId(driver, 10.61.195.229, 52082)
18/02/22 18:05:21 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.61.195.229, 52082)
18/02/22 18:05:22 INFO CSharpBackendHandler: Connecting to a callback server at port 52084
18/02/22 18:05:22 INFO VerifiableProperties: Verifying properties
18/02/22 18:05:22 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
18/02/22 18:05:22 INFO VerifiableProperties: Property group.id is overridden to
18/02/22 18:05:22 INFO VerifiableProperties: Property zookeeper.connect is overridden to
18/02/22 18:05:24 ERROR CSharpBackendHandler: Exception caught:
java.io.IOException: An existing connection was forcibly closed by the remote host
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:748)
18/02/22 18:05:24 INFO CSharpRunner: Closing CSharpBackend
18/02/22 18:05:24 INFO CSharpBackend: Requesting to close all call back sockets.
18/02/22 18:05:24 INFO CSharpRunner: Return CSharpBackend code -532462766
18/02/22 18:05:24 INFO Utils: Utils.exit() with status: -532462766, maxDelayMillis: 1000
18/02/22 18:05:24 INFO SparkContext: Invoking stop() from shutdown hook
18/02/22 18:05:24 INFO SparkUI: Stopped Spark web UI at http://10.61.195.229:4040
18/02/22 18:05:24 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/02/22 18:05:24 INFO MemoryStore: MemoryStore cleared
18/02/22 18:05:24 INFO BlockManager: BlockManager stopped
18/02/22 18:05:24 INFO BlockManagerMaster: BlockManagerMaster stopped
18/02/22 18:05:24 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/02/22 18:05:24 INFO SparkContext: Successfully stopped SparkContext
18/02/22 18:05:24 INFO ShutdownHookManager: Shutdown hook called
18/02/22 18:05:24 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\2\spark-58986750-2822-439d-b47d-011fb35058a5

unruledboy commented 6 years ago

I modified the word count example so that it does not use the checkpoint, because it kept throwing this error: java.lang.IllegalArgumentException: requirement failed: Spark Streaming cannot be initialized with both SparkContext and checkpoint as null

Also, if I redirect the standard output to a file, I can see an additional error:

[sparkclr-submit.cmd] SPARKCLR_HOME is set to C:\Mobius-master\build\runtime\scripts..
[sparkclr-submit.cmd] SPARKCLR_JAR=spark-clr_2.11-2.0.200.jar
[sparkclr-submit.cmd] LAUNCH_CLASSPATH="C:\Mobius-master\build\runtime\scripts..\lib\spark-clr_2.11-2.0.200.jar;C:\spark-2.0.2-bin-hadoop2.7\jars*"
[sparkclr-submit.cmd] Command to run --verbose --name SparkClrKafka --jars C:\Mobius-master\build\runtime\dependencies\spark-csv_2.10-1.4.0.jar,C:\Mobius-master\build\runtime\dependencies\commons-csV-1.4.jar --class org.apache.spark.deploy.csharp.CSharpRunner C:\Mobius-master\build\runtime\lib\spark-clr_2.11-2.0.200.jar C:\Mobius-master\examples\Streaming\Kafka\bin\Debug\ C:\Mobius-master\examples\Streaming\Kafka\bin\Debug\SparkClrKafka.exe localhost:9092 test C:\Hadoop\CheckPoint C:\Output\WordCount
[2018-02-22 18:05:19,808] [1] [INFO ] [Microsoft.Spark.CSharp.Configuration.ConfigurationService] - ConfigurationService runMode is LOCAL
[2018-02-22 18:05:19,808] [1] [INFO ] [Microsoft.Spark.CSharp.Configuration.ConfigurationService+SparkCLRConfiguration] - CSharpBackend successfully read from environment variable CSHARPBACKEND_PORT
[2018-02-22 18:05:19,808] [1] [INFO ] [Microsoft.Spark.CSharp.Proxy.Ipc.SparkCLRIpcProxy] - CSharpBackend port number to be used in JvMBridge is 52061
[2018-02-22 18:05:19,949] [1] [INFO ] [Microsoft.Spark.CSharp.Core.SparkConf] - Spark app name set to SparkCLRKafka Example
[2018-02-22 18:05:22,074] [1] [INFO ] [Microsoft.Spark.CSharp.Proxy.Ipc.StreamingContextIpcProxy] - Callback server port number is 52084

Unhandled Exception: System.Runtime.Serialization.SerializationException: Type 'Microsoft.Spark.CSharp.Examples.SparkClrKafkaExample+<>c__DisplayClass0_0' in Assembly 'SparkClrKafka, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null' is not marked as serializable.
   at System.Runtime.Serialization.FormatterServices.InternalGetSerializableMembers(RuntimeType type)
   at System.Runtime.Serialization.FormatterServices.<>c__DisplayClass9_0.<GetSerializableMembers>b__0(MemberHolder _)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at System.Runtime.Serialization.FormatterServices.GetSerializableMembers(Type type, StreamingContext context)
   at System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo.InitMemberInfo()
   at System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo.InitSerialize(Object obj, ISurrogateSelector surrogateSelector, StreamingContext context, SerObjectInfoInit serObjectInfoInit, IFormatterConverter converter, ObjectWriter objectWriter, SerializationBinder binder)
   at System.Runtime.Serialization.Formatters.Binary.ObjectWriter.Write(WriteObjectInfo objectInfo, NameInfo memberNameInfo, NameInfo typeNameInfo)
   at System.Runtime.Serialization.Formatters.Binary.ObjectWriter.Serialize(Object graph, Header[] inHeaders, __BinaryWriter serWriter, Boolean fCheck)
   at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Serialize(Stream serializationStream, Object graph, Header[] headers, Boolean fCheck)
   at Microsoft.Spark.CSharp.Streaming.DStream`1.ForeachRDD(Action`2 f)
   at Microsoft.Spark.CSharp.Streaming.DStream`1.ForeachRDD(Action`1 f)
   at Microsoft.Spark.CSharp.Examples.SparkClrKafkaExample.Main(String[] args) in C:\Users\wilson.chen\Downloads\Mobius-master\examples\Streaming\Kafka\Program.cs:line 58
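
A side note on the exit status in the Spark log ("Return CSharpBackend code -532462766"): it looks like it matches the unhandled exception above. Read as an unsigned 32-bit value, it is 0xE0434352, the code Windows reports when a .NET process ends with an unhandled managed exception. The small Python sketch below only does that arithmetic; the helper name `to_unsigned32` and the constant name are made up for illustration and are not part of Mobius or Spark.

```python
# Illustrative only: reinterpret the signed exit status from the Spark log
# as an unsigned 32-bit value and compare it with the CLR exception code.

# Exception code used by the CLR for unhandled managed exceptions
# (0xE0 followed by the ASCII bytes "CCR").
CLR_UNHANDLED_EXCEPTION = 0xE0434352


def to_unsigned32(code: int) -> int:
    """Map a signed 32-bit process exit code to its unsigned representation."""
    return code & 0xFFFFFFFF


exit_code = -532462766  # value taken from "Return CSharpBackend code" in the log
unsigned = to_unsigned32(exit_code)

print(hex(unsigned))                        # 0xe0434352
print(unsigned == CLR_UNHANDLED_EXCEPTION)  # True
```

So the IOException on the JVM side ("An existing connection was forcibly closed by the remote host") is probably just a consequence of the C# driver process dying on the SerializationException, not a separate problem.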