microsoft / Mobius

C# and F# language binding and extensions to Apache Spark
MIT License

RDD<string>.SaveAsTextFile yields a SerializationException #643

Open PerthCharern opened 7 years ago

PerthCharern commented 7 years ago

Context:

I'm trying to write each RDD as a text file to Azure Blob Storage (wasb).

My code looks similar to this:

                    var stream = EventHubsUtils.CreateUnionStream( ssc, eventhubsParams.Select( v => new Tuple<string, string>( v.Key, v.Value ) ) );

                    DStream<string> timestampEntries = stream
                        .Map( timestamp => Encoding.UTF8.GetString( timestamp ) )
                        .Map( timestamp => JToken.Parse( timestamp ) )
                        .Map( jToken =>
                            string.Join( ",",
                                jToken["xxx"],
                                jToken["yyy"],
                                jToken["zzz"] ) );

                    timestampEntries.ForeachRDD(
                        rdd =>
                        {
                            rdd.SaveAsTextFile( $"{outputPath}/output" );
                        });

Exception:

I'm getting a serialization error that I think is caused by an anonymous function created inside the SaveAsTextFile call. The LatencyEntityGenerator+<>c__DisplayClass0_0 type in the trace below is, I believe, the compiler-generated class for that anonymous function.

Unhandled Exception:
System.Runtime.Serialization.SerializationException: Type 'Microsoft.Spark.CSharp.Examples.LatencyEntityGenerator+<>c__DisplayClass0_0' in Assembly 'LatencyEntityGenerator, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' is not marked as serializable.
  at System.Runtime.Serialization.FormatterServices.InternalGetSerializableMembers (System.RuntimeType type) <0x7f71ed020f70 + 0x00401> in <filename unknown>:0
  at System.Runtime.Serialization.FormatterServices.GetSerializableMembers (System.Type type, StreamingContext context) <0x7f71ed021770 + 0x001cb> in <filename unknown>:0
  at System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo.InitMemberInfo () <0x7f71ed010e50 + 0x000e9> in <filename unknown>:0
  at System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo.InitSerialize (System.Object obj, ISurrogateSelector surrogateSelector, StreamingContext context, System.Runtime.Serialization.Formatters.Binary.SerObjectInfoInit serObjectInfoInit, IFormatterConverter converter, System.Runtime.Serialization.Formatters.Binary.ObjectWriter objectWriter, System.Runtime.Serialization.SerializationBinder binder) <0x7f71ed010170 + 0x0040a> in <filename unknown>:0
  at System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo.Serialize (System.Object obj, ISurrogateSelector surrogateSelector, StreamingContext context, System.Runtime.Serialization.Formatters.Binary.SerObjectInfoInit serObjectInfoInit, IFormatterConverter converter, System.Runtime.Serialization.Formatters.Binary.ObjectWriter objectWriter, System.Runtime.Serialization.SerializationBinder binder) <0x7f71ed010100 + 0x00064> in <filename unknown>:0
  at System.Runtime.Serialization.Formatters.Binary.ObjectWriter.Write (System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo objectInfo, System.Runtime.Serialization.Formatters.Binary.NameInfo memberNameInfo, System.Runtime.Serialization.Formatters.Binary.NameInfo typeNameInfo) <0x7f71ed017cf0 + 0x00277> in <filename unknown>:0
  at System.Runtime.Serialization.Formatters.Binary.ObjectWriter.Serialize (System.Object graph, System.Runtime.Remoting.Messaging.Header[] inHeaders, System.Runtime.Serialization.Formatters.Binary.__BinaryWriter serWriter, Boolean fCheck) <0x7f71ed016ac0 + 0x005fb> in <filename unknown>:0
  at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Serialize (System.IO.Stream serializationStream, System.Object graph, System.Runtime.Remoting.Messaging.Header[] headers, Boolean fCheck) <0x7f71ed00d5d0 + 0x0012e> in <filename unknown>:0
  at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Serialize (System.IO.Stream serializationStream, System.Object graph, System.Runtime.Remoting.Messaging.Header[] headers) <0x7f71ed00d5a0 + 0x00021> in <filename unknown>:0
  at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Serialize (System.IO.Stream serializationStream, System.Object graph) <0x7f71ed00d580 + 0x00018> in <filename unknown>:0
  at Microsoft.Spark.CSharp.Streaming.DStream`1[T].ForeachRDD (System.Action`2 f) <0x4156cf70 + 0x00085> in <filename unknown>:0
  at Microsoft.Spark.CSharp.Streaming.DStream`1[T].ForeachRDD (System.Action`1 f) <0x4156cdc0 + 0x00113> in <filename unknown>:0
  at Microsoft.Spark.CSharp.Examples.LatencyEntityGenerator+<>c__DisplayClass0_0.<Main>b__0 () <0x41566dc0 + 0x005b3> in <filename unknown>:0
  at Microsoft.Spark.CSharp.Streaming.StreamingContext.GetOrCreate (System.String checkpointPath, System.Func`1 creatingFunc) <0x41566950 + 0x00038> in <filename unknown>:0
  at Microsoft.Spark.CSharp.Examples.LatencyEntityGenerator.Main (System.String[] args) <0x41528d60 + 0x00433> in <filename unknown>:0
skaarthik commented 7 years ago

What is LatencyEntityGenerator? Is it the class that has the main method you are running?

PerthCharern commented 7 years ago

Yes. However, the type reported as non-serializable is NOT that class itself.

The <>c__DisplayClass0_0 suffix suggests that it is compiler-generated code for some anonymous function.
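For reference, here is a minimal sketch of what that suffix usually means. It assumes the Roslyn C# compiler and does not use Mobius; `ClosureInspection` and its members are hypothetical names for illustration. When a lambda reads a local variable or parameter of the enclosing method, the compiler hoists that variable into a generated `<>c__DisplayClass…` class and emits the lambda as an instance method on it. That generated class is not marked `[Serializable]`, and the trace above shows `DStream.ForeachRDD` passing the delegate to `BinaryFormatter.Serialize`, which rejects it. The sketch only inspects the delegate's `Target`; it does not call `BinaryFormatter`.

```csharp
using System;

class ClosureInspection
{
    // Returns the name of the delegate's target type and whether that type
    // is marked [Serializable].
    public static (string TypeName, bool IsSerializable) Describe(Delegate d)
    {
        Type target = d.Target.GetType();
        return (target.Name, target.IsSerializable);
    }

    public static Action<string> MakeCapturingLambda(string outputPath)
    {
        // `outputPath` is captured, so the compiler moves it into a generated
        // display class and this lambda becomes an instance method on that class.
        return suffix => Console.WriteLine($"{outputPath}/{suffix}");
    }

    static void Main()
    {
        var (name, serializable) = Describe(MakeCapturingLambda("wasb://container/path"));
        // The exact display-class name depends on the compiler.
        Console.WriteLine($"{name} serializable={serializable}");
    }
}
```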

PerthCharern commented 7 years ago

Also, just to give the complete picture:

The code works fine if, instead of calling SaveAsTextFile, we call .Collect() and simply loop over the results to print them to the console.

This is why I believe the anonymous function is called somewhere inside SaveAsTextFile.

skaarthik commented 7 years ago

I think the problem is the anonymous method you pass to ForeachRDD: the compiler generates a class for it, and that class is not marked serializable. Try moving that logic to a method in a [Serializable] class and pass that method to ForeachRDD. That might fix the issue.

Use PiHelper from the Mobius examples as a reference.
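A minimal sketch of that suggestion, runnable without Spark. It illustrates the pattern and is not a copy of PiHelper. `OutputWriter` is a hypothetical helper name, and a plain `string` stands in for `RDD<string>`: in the actual job, `Write` would take an `RDD<string>` and call `rdd.SaveAsTextFile($"{outputPath}/output")`, and the callback would be passed as `timestampEntries.ForeachRDD(new OutputWriter(outputPath).Write)`. The key point is that the path lives in a field of a `[Serializable]` instance, so the delegate's `Target` is that instance rather than a compiler-generated closure.

```csharp
using System;

// Hypothetical helper. In the Mobius job, Write would take RDD<string> and call
// rdd.SaveAsTextFile(...); a string stands in here so the sketch runs without Spark.
[Serializable]
class OutputWriter
{
    private readonly string outputPath;

    public OutputWriter(string outputPath)
    {
        this.outputPath = outputPath;
    }

    // The path comes from a field of this [Serializable] instance, not from a
    // captured local, so no compiler-generated display class is involved.
    public void Write(string batch)
    {
        Console.WriteLine($"{outputPath}/output <- {batch}");
    }
}

class Program
{
    static void Main()
    {
        // A method group bound to the helper instance: the delegate's Target
        // is the OutputWriter itself, which is marked [Serializable].
        Action<string> action = new OutputWriter("wasb://container/path").Write;
        Console.WriteLine(action.Target.GetType().IsSerializable); // True
    }
}
```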

PerthCharern commented 7 years ago

@skaarthik So I removed all the processing from the Map functions (apart from Encoding.UTF8.GetString, which is needed to convert byte[] to string), and I'm still getting the same error. Here's my code right now:

                    var stream = EventHubsUtils.CreateUnionStream( ssc, eventhubsParams.Select( v => new Tuple<string, string>( v.Key, v.Value ) ) );
                    DStream<string> timestampEntries = stream
                        .Map( timestamp => Encoding.UTF8.GetString( timestamp ) );

                    timestampEntries.ForeachRDD(
                        rdd =>
                        {
                            rdd.SaveAsTextFile( $"{outputPath}/output" );
                        });

If I do this instead of calling SaveAsTextFile, everything works:

                    timestampEntries.ForeachRDD(
                        rdd =>
                        {
                            foreach ( string timestamp in rdd.Collect() )
                            {
                                Console.WriteLine(timestamp);
                            }

                            //rdd.SaveAsTextFile( $"{outputPath}/output" );
                        });

Does this mean that something in either SaveAsTextFile or Encoding.UTF8.GetString is not serializable? I'm not sure how to verify that yet, but I'll keep looking...
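One explanation consistent with this result, sketched below under the assumption that the project is built with the Roslyn compiler: the working version no longer references `outputPath` (the `SaveAsTextFile` line is commented out), so its lambda captures nothing. Roslyn compiles non-capturing lambdas onto a shared `<>c` class that it marks `[Serializable]`, while a lambda that reads `outputPath` is compiled onto a `<>c__DisplayClass…` class that is not. If that holds, the trigger is the captured variable rather than `SaveAsTextFile` or `Encoding.UTF8.GetString`. `CaptureComparison` and its methods are hypothetical names.

```csharp
using System;

class CaptureComparison
{
    // Captures nothing: Roslyn compiles this onto the shared `<>c` class.
    public static Action<string> NonCapturing()
    {
        return line => Console.WriteLine(line);
    }

    // Captures `outputPath`: compiled onto a per-scope display class.
    public static Action<string> Capturing(string outputPath)
    {
        return line => Console.WriteLine($"{outputPath}: {line}");
    }

    static void Main()
    {
        Console.WriteLine(NonCapturing().Target.GetType().IsSerializable);     // True (Roslyn)
        Console.WriteLine(Capturing("out").Target.GetType().IsSerializable);   // False
    }
}
```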

skaarthik commented 7 years ago

Did you try creating non-anonymous methods to pass to Map and ForeachRDD?

unruledboy commented 6 years ago

Exact same problem here. Following your advice in the other issue and the Pi example, I moved the call into a serializable class, as follows.

It looks like any .NET function call inside ForeachRDD, whether or not it is wrapped in a serializable class, results in this error. Any idea?

        countByLogLevelAndTime.ForeachRDD(countByLogLevel =>
        {
            //countByLogLevel.SaveAsTextFile(string.Format("{0}/{1}", appOutputPath, Guid.NewGuid()));
            foreach (var logCount in countByLogLevel.Collect())
            {
                new Saver().Save(appOutputPath, logCount);
                Console.WriteLine($"detailed log:{logCount}");
            }
        });

        [Serializable]
        private class Saver
        {
            public void Save(string path, string log)
            {
                // Path.Combine uses the correct separator on both Windows and Linux/Mono.
                File.WriteAllText(Path.Combine(path, Guid.NewGuid().ToString()), log);
            }
        }
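A sketch of why wrapping the call in a `[Serializable]` class may not help here, under the same assumptions (Roslyn, no Spark). `LogSaver`, `SaveAll`, `ViaLambda`, and `ViaMethodGroup` are hypothetical names, and a `string[]` stands in for the collected RDD contents; in the Mobius job, `SaveAll` would take an `RDD<string>` and iterate over `rdd.Collect()`. The lambda passed to `ForeachRDD` above still reads `appOutputPath`, so the delegate being serialized is still backed by a compiler-generated display class, regardless of `Saver` being serializable. Moving the path into a field and passing a method of the serializable class directly makes that instance the delegate's target.

```csharp
using System;

// Hypothetical variant of Saver that stores the output path in a field.
[Serializable]
class LogSaver
{
    private readonly string path;

    public LogSaver(string path)
    {
        this.path = path;
    }

    // Stand-in for processing one batch of collected log lines.
    public void SaveAll(string[] batch)
    {
        foreach (var log in batch)
            Console.WriteLine($"{path}: {log}");
    }
}

class Program
{
    public static Action<string[]> ViaLambda(string appOutputPath)
    {
        // LogSaver is [Serializable], but this lambda captures appOutputPath,
        // so the delegate's Target is still a compiler-generated display class.
        return batch => new LogSaver(appOutputPath).SaveAll(batch);
    }

    public static Action<string[]> ViaMethodGroup(string appOutputPath)
    {
        // The path lives in a LogSaver field; the delegate's Target is the LogSaver.
        return new LogSaver(appOutputPath).SaveAll;
    }

    static void Main()
    {
        Console.WriteLine(ViaLambda("out").Target.GetType().IsSerializable);      // False
        Console.WriteLine(ViaMethodGroup("out").Target.GetType().IsSerializable); // True
    }
}
```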
purefunkce commented 6 years ago

I am having the same issue, using Mono 5 on Linux.