Spark / Spark.jl is remarkably bad at managing external dependencies. While for JVM languages one can simply create and distribute an uberjar, Python, Julia and other languages have to use various workarounds.
By default, PySpark uses cloudpickle to serialize code sent to executors. Cloudpickle does a pretty good job of extracting code dependencies within the same project, but it doesn't handle installed libraries.
The recommended approach to distributing Python libraries is to pack them into a big .tgz or .zip archive and attach it to the Spark session. This works with virtualenv and Conda environments as long as dependencies don't cross their boundaries (i.e. no system dependencies).
As far as I know, in Julia we have neither a cloudpickle equivalent nor tools for packaging isolated environments. Moreover, to run Spark.jl on an existing cluster, Julia itself must already be installed. Cloud providers like Amazon EMR support bootstrap actions (like this one for Julia), but those run at cluster creation time and then apply to all applications, potentially creating dependency conflicts.
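For illustration, here is a rough sketch of what such a packaging tool could do for Julia, using only the Pkg and Tar standard libraries. All paths are hypothetical, and it assumes a clean build machine so the dedicated depot actually captures every dependency:

```julia
using Pkg, Tar

# Make a dedicated depot the primary one, so packages installed below
# land there (on a clean build machine this captures all dependencies).
pushfirst!(DEPOT_PATH, "/tmp/build_depot")

Pkg.activate("/path/to/myproject")   # project with Project.toml / Manifest.toml
Pkg.instantiate()                    # resolve and install every dependency

# Archive both the project and the depot; executors would unpack them
# and set JULIA_PROJECT / JULIA_DEPOT_PATH before starting Julia.
Tar.create("/tmp/build_depot", "depot.tar")
Tar.create("/path/to/myproject", "project.tar")
```

Even this naive version runs into the usual problems - binary dependencies and precompilation caches may not be relocatable across machines - which is a big part of why no ready-made tool exists.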
One popular approach to creating isolated environments is to use custom Docker containers on a Kubernetes cluster. A user can pack whatever code they need into a set of containers and deploy them to Kubernetes at runtime, without any restrictions or prerequisites.
When we apply a Julia function to a piece of data in Spark.jl, that piece travels through the following chain: data source -> JVM executor -> file or socket -> Julia -> file or socket -> JVM executor -> data sink. For reasonably simple functions, serialization and transfer of the data between the JVM and Julia (or any other language) can account for a major part of the execution time.
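To make the cost concrete, here is a rough sketch of what the Julia side of that pipeline looks like; the framing (serialized batches over a TCP socket, with `nothing` as an end-of-stream marker) is made up for illustration and is not Spark.jl's actual wire format:

```julia
using Sockets, Serialization

# Toy executor loop: every batch is decoded coming in and encoded going
# out, so for cheap user functions the two copies across the process
# boundary can dominate the total runtime.
function serve_partition(port::Integer, f)
    server = listen(port)
    sock = accept(server)              # the JVM executor connects here
    while true
        batch = deserialize(sock)      # rows copied out of the JVM and decoded
        batch === nothing && break     # end-of-stream marker (hypothetical)
        serialize(sock, f(batch))      # results encoded and copied back to the JVM
    end
    close(sock); close(server)
end
```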
An alternative (though not necessarily better) design is to allow workers written in different languages. Say we define a worker protocol - a set of network interfaces a running process must satisfy to become part of the cluster. The worker is then responsible for running application executors in the best possible way and for passing data to and from them.
The downside of this design is that the protocol and various utilities (e.g. data sources) would need to be implemented in each supported programming language. A sensible approach would therefore be to focus on a Julia-only implementation, but keep this option in mind.
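As a thought experiment, the protocol could be as small as a handful of operations; the names below are purely illustrative, not an existing API:

```julia
# Hypothetical worker protocol: any process implementing these operations
# over the cluster's RPC layer could join as a worker, whatever language
# it is written in internally.
abstract type AbstractWorker end

"Report available resources (cores, memory) so the scheduler can place tasks."
function describe end

"Prepare the worker: install dependencies, set up the runtime environment."
function setup! end

"Execute one task over a block of input and return (or stream) the output."
function run_task! end

"Tear the environment down and release resources."
function shutdown! end
```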
In Spark, a user has little control over an executor's lifecycle - there are no dedicated set-up and tear-down steps; an executor is created when Spark decides it is needed and destroyed when Spark decides it is no longer useful.
When executing parallel code in Julia, it would be great to set up the project environment on a worker/executor, keep it alive for the duration of the application, and destroy it only when the user decides it is no longer needed.
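For comparison, Julia's own Distributed standard library already exposes this kind of explicit lifecycle - workers live until the user removes them, and setup code runs once and stays warm. A minimal sketch:

```julia
using Distributed

addprocs(4)                       # explicit "set up": workers live until we remove them

@everywhere begin                 # one-time environment setup on every worker
    using Statistics
    const LOOKUP = rand(1_000)    # e.g. load a model or open a connection once
end

# The environment stays warm across many calls instead of being rebuilt.
results = pmap(1:100) do i
    mean(LOOKUP) + i
end

rmprocs(workers())                # explicit "tear down", only when the user decides
```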
Another use case is managing state. Say, we want to run multiple simulations in parallel, dumping the results to the driver from time to time. In Spark.jl, we can run simulations with something like this:
```julia
rdd = ...                                       # input RDD (elided here)
all_results = []
for i = 1:100
    results = map_partitions(rdd, data -> ...)  # one simulation step per partition
    push!(all_results, results.collect())       # ship partial results to the driver
end
```
But what if we don't want to drop the state of the simulation between iterations? In Spark, we would need to use shared variables/accumulators or simply pass the state to and from the workers on every iteration. If executors could be stateful, this wouldn't be an issue at all.
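For a rough idea of what stateful execution buys, here is the same loop sketched with plain Distributed.jl, where each worker keeps a toy simulation state alive between iterations and only the small per-step results travel back to the driver:

```julia
using Distributed
addprocs(4)

@everywhere begin
    # Toy per-worker simulation state; a real simulation would keep its
    # model here. It is created once and survives across iterations.
    const STATE = Ref(0.0)
    step!() = (STATE[] += rand(); STATE[])   # advance and return a small result
end

all_results = Vector{Vector{Float64}}()
for i in 1:100
    # Only the per-iteration results cross the network; the simulation
    # state never leaves the workers.
    push!(all_results, [remotecall_fetch(step!, w) for w in workers()])
end
```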
The downside of keeping state is a higher demand on node reliability - if an executor fails, the whole computation becomes invalid. This can be mitigated with checkpointing, as in large-scale ML model training.
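Checkpointing the per-worker state is cheap to sketch with the Serialization standard library (the path below is hypothetical):

```julia
using Serialization

# Periodically dump the simulation state so a replacement executor can
# resume from the last checkpoint instead of recomputing from scratch.
checkpoint(state, path) = serialize(path, state)
restore(path, default)  = isfile(path) ? deserialize(path) : default

state = restore("/tmp/sim_state.bin", 0.0)     # pick up where a failed run stopped
# ... advance the simulation ...
checkpoint(state, "/tmp/sim_state.bin")
```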
This project comes from my frustration with developing and maintaining Spark.jl. While Spark is one of the leading frameworks for distributed data processing, it certainly doesn't cover all possible use cases for parallel computing. Here I'm going to explore alternative approaches and see whether we need a new framework.