dotnet / spark

.NET for Apache® Spark™ makes Apache Spark™ easily accessible to .NET developers.
https://dot.net/spark
MIT License

[FEATURE REQUEST]: Encoders #154

Closed jatcar95 closed 5 years ago

jatcar95 commented 5 years ago

**Description**

I'm creating an application using .NET for Spark and Kafka. In a different application, I have a C# object that is serialized and sent to Kafka. In my Spark application, I want to deserialize the object and perform some actions on it using built-in Spark functions.

**Problem**

I can't figure out how to get my object into a format usable by Spark. I created a UDF that deserializes and returns the object, but due to limitations on the return types of UDFs, that doesn't work. I could create a separate UDF to retrieve each field of the serialized object, but that would mean deserializing it each time I needed to access a field. And that feels like poor style anyway. I could convert it to JSON and then parse the JSON into the Spark SQL representation, but that would also incur some overhead.

**Encoders**

In Spark 2.3.0, there is a Java class called "Encoder" that is used to transform Java objects to and from the internal Spark SQL representation. Documentation is here: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/Encoder.html. This seems like it does what I need. However, it doesn't seem that .NET for Spark supports the Encoder class.

Can we get support for Encoders in a future release? And until then, are there any workarounds for my problem? It seems like it would be a pretty common use case - using fields of a C# object with Spark functions - but I can't find a good way to do it.

Thanks!

*Edited to add direct link to Encoders documentation.*

imback82 commented 5 years ago

Support for custom Encoders is not feasible since they need to run on the JVM. Can you check whether #117 addresses your scenario? This feature will be available in the coming weeks. Also, can you provide a specific example so we can see if there is another workaround?

jatcar95 commented 5 years ago

I think the #117 scenario may work. I'm a bit confused about how the schema is used, but I will comment there.

Specifically, I have a data type that represents a log for a single query. It has lots of fields, but let's suppose it's just this:

class ImpressionData
{
    public DateTime Timestamp { get; set; }
    public Guid ImpressionGuid { get; set; }
    public string DataCenter { get; set; }
    public bool IsMobile { get; set; }

    static ImpressionData DeSerialize(byte[] data) { ... }
}

This gets serialized in another application, sent to Kafka, and should be deserialized in Spark. So far, I have a DF that reads in the raw bytes from Kafka:

var rawLogs = spark.ReadStream()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", "...")
    .Option("subscribe", "my_topic")
    .Option("startingOffsets", "latest")
    .Load()
    .SelectExpr("CAST(value AS BINARY) AS value");

My original idea was to have a UDF to deserialize it like so:

Func<Column, Column> DeSerialize = Udf<byte[], ImpressionData>
(
    (data) =>
    {
        return ImpressionData.DeSerialize(data);
    }
);

var deserializedLogs = rawLogs.Select(DeSerialize(rawLogs["value"]));

But this doesn't work because of the limitations of return types. Plus, ideally what I want is not one column with my entire object in it, but multiple columns - one for each field in the object.

I want to run aggregations on this data, like the number of mobile queries in a 5-min bucket, per data center. I think I could write the logic for that given the correct DF, but getting there is where I'm struggling.

My latest idea is to serialize the object to JSON in the UDF and return that string, then do something with the JSON downstream, although this seems inefficient.

Func<Column, Column> DeSerialize = Udf<byte[], string>
(
    (data) =>
    {
        return JsonConvert.SerializeObject(ImpressionData.DeSerialize(data));
    }
);
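For what it's worth, the JSON detour can be made usable downstream with Spark's built-in `from_json`: parse the string back into a struct column, then select the individual fields. An untested sketch, assuming the `ImpressionData` shape above (the DDL schema string and column aliases are my own naming):

```csharp
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

// DDL-style schema matching the JSON the UDF produces
var jsonSchema =
    "Timestamp TIMESTAMP, ImpressionGuid STRING, DataCenter STRING, IsMobile BOOLEAN";

var jsonLogs = rawLogs.Select(DeSerialize(rawLogs["value"]).As("json"));

var parsed = jsonLogs
    .Select(FromJson(jsonLogs["json"], jsonSchema).As("log"))
    .Select("log.*"); // one top-level column per field
```

This still pays the serialize-then-parse cost, but it does yield one column per field, which is what the aggregations need.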
imback82 commented 5 years ago

When #117 is ready, it should address your scenario right? It will look something like the following:

var schema = new StructType(new[] {
    new StructField("Timestamp", new TimestampType()),
    new StructField("ImpressionGuid", new StringType()),
    new StructField("DataCenter", new StringType()),
    new StructField("IsMobile", new BooleanType()) });

// The following API is still work in progress.
// Note that you should return object[] where each array element maps to a field.
var udf = Udf<byte[], object[]>((data) =>
{
    var log = ImpressionData.DeSerialize(data);
    return new object[] { log.Timestamp, log.ImpressionGuid.ToString(), log.DataCenter, log.IsMobile };
}, schema);

// Then you can do
var newDf = rawLogs.Select(udf(rawLogs["value"]).As("ImpressionData"));
newDf.Select(newDf["ImpressionData.DataCenter"]).Show();

// and so on
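Given a `newDf` like the one above, the 5-minute-per-data-center mobile count mentioned earlier could be sketched roughly like this (untested; assumes the schema above and that `Timestamp` comes through as a timestamp column):

```csharp
using static Microsoft.Spark.Sql.Functions;

var counts = newDf
    .Select(
        newDf["ImpressionData.Timestamp"].As("Timestamp"),
        newDf["ImpressionData.DataCenter"].As("DataCenter"),
        newDf["ImpressionData.IsMobile"].As("IsMobile"))
    .Where(Col("IsMobile"))
    // tumbling 5-minute windows, one group per (window, data center)
    .GroupBy(Window(Col("Timestamp"), "5 minutes"), Col("DataCenter"))
    .Count();
```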
jatcar95 commented 5 years ago

Yeah, I think that should work! Thanks, looking forward to it