dotnet / spark

.NET for Apache® Spark™ makes Apache Spark™ easily accessible to .NET developers.
MIT License
2.02k stars 315 forks source link

"Stream is closed" error, but program finishes executing correctly #249

Open bamurtaugh opened 5 years ago

bamurtaugh commented 5 years ago

I'm running a .NET Spark app for batch processing with a selection of GitHub projects data. My program runs as expected up through making a Spark Sql call:

// ...Code creating spark session, reading into data frame, doing some sorting...
// Then move on to creating UDF, SQL call:
spark.Udf().Register<string, bool>("MyUDF", (date) => DateTest(date));
DataFrame dateDf = spark.Sql("SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");

After the Sql query, I perform a Filter() and Drop() for some final processing:

DataFrame filteredDates = dateDf.Filter(dateDf["datebefore"] == true);

After I Show() each of these modified DataFrames, I get the error: ProcessStream() failed with exception: System.ArgumentException: The stream is closed. However, even with that error, both calls to Show() do execute successfully and see the correct output.

Why might I be receiving this error (twice)? Since I get the correct output, the error doesn't seem to be affecting anything?

rapoth commented 5 years ago

@bamurtaugh Thank you for reporting! Could you share your complete app, spark-submit command you are using and the full C# code?

bamurtaugh commented 5 years ago

My app is called mySparkAppAdv. The spark-submit command I'm using is:

spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local bin\Debug\netcoreapp2.2\microsoft-spark-2.4.x-0.4.0.jar dotnet bin\Debug\netcoreapp2.2\mySparkAppAdv.dll

The app itself is:

using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

namespace mySparkAppAdv
    class Program
        static void Main(string[] args)
            SparkSession spark = SparkSession.Builder().AppName("GitHub and Spark Batch").GetOrCreate();

            DataFrame projectsDf = spark.Read()
                    .Schema("id INT, url STRING, owner_id INT, name STRING, descriptor STRING, language STRING, created_at STRING, forked_from INT, deleted STRING, updated_at STRING")

            // Drop any rows with NA values
            DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
            DataFrame cleanedProjects = dropEmptyProjects.Drop("any");

            // Remove unnecessary columns
            cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id");

            // Get the average number of times each language has been forked
            DataFrame groupedDF = cleanedProjects.GroupBy("language").Agg(Avg(cleanedProjects["forked_from"]));

            // Sort by most forked languages first

            // Find projects updated since 10/20/15
            spark.Udf().Register<string, bool>("MyUDF", (date) => DateTest(date));
            DataFrame dateDf = spark.Sql("SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");

            // Only keep more recently updated projects
            DataFrame filteredDates = dateDf.Filter(dateDf["datebefore"] == true);


        public static bool DateTest(string date)
            //  Remove invalid dates to avoid: System.FormatException: String '0000-00-00 00:00:00' was not recognized as a valid DateTime
            if (date.Equals("0000-00-00 00:00:00"))
                return false;

            DateTime convertedDate = Convert.ToDateTime(date);

            // 10/20/2015 
            DateTime referenceDate = new DateTime(2015, 10, 20);

            // > 0 means convertedDate (input from file) is later than 10/20/15
            if (DateTime.Compare(convertedDate, referenceDate) > 0)
                return true;
                return false;


After most of my program executes/produces output as expected, I get the following output after showing the grouped Dataframe (please note that the Dataframe appears correctly in my output console, just not when I pasted it as a comment here):

