dotnet / spark

.NET for Apache® Spark™ makes Apache Spark™ easily accessible to .NET developers.
https://dot.net/spark
MIT License

[FEATURE REQUEST]: Consider adding support for common RDD operations #284

Open joperezr opened 4 years ago

joperezr commented 4 years ago

I know that this issue has been closed as won't fix in the past (#101), but I just wanted to add some roadblocks that I've been hitting while porting some existing Scala code to Spark.NET. For my scenario, we have some data input that we don't have control of, and in order to use it for our processing we need to apply some transformations to the data (using map/reduce operations). We can't really persist the transformed version of the data because it changes constantly, so we have to re-transform it each time. In Scala we can do these transformations using the convenient RDD APIs that Scala/Spark provides, like map/reduce, which are very fast. To give some context, it takes about 20 seconds to make these transformations on the input data (which is a 150MB file). The code also looks very simple, like:

    import scala.collection.immutable.TreeMap

    val result = myDataFrame
      .rdd
      .map(r => ((r.getInt(0), r.getInt(1)), (r.getInt(2), r.getString(3), r.getString(4))))
      .reduceByKey((r1, r2) => if (r1._1 > r2._1) r2 else r1)
      .map(r => (r._1._1, TreeMap(r._1._2 -> r._2)))
      .reduceByKey(_ ++ _)
      .collectAsMap

Without going into too much detail, this transforms the data by merging the first two columns into a key, then reduces to gather all the rows that share those first two columns and does some calculations on them, and then does another grouping with more calculations. As I said, this takes about 20 seconds in Scala. Getting the exact same result in Spark.NET is much more complicated: column transformations can't easily be done without using a UDF (as opposed to just calling map), and similarly for the reduce operations we need complex UDF logic using process-to-process communication and Apache Arrow in order to get the same output. Because of the way that UDFs run in Spark.NET (meaning that Spark.NET code calls into Java, which spins off a worker process, which reflects into the UDF to finally execute it), these transformations are super costly. In the end, it takes about 20 minutes to run the same transformations when using Spark.NET.

Even if we don't want to bring in all of the RDD support, could we consider adding map/reduce operations that run in-process on the Java side, so we avoid all of the process-to-process overhead that makes these operations so expensive? I also understand that this might not be trivial at all, given that what you pass to the transform operations are lambdas, which we don't really have a good way of passing through our JVM bridge, but have we thought about enabling scenarios like this one?

imback82 commented 4 years ago

Can you paste the .NET code that takes 20 minutes?

Even if we don't want to bring in all of the RDD support, could we consider adding map/reduce operations that run in-process on the Java side, so we avoid all of the process-to-process overhead that makes these operations so expensive?

If you define a lambda for map/reduce in C#, it needs to run on the CLR. You can take a look at how the basic (internal) RDD support is implemented in this repo. If you want to run everything on the Java side, take a look at Sql.Functions and see whether you can leverage any of those.
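For example, a plain map + reduceByKey over a numeric value can usually be expressed with built-in functions so the aggregation runs entirely on the JVM. A minimal, untested sketch (df, key, and value are placeholder names, not from your code):

    // Hypothetical sketch: express map + reduceByKey with Microsoft.Spark.Sql.Functions
    // so that no C# lambda (and therefore no worker round trip) is involved.
    using Microsoft.Spark.Sql;
    using static Microsoft.Spark.Sql.Functions;

    // Scala: rdd.map(r => (r.key, r.value)).reduceByKey(_ + _)
    DataFrame totals = df
        .GroupBy(Col("key"))
        .Agg(Sum(Col("value")).As("total"));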

joperezr commented 4 years ago

Sure, here is the .NET code performing the same transformation as the Scala code above:

    // In Scala, this is .map(r => ((r.getInt(0), r.getInt(1)), (r.getInt(2), r.getString(3), r.getString(4))))
    Func<Column, Column, Column> MergeTwoColumns = Udf<int?, int?, string>(
        (keyOctet, maxThirdOctet) => JsonSerializer.Serialize(new[] { keyOctet.Value, maxThirdOctet.Value }));
    Func<Column, Column, Column, Column> MergeThreeColumns = Udf<int?, string, string, string>(
        (minThirdOctet, countryIsoCode, cityName) =>
            JsonSerializer.Serialize(new[] { minThirdOctet.Value.ToString(), countryIsoCode, cityName }));
    DataFrame geoLookup = geoLookupTemp
        .Select(
            MergeTwoColumns(geoLookupTemp["KeyOctet"], geoLookupTemp["MaxThirdOctet"]).As("_1"),
            MergeThreeColumns(geoLookupTemp["MinThirdOctet"], geoLookupTemp["CountryIsoCode"], geoLookupTemp["CityName"]).As("_2"));

    // In Scala this is .reduceByKey((r1, r2) => if (r1._1 > r2._1) r2 else r1)
    DataFrame groupedGeoLookup = geoLookup
        .GroupBy(geoLookup["_1"])
        .Apply(
            new StructType(new[]
            {
                new StructField("_1", new StringType()),
                new StructField("_2", new StringType())
            }),
            r => KeepMinimumValueForKey(r, "_1", "_2"));

    // In Scala this is .map(r => (r._1._1, TreeMap(r._1._2 -> r._2)))
    Func<Column, Column> CreateDictionaryKey = Udf<string, int>(str => JsonSerializer.Deserialize<int[]>(str)[0]);
    Func<Column, Column, Column> CreateDictionaryValue = Udf<string, string, string>(
        (key, value) => JsonSerializer.Serialize(new[] { JsonSerializer.Deserialize<int[]>(key)[1].ToString(), value }));
    DataFrame TreeMap = groupedGeoLookup
        .Select(
            CreateDictionaryKey(groupedGeoLookup["_1"]).As("_1"),
            CreateDictionaryValue(groupedGeoLookup["_1"], groupedGeoLookup["_2"]).As("_2"));

    // In Scala this is .reduceByKey(_ ++ _)
    DataFrame reducedTreeMap = TreeMap
        .GroupBy(TreeMap["_1"])
        .Apply(
            new StructType(new[]
            {
                new StructField("_1", new IntegerType()),
                new StructField("_2", new StringType())
            }),
            r => ReduceByKeyDictionary(r, "_1", "_2"));

// Here is the code for the Vector UDFs

    // Grouped-map vector UDF for the .reduceByKey(_ ++ _) step: concatenates the
    // group's JSON-encoded entries into a single JSON array for the group key.
    private static Apache.Arrow.RecordBatch ReduceByKeyDictionary(
        Apache.Arrow.RecordBatch records,
        string groupFieldName,
        string stringFieldName)
    {
        int stringFieldIndex = records.Schema.GetFieldIndex(stringFieldName);
        Apache.Arrow.StringArray stringValues = records.Column(stringFieldIndex) as Apache.Arrow.StringArray;

        List<string[]> result = new List<string[]>();
        for (int i = 0; i < stringValues.Length; ++i)
        {
            string current = stringValues.GetString(i);
            string[] strArr = JsonSerializer.Deserialize<string[]>(current);
            result.Add(strArr);
        }

        string merged = JsonSerializer.Serialize(result.ToArray());
        int groupFieldIndex = records.Schema.GetFieldIndex(groupFieldName);
        Apache.Arrow.Field groupField = records.Schema.GetFieldByIndex(groupFieldIndex);

        return new Apache.Arrow.RecordBatch(
            new Apache.Arrow.Schema.Builder()
                .Field(groupField)
                .Field(f => f.Name(stringFieldName).DataType(Apache.Arrow.Types.StringType.Default))
                .Build(),
            new Apache.Arrow.IArrowArray[]
            {
                records.Column(groupFieldIndex),
                new Apache.Arrow.StringArray.Builder().Append(merged).Build()
            },
            1);
    }

    // Grouped-map vector UDF for the .reduceByKey((r1, r2) => if (r1._1 > r2._1) r2 else r1)
    // step: keeps, per group, the JSON-encoded entry whose first element (parsed as an int) is smallest.
    private static Apache.Arrow.RecordBatch KeepMinimumValueForKey(
        Apache.Arrow.RecordBatch records,
        string groupFieldName,
        string stringFieldName)
    {
        int stringFieldIndex = records.Schema.GetFieldIndex(stringFieldName);
        Apache.Arrow.StringArray stringValues = records.Column(stringFieldIndex) as Apache.Arrow.StringArray;
        int min = int.MaxValue;
        string minString = string.Empty;

        for (int i = 0; i < stringValues.Length; ++i)
        {
            string current = stringValues.GetString(i);
            int currentValue = int.Parse(JsonSerializer.Deserialize<string[]>(current)[0]);
            if (currentValue <= min)
            {
                min = currentValue;
                minString = current;
            }
        }

        int groupFieldIndex = records.Schema.GetFieldIndex(groupFieldName);
        Apache.Arrow.Field groupField = records.Schema.GetFieldByIndex(groupFieldIndex);

        return new Apache.Arrow.RecordBatch(
            new Apache.Arrow.Schema.Builder()
                .Field(groupField)
                .Field(f => f.Name(stringFieldName).DataType(Apache.Arrow.Types.StringType.Default))
                .Build(),
            new Apache.Arrow.IArrowArray[]
            {
                records.Column(groupFieldIndex),
                new Apache.Arrow.StringArray.Builder().Append(minString).Build()
            },
            1);
    }

imback82 commented 4 years ago

Also, are you running on Windows or Linux?

joperezr commented 4 years ago

Running on Windows for both the .NET and the Scala runs, against a local Spark cluster running on my machine. For the Scala code, I ran it inside the spark-shell. For the .NET code, I ran it using spark-submit, very similar to what the instructions for the examples in this repo suggest, with both the worker and the app running on .NET Core 2.1.

imback82 commented 4 years ago

Because of the way that UDFs run in Spark.NET (meaning that Spark.NET code calls into Java, which spins off a worker process, which reflects into the UDF to finally execute it), these transformations are super costly. In the end, it takes about 20 minutes to run the same transformations when using Spark.NET.

I don't believe the 20-minute cost is mostly from shipping data from the JVM to the CLR and back to the JVM (we have run lots of benchmarks and didn't see a 60x difference; we will share more results at Spark Summit next week, but this is what we shared at the last Spark Summit: https://devblogs.microsoft.com/dotnet/introducing-net-for-apache-spark/#performance). I think it's mostly due to how the UDF is written.

joperezr commented 4 years ago

I think it's mostly due to how the UDF is written.

For the example above, do you mind pointing out what is wrong? BTW, all the serialization code happening in the UDFs is due to not having support for things like ValueTuples or Dictionaries, which are needed by the above example. If there is an easier way to write those UDFs, I'm very interested in learning :)

imback82 commented 4 years ago

Maybe you can do something similar to https://github.com/dotnet/spark/pull/205/files? There is still one UDF involved, but most of the operations are done on the JVM.
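Applied to your columns, the rough idea might look something like the untested sketch below. It leans on Min over a struct (which orders primarily by the struct's first field) and CollectList to keep both reduce steps on the JVM; the output shape is a list of (MaxThirdOctet, value) structs per KeyOctet rather than your TreeMap, so you'd still reshape it on the driver after Collect():

    // Untested sketch only: push both group-by/reduce steps to the JVM with built-in
    // functions, so no worker process or Arrow serialization is needed.
    // Column names come from the snippet above; everything else is illustrative.
    using Microsoft.Spark.Sql;
    using static Microsoft.Spark.Sql.Functions;

    DataFrame reduced = geoLookupTemp
        // Scala: .reduceByKey((r1, r2) => if (r1._1 > r2._1) r2 else r1)
        // Min over a struct orders primarily by its first field, so this keeps the row
        // with the smallest MinThirdOctet for each (KeyOctet, MaxThirdOctet) key.
        .GroupBy(Col("KeyOctet"), Col("MaxThirdOctet"))
        .Agg(Min(Struct(Col("MinThirdOctet"), Col("CountryIsoCode"), Col("CityName"))).As("value"))
        // Scala: .map(r => (r._1._1, TreeMap(r._1._2 -> r._2))).reduceByKey(_ ++ _)
        .GroupBy(Col("KeyOctet"))
        .Agg(CollectList(Struct(Col("MaxThirdOctet"), Col("value"))).As("entries"));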

Niharikadutta commented 4 years ago

@joperezr Were you able to get this issue resolved? Please let us know if you need any further assistance on this from us.

joperezr commented 3 years ago

I don't work in that codebase any more, but AFAIK there wasn't a good way at the time to perform map or reduce operations easily other than writing custom UDFs, which ended up not being very performant. A lot of Scala code out there still relies heavily on map/reduce operations over data, so I think it would be good to add anything that helps with those scenarios.