dfdx / Spark.jl

Julia binding for Apache Spark

RFC: PySpark-like, DataFrame-only API #104

Closed dfdx closed 2 years ago

dfdx commented 2 years ago

I'm working on a nearly complete rewrite of Spark.jl and want to get some feedback before releasing it. Here is a summary of the changes:

Why

In its current state, Spark.jl is terribly outdated and pretty unstable. A lot of effort is needed to keep the old RDD interface working, but almost nobody uses this API nowadays. At the same time, the DataFrame API in main is limited to just a few core methods and is hardly suitable for any real project.

PySpark-compatible API

There are thousands of tutorials, answers and discussions for PySpark out there, as well as hundreds of Pythonistas looking for a familiar API in Julia land. So let's just reuse these materials. An example of the new API:

using Spark.SQL

spark = SparkSession.builder.
        appName("Hello").
        master("local").
        config("some.key", "some-value").
        getOrCreate()

df = spark.read.text("README.md")
df.value.contains("Julia")

SQL and Streaming API

In this initial effort I'm going to implement >= 50% of the SQL and Structured Streaming API, including all the core data types and enough functions to copy-paste most Python tutorials. The long tail of other functions is time-consuming, but otherwise easy to add too.

No RDD

I'm going to discontinue support for the RDD API including JuliaRDD that we used to run custom Julia code on a cluster. The communication protocol between JVM and Julia in that RDD has been terribly outdated, but most importantly, we haven't found a way to reliably manage the state of the Julia workers in a variety of possible runtimes. See this and this for more details.

UDFs via compilation to Java

As described in these issues, the best alternative for running complex Julia scripts on a cluster is Kubernetes. However, to support simple data transformations, the new API also features a Julia-to-Java compiler. Example:

f = s -> lowercase(s)
f_udf = udf(f, "Hi!")
r = jcall2(f_udf.judf, "call", JString, (JString,), "Big Buddha Boom!")
@test convert(String, r) == f("Big Buddha Boom!")

f_udf.judf is a fully functional Java object implementing the UDF1 interface. It can be passed to any matching Java method, and no Julia binary needs to be installed on the cluster.

At a lower level, one can also compile any Java class from a string, including a class that starts a Julia server (e.g. via juliacaller). But managing the lifecycle of this server is up to the user and out of scope for this initial effort.

One big breaking change

Along with RDD support, we lose DataFrames.jl compatibility and other contributions. I apologize for this. PRs to bring them back are highly welcome.


I'm currently close to finishing the SQL interface and look forward to the Structured Streaming API. I plan to finish it in 2 weeks or less, and then tag the new version. Comments are highly welcome.

dfdx commented 2 years ago

I think that directly exposing Java or Scala to users would solve many problems.

I just realized I've never mentioned it and the code in compiler.jl is too obfuscated for an occasional reader, but actually you can embed Java classes. Here's an example:

import Spark: create_instance, jcall2

src = """
               package julia.compiled;

               public class Hello {

                   public String hello(String name) {
                       return "Hello, " + name;
                   }

                   public void goodbye() {
                       System.out.println("Goodbye!");
                   }
               }
           """
obj = create_instance(src)     # compile the class and instantiate the object in one hop, see also create_class
jresult = jcall2(obj, "hello", JString, (JString,), "Bob")    # call the method on the instance created above
result = convert(String, jresult)    # "Hello, Bob"

There are certain pitfalls with objects created this way, but from the JVM perspective they are totally valid. As a particular example, the udf() function creates an object that implements one of Spark's UDFn interfaces and can be passed to any applicable Java method.

Compiling Scala should not be much harder, but Scala adds a lot of magic on top of JVM primitives and thus is generally harder to work with via JNI.


however it's a bit sad to see the "just run this Julia function in parallel" functionality go

The problem with this definition is the uncertainty around "this Julia function". Is this function pure Julia, or does it have package dependencies? What version of Julia itself is required to run it, and is it already installed on the workers? How long will it take to call this function compared to the overhead of launching Julia? And compared to transferring a system image to all workers? How much data does it read from the source and write to the sink?

Depending on the answers, the optimal solution may be to create a Julia server on a worker or launch Julia on each batch, trace and serialize the function or transfer the whole package, use Arrow to pass data between processes or manipulate objects right in JVM memory, etc. The RDD implementation has always been faulty and simplistic. If we want to do better, we need to come up with very specific requirements for running Julia on Spark workers.

Thus before we have these requirements, we can:

  1. Create UDFs for simple cases.
  2. Run whatever code in Java, including the code to launch Julia workers with properties tailored towards a specific task.
  3. Run Julia in Docker on Kubernetes :)

I'd really like to have this one back, I'll happily contribute the Arrow interop again, if you'd like.

That would be really great! I'm really sorry to drop all the cool contributions we had so far, but hopefully we will get them added again for the new API too.

I think it would be much preferable to get a Julia-native DataFrame from .collect instead of the Vector{Row}.

One of the main design choices in the new API is to copy the Python API so that people can simply copy-paste thousands of PySpark examples out there. If we change the signature of some methods, users will have to figure out the correct usage, which will drive them away. I'd like to avoid it.

On the other hand, nobody stops us from having e.g. .collect(Table) or .collect_df(). We only need one-way API compatibility after all.

Similarly calling createDataFrame(...) with a Julia Table would be nicer than having to create the data Row by Row.

This is another example where we can use Julia multiple dispatch to create as many convenience methods as we need.
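To make the idea concrete, here is a minimal sketch of how the PySpark-compatible methods and the extra Julia-only ones could coexist through dispatch. It deliberately does not use Spark.jl: `MockDataFrame` and `MockRow` are hypothetical stand-ins, and the method signatures are assumptions for illustration, not the final API.

```julia
# Hypothetical stand-ins so the sketch runs without Spark.jl or a JVM.
struct MockRow
    values::Vector{Any}
end

struct MockDataFrame
    columns::Vector{Symbol}
    rows::Vector{MockRow}
end

# PySpark-compatible signature: collect(df) returns a vector of rows.
Base.collect(df::MockDataFrame) = df.rows

# Extra Julia-only method, selected by the requested result type.
# Returns a NamedTuple of column vectors (a simple column table).
function Base.collect(df::MockDataFrame, ::Type{NamedTuple})
    cols = [[row.values[i] for row in df.rows] for i in eachindex(df.columns)]
    return NamedTuple{Tuple(df.columns)}(Tuple(cols))
end

# The reverse direction: build a frame from a non-empty column table
# instead of constructing it row by row.
function createDataFrame(table::NamedTuple)
    names = collect(keys(table))
    nrows = length(first(table))
    rows = [MockRow(Any[col[i] for col in values(table)]) for i in 1:nrows]
    return MockDataFrame(names, rows)
end

df = createDataFrame((name = ["Alice", "Bob"], age = [34, 27]))
rows = collect(df)                 # PySpark-style result
table = collect(df, NamedTuple)    # Julia-friendly result
```

Code copied from a PySpark tutorial keeps working because the one-argument `collect(df)` is unchanged; the two-argument method is purely additive.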

I'm working with, maybe naming this one SparkDataFrame would avoid some confusion?

How about Spark.DataFrame? :)

dfdx commented 2 years ago

I just learnt two things:

These two things make me prioritize custom Julia code runner again. I'm thinking of juliacaller for control flow and Arrow for data flow. But there are still many open questions, so I'm going to first release the new DataFrame API without custom Julia runners anyway.

exyi commented 2 years ago

One of the main design choices in the new API is to copy the Python API so that people can simply copy-paste thousands of PySpark examples out there. If we change the signature of some methods, users will have to figure out the correct usage, which will drive them away. I'd like to avoid it.

On the other hand, nobody stops us from having e.g. .collect(Table) or .collect_df(). We only need one-way API compatibility after all.

Cool, I'll look into adding collect_df then :)

I looked into how PySpark handles that, and they use a tiny Python class for Row which is not a wrapper for the Scala Row: https://github.com/apache/spark/blob/master/python/pyspark/sql/types.py#L1781. I think that's slightly preferable, since the Julia-Java function calls are not exactly efficient. We could then use Arrow and convert the Table into a Vector of these Rows. Alternatively, the PySpark Row behaves almost like a Julia named tuple so maybe we could return Vector{named tuple} (arrow had a function for that too, AFAIK)?
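As a rough illustration of the named tuple option, the sketch below turns a column table into a Vector of NamedTuples without any Java calls. `to_rows` is a hypothetical helper name, and the input is assumed to be a NamedTuple of equal-length vectors (for example, what one might get after converting an Arrow table).

```julia
# Hypothetical helper: convert a column table (NamedTuple of equal-length
# vectors) into a Vector of NamedTuples, one per row.
function to_rows(table::NamedTuple)
    names = keys(table)
    nrows = isempty(table) ? 0 : length(first(table))
    return [NamedTuple{names}(Tuple(col[i] for col in values(table))) for i in 1:nrows]
end

table = (name = ["Alice", "Bob"], age = [34, 27])
rows = to_rows(table)

# Field access reads like attribute access on a PySpark Row.
first_name = rows[1].name
second_age = rows[2].age
```

Each row is a plain Julia value here, so inspecting it involves no Julia-Java round trips.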

These two things make me prioritize custom Julia code runner again. I'm thinking of juliacaller for control flow and Arrow for data flow. But there are still many open questions, so I'm going to first release the new DataFrame API without custom Julia runners anyway.

Sure, do that. I think that most users will just want fast access to some HDFS files and/or do some data wrangling in Spark SQL. For these use cases, this approach is pretty much perfect.

dfdx commented 2 years ago

I looked into how PySpark handles that, and they use a tiny Python class for Row which is not a wrapper for the Scala Row

That's interesting. I agree we can optimize it this way, but it will take time to re-implement all the methods involving rows, so perhaps not in this initial release. But the most interesting question is why people even want Row to be performant. The only two reasons I have used Row in practice are in examples like spark.createDataFrame(...) and to inspect a couple of rows. Honestly, copying data from workers to the driver itself sounds suspicious to me.

Anyway, I believe implementing specialized and more data-efficient functions like collect(df, DataFrames.DataFrame) / collect_df(df) in addition to the existing ones is a good start.

dfdx commented 2 years ago

To add to the collect(DataFrames.DataFrame) / collect_df() discussion, PySpark also supports a method with a very clear name - .toPandas(). Likewise, we can have e.g. .to(DataFrames.DataFrame). I don't have a clear preference here, though, so the final decision is up to the person who implements it.