[2019-09-27T22:29:30.3677119Z] [MININT-M0C5UII] [Debug] [ConfigurationService] Using the environment variable to construct .NET worker path: C:\bin\Microsoft.Spark.Worker-0.4.0\Microsoft.Spark.Worker.exe.
DotnetWorker PID:[18248] Args:[-m pyspark.worker] SparkVersion:[2.4.1]
[2019-09-27T22:29:31.5888846Z] [MININT-M0C5UII] [Info] [SimpleWorker] RunSimpleWorker() is starting with port = 53308.
[2019-09-27T22:29:31.6375263Z] [MININT-M0C5UII] [Info] [TaskRunner] [0] Starting with ReuseSocket[False].
DotnetWorker PID:[12344] Args:[-m pyspark.worker] SparkVersion:[2.4.1]
[2019-09-27T22:29:31.8081895Z] [MININT-M0C5UII] [Info] [SimpleWorker] RunSimpleWorker() is starting with port = 53310.
[2019-09-27T22:29:31.8605830Z] [MININT-M0C5UII] [Info] [TaskRunner] [0] Starting with ReuseSocket[False].
|                name|          descriptor|  language|      created_at|forked_from|deleted|      updated_at|datebefore|
|           ruote-kit|RESTful wrapper f...|      Ruby| 12/8/2009 10:17|          2|      0|  11/5/2015 1:15|      true|
|           cocos2d-x|Port of cocos2d-i...|       C++| 3/12/2012 16:48|          6|      0|10/22/2015 17:36|      true|
|           cocos2d-x|Port of cocos2d-i...|         C| 4/23/2012 10:20|          6|      0| 11/1/2015 17:32|      true|
|       rake-compiler|Provide a standar...|      Ruby|  8/1/2012 18:33|   14556189|      0| 11/3/2015 19:30|      true|
|    cobertura-plugin|Jenkins cobertura...|      Java| 7/26/2012 18:46|     193522|      0| 11/1/2015 19:55|      true|
|     scala-vs-erlang|A performance tes...|    Erlang|12/25/2011 13:51|    1262879|      0| 10/22/2015 4:50|      true|
|              opencv|OpenCV GitHub Mirror|       C++|  8/2/2012 12:50|         29|      0| 10/26/2015 6:44|      true|
| redmine_git_hosting|A ChiliProject/Re...|      Ruby| 7/30/2012 12:53|         42|      0|10/28/2015 10:54|      true|
| redmine_git_hosting|A ChiliProject/Re...|      Ruby|10/26/2011 23:17|     207450|      0|10/27/2015 22:43|      true|
|      protobuf-cmake|CMake build suppo...|        \N|  8/2/2012 14:35|         61|      0| 10/31/2015 1:22|      true|
|willdurand.github...|        My new blog!|JavaScript|  8/2/2012 12:06|         84|      0|  11/4/2015 9:15|      true|
|               libuv|platform layer fo...|         C|  8/2/2012 12:57|         74|      0| 10/31/2015 8:21|      true|
|         cucumber-js|Pure Javascript C...|JavaScript| 6/21/2012 11:47|         96|      0| 11/4/2015 21:42|      true|
|             i18n-js|It's a small libr...|JavaScript|  8/2/2012 12:58|         92|      0|10/29/2015 13:19|      true|
|               libuv|platform layer fo...|         C|  7/9/2012 12:56|         74|      0|10/31/2015 20:53|      true|
|        angular-seed|Seed project for ...|JavaScript|  5/19/2012 6:30|        100|      0| 11/2/2015 14:21|      true|
|           addon-sdk|The Add-on SDK re...|    Python|11/29/2010 13:17|        127|      0|10/31/2015 20:10|      true|
|   xinput_calibrator|A generic touchsc...|       C++|  8/2/2012 12:59|        128|      0| 11/1/2015 14:27|      true|
|           addon-sdk|The Add-on SDK re...|JavaScript|  4/7/2011 17:58|        127|      0| 11/6/2015 11:03|      true|
|                ntpl|                ntpl|    Python|  8/1/2012 16:50|        134|      0|10/25/2015 16:03|      true|
only showing top 20 rows

[2019-09-27T22:29:34.7433042Z] [MININT-M0C5UII] [Error] [TaskRunner] [0] ProcessStream() failed with exception: System.ArgumentException: The stream is closed.
   at Microsoft.Spark.Utils.PythonSerDe.GetUnpickledObjects(Stream stream, Int32 messageLength) in /_/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs:line 49
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 133
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 153
[2019-09-27T22:29:34.7434683Z] [MININT-M0C5UII] [Error] [TaskRunner] [0] ProcessStream() failed with exception: System.ArgumentException: The stream is closed.
   at Microsoft.Spark.Utils.PythonSerDe.GetUnpickledObjects(Stream stream, Int32 messageLength) in /_/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs:line 49
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 133
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 153
[2019-09-27T22:29:34.7438227Z] [MININT-M0C5UII] [Error] [TaskRunner] [0] Exiting with exception: System.ArgumentException: The stream is closed.
   at Microsoft.Spark.Utils.PythonSerDe.GetUnpickledObjects(Stream stream, Int32 messageLength) in /_/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs:line 49
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 133
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 153
   at Microsoft.Spark.Worker.TaskRunner.Run() in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 65
[2019-09-27T22:29:34.7447989Z] [MININT-M0C5UII] [Info] [TaskRunner] [0] Finished running 0 task(s).
[2019-09-27T22:29:34.7448496Z] [MININT-M0C5UII] [Info] [SimpleWorker] Ru[2019-09-27T22:29:34.7436884Z] [MININT-M0C5UII] [Error] [TaskRunner] [0] Exiting with exception: System.ArgumentException: The stream is closed.
   at Microsoft.Spark.Utils.PythonSerDe.GetUnpickledObjects(Stream stream, Int32 messageLength) in /_/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs:line 49
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 133
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 153
   at Microsoft.Spark.Worker.TaskRunner.Run() in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 65
[2019-09-27T22:29:34.7447995Z] [MININT-M0C5UII] [Info] [TaskRunner] [0] Finished running 0 task(s).
[2019-09-27T22:29:34.7448511Z] [MININT-M0C5UII] [Info] [SimpleWorker] RunSimpleWorker() finished successfully
elvaliuliuliu commented 5 years ago

From what I see is that only when you run filteredDates.Show(); the error message will show up. If you comment this out, there will be no error.

bamurtaugh commented 5 years ago

That's interesting to observe. Do you have any idea why just showing this DataFrame in the same Spark Session would cause this error? I wonder if it could have something to do the stream closing/getting confused after the SQL call concludes.

elvaliuliuliu commented 5 years ago

Not sure, but I don't think show would cause this error. Could be Udf related issue.

luciantimar commented 3 years ago


