dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Cross Language Client/Workers #586

Open mrocklin opened 7 years ago

mrocklin commented 7 years ago

The Dask.distributed dynamic task scheduler could be replicated across different languages with low-to-moderate effort. This would require someone to build Client and Worker objects in the other language that communicate with the same Scheduler, which contains most of the logic but is fortunately language agnostic. More specifically, there are three players in a dask.distributed cluster, only two of which would need to be rewritten:

  1. Client: something that users use to submit tasks to the scheduler. Would need to be rewritten but is fairly simple. Needs to know how to serialize functions, encode msgpack, and send data over a socket.
  2. Worker: a process running on a remote node that performs the actual tasks. Would need to be rewritten but is also fairly simple. Needs to know how to communicate over a socket, deserialize functions, and execute them asynchronously in some sort of thread pool.
  3. Scheduler: a process to coordinate the actions of all clients and workers, ensuring that the computation proceeds to completion under various stimuli. This is very complex but would not need to be rewritten as it is language agnostic.

About 90% of the complexity of dask.distributed is in the scheduler. Fortunately the scheduler is also language agnostic, and communicates only using msgpack and long bytestrings. It should be doable to re-implement the Client and Workers in another language like R or Julia if anyone has interest. This would require the following understanding in the other language:

  1. How to serialize and deserialize functions and variables in that language
  2. How to communicate over a network, hopefully in a non-blocking way
  3. How to evaluate functions using a separate thread pool
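For concreteness, here is a rough sketch of what those three pieces look like on the Python side; the message shape below is illustrative rather than the actual dask wire format, and a Julia or R port would need native equivalents of each piece:

import cloudpickle                            # how dask serializes functions on the Python side
import msgpack                                # the wire format the scheduler speaks
from concurrent.futures import ThreadPoolExecutor

# 1. Serialize and deserialize functions and variables
func_bytes = cloudpickle.dumps(lambda x: x + 1)
func = cloudpickle.loads(func_bytes)

# 2. Communication itself should be non-blocking (sketched further down in this thread);
#    messages are msgpack maps, with serialized payloads riding along as long bytestrings.
#    Field names here are made up for illustration.
wire = msgpack.packb({"op": "example", "function": func_bytes}, use_bin_type=True)
decoded = msgpack.unpackb(wire, raw=False)

# 3. Evaluate functions on a separate thread pool so communication stays responsive
with ThreadPoolExecutor(max_workers=4) as pool:
    assert pool.submit(func, 41).result() == 42
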
ViralBShah commented 7 years ago

Doing all these in Julia should be fairly straightforward. I also see an MsgPack.jl package

https://github.com/kmsquire/MsgPack.jl

  1. All Julia objects can be serialized and deserialized in a straightforward way. Some tweaks may be necessary for performance when we get there.
  2. Julia has Tasks, which give a very nice way to do asynchronous communication.
  3. Evaluating in a thread pool may take some work since our multi-threading is in early stages, but we could easily have a pool of Julia processes for now.

Are there code samples showing how to do 1, 2 and 3?

cc: @shashi @amitmurthy @kmsquire

mrocklin commented 7 years ago

Some relevant documentation:

  1. Dask.distributed protocol: http://distributed.readthedocs.io/en/latest/protocol.html . It looks like this page is not entirely comprehensive, but it should give a good idea of how we send messages around. I would be happy to go through and improve it if there is interest.
  2. Foundations document for Python developers: http://distributed.readthedocs.io/en/latest/foundations.html . This is mostly about how we organize things internally at the base level. It has nothing to do with how one might manage things in Julia, but it is helpful if you want to learn from existing code.
  3. The worker code: https://github.com/dask/distributed/blob/master/distributed/worker.py . It's about 1000 lines of Python, only about half of which is actually necessary for a minimal implementation.
  4. The client code: https://github.com/dask/distributed/blob/master/distributed/client.py . It's about 2000 lines of Python, only about a fifth of which is probably necessary for a minimal implementation. Much of this is special features that have accrued over the past year.
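As a rough orientation only, the core receive-execute-respond loop of a stripped-down worker might look like the sketch below (in Python here); the op names and message fields are hypothetical stand-ins, and the real wire format is described in the protocol document and worker.py linked above.

# Hypothetical sketch only: op names and fields are illustrative, not the real dask protocol.
import asyncio
import cloudpickle
import msgpack

async def toy_worker(host="127.0.0.1", port=8786):         # address is illustrative
    reader, writer = await asyncio.open_connection(host, port)
    unpacker = msgpack.Unpacker(raw=False)
    while True:
        data = await reader.read(65536)
        if not data:                                        # connection closed
            break
        unpacker.feed(data)
        for msg in unpacker:                                # msgpack maps stream off the socket
            if msg.get("op") == "compute":                  # illustrative op name
                func = cloudpickle.loads(msg["function"])   # functions arrive as bytestrings
                args = cloudpickle.loads(msg["args"])
                result = func(*args)
                reply = {"op": "task-finished",             # illustrative op name
                         "key": msg["key"],
                         "result": cloudpickle.dumps(result)}
                writer.write(msgpack.packb(reply, use_bin_type=True))
                await writer.drain()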

I was looking at Julia tasks yesterday and indeed they seem quite nice. I'm a bit jealous :)

You could also have workers with a single thread for computation. That's a common mode of operation in Python as well when dealing with GIL-holding pure-Python code. However, it is important that the worker process can handle incoming messages while computing. Presumably Julia's asynchronous task API supports this?
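In Python this works because the computation is handed off to a thread pool while the event loop keeps servicing the socket; a minimal sketch of that pattern is below, with the assumption that a Julia task could play the role of the event loop in the same way.

# Sketch: keep handling messages while a (possibly single-threaded) pool runs the task.
import asyncio
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)    # one compute thread, as in the single-threaded mode

async def run_task(func, *args):
    loop = asyncio.get_running_loop()
    # The blocking computation runs on the pool thread; the event loop stays free
    # to accept new tasks, answer heartbeats, and serve data to peers in the meantime.
    return await loop.run_in_executor(pool, func, *args)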

It seems like there is some non-trivial interest in this. I'll spend some time scoping out a minimal API for clients and workers.

mrocklin commented 7 years ago

With Julia in particular I'm curious how to have this satisfy the other parallel programming protocols in the language. The main thing that Dask brings here is intelligent dynamic task scheduling. We've thought fairly hard about how to do this well. The APIs that Python Dask uses, though, might not be the right APIs for Julia-Dask. Python-dask generally uses delayed function calls and futures/promises.
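For reference, those two Python idioms look roughly like this (the scheduler address is illustrative); a Julia front end could expose whatever idiom is natural there while speaking the same scheduler protocol underneath:

# Futures and delayed calls: the two main user-facing idioms in Python dask today.
from dask import delayed
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")   # illustrative address

# Futures/promises: submit immediately, block on .result() later
future = client.submit(sum, [1, 2, 3])
print(future.result())                            # 6

# Delayed function calls: build a lazy task graph, then compute it on the cluster
a = delayed(sum)([1, 2, 3])
b = delayed(sum)([4, 5, 6])
total = delayed(lambda x, y: x + y)(a, b)
print(total.compute())                            # 21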

mrocklin commented 7 years ago

Some general notes that came up in private conversation:

ViralBShah commented 7 years ago

Yes, Julia processes can handle asynchronous communication while computing, so long as they are executing Julia code. This is done through Julia Tasks. If a lot of time is spent in a C/Fortran library call, say BLAS, then that will starve the Julia scheduler. The @async and @sync macros are quite handy.


help?> @async
  @async

  Like @schedule, @async wraps an expression in a Task and adds it to the
  local machine's scheduler queue. Additionally it adds the task to the set of
  items that the nearest enclosing @sync waits for. @async also wraps the
  expression in a let x=x, y=y, ... block to create a new scope with copies of
  all variables referenced in the expression.

help?> @sync
  @sync

  Wait until all dynamically-enclosed uses of @async, @spawn, @spawnat and
  @parallel are complete. All exceptions thrown by enclosed async operations
  are collected and thrown as a CompositeException.
ViralBShah commented 7 years ago

Here's a simple example using Julia's download() function (which shells out to curl/wget) to fetch some URLs, with the downloads running concurrently on Julia's tasks:

julia> URLs = ["www.google.com", "www.amazon.com", "www.yahoo.com"]
3-element Array{String,1}:
 "www.google.com"
 "www.amazon.com"
 "www.yahoo.com" 

julia> @sync for u in ["www.google.com", "www.amazon.com", "www.yahoo.com"]
           @async download(u)
       end
[progress meters from the concurrently running curl processes interleave here]
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   261  100   261    0     0   1619      0 --:--:-- --:--:-- --:--:--  1631
100   304  100   304    0     0   1346      0 --:--:-- --:--:-- --:--:--  1351
100 12584    0 12584    0     0  33003      0 --:--:-- --:--:-- --:--:-- 33003
100   231  100   231    0     0    365      0 --:--:-- --:--:-- --:--:--   366
100    17  100    17    0     0     21      0 --:--:-- --:--:-- --:--:--    21
100  1378  100  1378    0     0    618      0  0:00:02  0:00:02 --:--:--  1065
100  372k    0  372k    0     0   150k      0 --:--:--  0:00:02 --:--:--  229k
ViralBShah commented 7 years ago

To me the larger question will be how Julia users can use these capabilities. That will eventually need a parallel array/dataframe implementation. The good news is that most Julia array code targets AbstractArrays and increasingly, AbstractDataframes are also falling into place. Thus DaskArrays and DaskDataFrames can transparently provide alternate implementations and be leveraged by users - everything will work out under the hood with multiple dispatch.

mrocklin commented 7 years ago

To me the larger question will be how Julia users can use these capabilities. That will eventually need a parallel array/dataframe implementation.

I agree that a parallel array/dataframe implementation composes well with this, however I would disagree with the term "need". A surprisingly large amount of Dask's use today has little to do with the dask.array and dask.dataframe implementations and a lot to do with arbitrary task scheduling. Big business definitely likes "Big Data Frames" as a database surrogate on Hadoop, but outside of that there is a lot of use of Dask for arbitrary dynamic task scheduling. This was somewhat unexpected and marked a large pivot from Dask's original implementation as a parallel array.

Here is a slide deck from a recent PyData conference stressing this topic if it's of interest: http://matthewrocklin.com/slides/pydata-dc-2016#/

ViralBShah commented 7 years ago

That's interesting. Thanks for the link - I just went through it. So what do users typically use Dask parallelism for today? This would be quite interesting for me personally, though I'm not sure if this is the right place to ask that question. Any pointers would help.

mrocklin commented 7 years ago

These links have some additional information:

http://dask.pydata.org/en/latest/use-cases.html
http://matthewrocklin.com/blog/work/2016/08/16/dask-for-institutions

mrocklin commented 7 years ago

This project seems to be communicating with the dask-scheduler from Julia: https://github.com/invenia/DaskDistributedDispatcher.jl

iamed2 commented 7 years ago

@mrocklin Yup that's our project :)

ViralBShah commented 7 years ago

@mrocklin You should come to JuliaCon next week in Berkeley if you are in the region!

mrocklin commented 7 years ago

I would definitely swing by if I were in the area, but I've moved away from the Bay Area. Next time there is an event on the US East Coast I'll try to make it.


dselivanov commented 7 years ago

Didn't know the scheduler is language agnostic. Will check whether it is possible to create a small proof of concept for R.

mrocklin commented 7 years ago

I'm very glad to hear it, @dselivanov. Please let us know if there is anything that causes frustration on the Dask side.

niallrobinson commented 6 years ago

Hi @dselivanov - did you get anywhere with this? I had a conversation with someone who said that RStudio put the work in to make R play with Spark. Apparently, they ended up using PySpark to glue R to Spark. I wonder if they'd have the appetite to do the same for Dask - we might find that some of the work has already been done.

dselivanov commented 6 years ago

@niallrobinson this is not true - sparkR and sparklyr work without Python.

Regarding dask for R - I haven't had time to create a proof of concept, but I'm pretty sure it is possible. The main challenge is how to communicate over a network in a non-blocking way.

mrocklin commented 6 years ago

My hope is that R has some non-blocking concurrency framework. If not, then we would have to communicate in separate threads and use queues.
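A minimal sketch of that threads-and-queues fallback, written in Python for concreteness (address and framing are illustrative); the same shape should translate to any language with OS threads and a thread-safe queue:

# Sketch of the fallback: a dedicated reader thread so a blocking socket
# does not freeze the rest of the process.
import queue
import socket
import threading

inbox = queue.Queue()

def reader(sock):
    while True:
        data = sock.recv(65536)
        if not data:                 # connection closed
            break
        inbox.put(data)              # hand raw bytes to the main thread for decoding

sock = socket.create_connection(("127.0.0.1", 8786))   # illustrative address
threading.Thread(target=reader, args=(sock,), daemon=True).start()

# The main thread can now poll the queue without ever blocking on the socket:
# chunk = inbox.get(timeout=1.0)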

dselivanov commented 6 years ago

It looks like https://github.com/HenrikBengtsson/future can be useful. @HenrikBengtsson, what is your opinion? You've done quite a lot in this field.

dselivanov commented 6 years ago

@mrocklin it would be super useful if you could point to a document/scheme describing what is needed to implement a very minimal "big data hello world" - word count. Something like:

  1. start with 2 workers w1, w2 and a client c
  2. client c sends a message to the scheduler to read data
  3. the scheduler sends messages to workers w1, w2 to read data and assign the results to keys k1_1, k2_1
  4. the scheduler sends messages to the workers to do a local reduce (local word counts) and assign the results to k1_2, k2_2
  5. the scheduler sends messages to partition the results by word (say 2 hash partitions per worker) - {k1_3, k1_4}, {k2_3, k2_4}
  6. the scheduler sends messages to the workers to send results to their peers:
    • w1 sends k1_3 to w2
    • w2 sends k2_4 to w1
  7. the scheduler asks for a final local reduce

Something like the above, but with technical details, would be super useful to get started.

mrocklin commented 6 years ago

These links may help people to get started on this:

http://distributed.readthedocs.io/en/latest/journey.html
http://distributed.readthedocs.io/en/latest/protocol.html
https://github.com/invenia/Dispatcher.jl
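In the meantime, here is a rough sketch of the requested word-count flow expressed against the existing Python client, skipping the hash-partitioned shuffle for brevity; the file names and helper functions are made up for illustration. The scheduler decides which worker runs each task and moves intermediate results between workers on its own.

# Illustrative only: file names and helpers are invented for the sketch.
from collections import Counter
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")        # illustrative address

def read_chunk(path):
    with open(path) as f:
        return f.read().split()

def local_count(words):
    return Counter(words)

def merge(*counters):
    total = Counter()
    for c in counters:
        total.update(c)
    return total

# Two "read" tasks, two local reduces, one final merge; futures passed as
# arguments become dependencies, so the scheduler wires the graph together.
chunks = [client.submit(read_chunk, p) for p in ["part-0.txt", "part-1.txt"]]
counts = [client.submit(local_count, c) for c in chunks]
result = client.submit(merge, *counts).result()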
