dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 718 forks source link

Serialize data within tasks #2110

Open mrocklin opened 6 years ago

mrocklin commented 6 years ago

Currently if we submit data within a task like the following:

x = np.array([1, 2, 3])
future = client.submit(func, x)

Then we construct a task like (func, x) and then call pickle on this task. We don't do custom serialization once we contruct tasks. This is mostly because stuff in tasks is rarely large, and traversing tasks can increase our overhead in the common case. Typically we encourage people to use dask.delayed or something similar to mark data

x = np.array([1, 2, 3])
x = dask.delayed(x)
future = client.submit(func, x)

But this is error prone and requires rarely held expertise.

We might again consider traversing arguments.

dhirschfeld commented 6 years ago

Just a note to mention that none of the below work:

fut = client.submit(echo, obj)
obj_fut = client.scatter(obj)
fut = client.submit(echo, obj_fut)
delayed_obj = delayed(obj)
fut = client.submit(echo, delayed_obj)

All three methods of submitting a function to run on the cluster fail, going through the exact same warn_dumps ⟶ pickle.dumps code path.

Failing test cases for each are currently part of PR #2115

dhirschfeld commented 6 years ago

Update: Scattering does work, but only if you ensure the custom serialization is imported on all workers.

obj_fut = client.scatter(obj)

...will fail with the below error if distributed.protocol.arrow isn't explicitly imported.

  File "C:\dev\src\distributed\distributed\protocol\serialize.py", line 157, in serialize
    raise TypeError(msg)
TypeError: Could not serialize object of type RecordBatch

If you import distributed.protocol.arrow in the client process but not in the workers it fails with the below error:

  File "c:\dev\src\distributed\distributed\core.py", line 448, in send_recv
    raise Exception(response['text'])
Exception: Serialization for type pyarrow.lib.RecordBatch not found

...so to get it to actually work I need to run:

def init_arrow():
    from distributed.protocol import arrow
    return None

init_arrow()
client.run(init_arrow)
obj_fut = client.scatter(obj)
fut = client.submit(echo, obj_fut)
result = fut.result()
assert obj.equals(result)
dhirschfeld commented 6 years ago

The question I have is Is this by design? i.e. is it intended that the user has to initialise the serialisation support on all of the workers?

In the case of an adaptive cluster I guess this could be supported by using the --preload option for any new workers.

mrocklin commented 6 years ago

Yes, something like preload is probably the right way to handle this today, assuming that it's not already in library code.

Eventually it would be nice to allow clients to register functions to be run at worker start time with the scheduler that could be passed to workers as they start up.

  1. Client registers func with the scheduler as a preload operation
  2. Scheduler holds on to func
  3. Scheduler also sends func to all workers to have them run func
  4. Workers run func
  5. New worker arrives, tells Scheduler that it exists
  6. As part of saying "Hi" the scheduler also gives the worker func and tells it to run it

If anyone is interested in implementing this let me know and I'll point to the various locations in the client, scheduler, and worker, where these changes would have to be made. It's a modest amount of work and would be a good introduction to the distributed system.

dhirschfeld commented 6 years ago

I moved the above to a new issue as I think it's a separate concern.

jakirkham commented 4 years ago

The idea of handling external serialization more simply is brought up in issue ( https://github.com/dask/distributed/issues/3831 ) as well.