I got a similar issue with Spark .Net 1.0 but the issue is only when I am using Udf functions. The Show method shows the correct results.

Is it something that I should worry about? or is just some cleaning exceptions because the log is showing that the "RunSimpleWorker() finished successfully" and also "DotnetRunner: .NET application exited successfully".

Java version 8 Spark 2.4.1 or Spark 3.0 .Net Core 3.1 Microsoft.Spark.Worker-1.0.0

The udf functions are

            Func<Column, Column> toTimeSpan = Functions.Udf<string, double>(str =>
                TimeSpan ts;
                if (TimeSpan.TryParse(str, out ts))
                    return ts.TotalSeconds;
                return 0;

            Func<Column, Column, Column> toDuration = Functions.Udf<string, string, double>((str1, str2) =>
                TimeSpan ts1;
                TimeSpan ts2;
                if (TimeSpan.TryParse(str1, out ts1) && TimeSpan.TryParse(str2, out ts2))
                    return ts2.TotalSeconds - ts1.TotalSeconds;
                return 0;


  DataFrame df1 = dataFrame
                .WithColumn("Start", toTimeSpan(dataFrame.Col("SessionStartTime")))
                .WithColumn("End", toTimeSpan(dataFrame.Col("SessionEndTime")))
                .WithColumn("Duration",toDuration(Functions.Col("SessionStartTime"), Functions.Col("SessionEndTime")))

|1010.0|60.0|     4|         98|       28|        00:07:00|      00:15:00|-111|         3|420.0| 900.0|   480.0|5000|5000|
only showing the top 20 rows

[2020-11-13T09:03:06.7403736Z] [N-20N3PF1C0NK5] [Error] [TaskRunner] [0] ProcessStream() failed with exception: System.IO.IOException: Unable to write data to the transport connection: An established connection was aborted by the software in your host machine..
 ---> System.Net.Sockets.SocketException (10053): An established connection was aborted by the software in your host machine.
   at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
   --- End of inner exception stack trace ---
   at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
   at System.IO.BufferedStream.Write(Byte[] array, Int32 offset, Int32 count)
   at Microsoft.Spark.Interop.Ipc.SerDe.Write(Stream s, Byte[] value, Int32 count) in /_/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs:line 283
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.WriteOutput(Stream stream, IEnumerable`1 rows, Int32 sizeHint) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 191
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 158
   at Microsoft.Spark.Worker.Command.SqlCommandExecutor.Execute(Version version, Stream inputStream, Stream outputStream, PythonEvalType evalType, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 76
   at Microsoft.Spark.Worker.Command.CommandExecutor.Execute(Stream inputStream, Stream outputStream, Int32 splitIndex, CommandPayload commandPayload) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\CommandExecutor.cs:line 65
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 154
[2020-11-13T09:03:06.7410655Z] [N-20N3PF1C0NK5] [Error] [TaskRunner] [0] Exitin20/11/13 11:03:06 INFO SparkUI: Stopped Spark web UI at
g with exception: System.IO.IOException: Unable to write data to the transport connection: An established connection was aborted by the software in your host machine..
 ---> System.Net.Sockets.SocketException (10053): An established connection was aborted by the software in your host machine.
   at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
   --- End of inner exception stack trace ---
   at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
   at System.IO.BufferedStream.Write(Byte[] array, Int32 offset, Int32 count)
   at Microsoft.Spark.Interop.Ipc.SerDe.Write(Stream s, Byte[] value, Int32 count) in /_/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs:line 283
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.WriteOutput(Stream stream, IEnumerable`1 rows, Int32 sizeHint) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 191
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 158
   at Microsoft.Spark.Worker.Command.SqlCommandExecutor.Execute(Version version, Stream inputStream, Stream outputStream, PythonEvalType evalType, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 76
   at Microsoft.Spark.Worker.Command.CommandExecutor.Execute(Stream inputStream, Stream outputStream, Int32 splitIndex, CommandPayload commandPayload) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\CommandExecutor.cs:line 65
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 154
   at Microsoft.Spark.Worker.TaskRunner.Run() in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 66
[2020-11-13T09:03:06.7448127Z] [N-20N3PF1C0NK5] [Warn] [TaskRunner] [0] Exception while closing socket: System.IO.IOException: Unable to write data to the transport connection: An established connection was aborted by the software in your host machine..
 ---> System.Net.Sockets.SocketException (10053): An established connection was aborted by the software in your host machine.
   at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
   --- End of inner exception stack trace ---
   at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
   at System.IO.BufferedStream.Flush()
   at System.IO.BufferedStream.Dispose(Boolean disposing)
   at System.IO.Stream.Close()
   at System.IO.Stream.Dispose()
   at Microsoft.Spark.Network.DefaultSocketWrapper.Dispose() in /_/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs:line 56
   at Microsoft.Spark.Worker.TaskRunner.Run() in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 115
[2020-11-13T09:03:06.7449393Z] [N-20N3PF1C0NK5] [Info] [TaskRunner] [0] Finished running 0 task(s).
[2020-11-13T09:03:06.7449660Z] [N-20N3PF1C0NK5] [Info] [SimpleWorker] RunSimpleWorker() finished successfully