dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Custom serializer support for dumps_function() #4010

Open · xKHUNx opened this issue 4 years ago

xKHUNx commented 4 years ago

I have a use case that requires me to parallelize nested Cython functions using Dask. From the documentation, I've learned that we can set the serializers/deserializers for a Client object, which means I can register my own serializer, in this case dill, to handle my nested functions. It looks like this:

import dill
from distributed import Client
from distributed.protocol.serialize import register_serialization_family

def dill_dumps(x):
    # tag the header so deserialize() can route the frames back to dill_loads
    header = {'serializer': 'dill'}
    frames = [dill.dumps(x)]
    return header, frames

def dill_loads(header, frames):
    if len(frames) > 1:  # frames may be cut up for network reasons
        frame = b''.join(frames)  # frames are bytes, so join with b'', not ''
    else:
        frame = frames[0]
    return dill.loads(frame)

register_serialization_family('dill', dill_dumps, dill_loads)

...

client = Client(address='tcp://0.0.0.0:8786', set_as_default=False, serializers=['dill'])
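
To check that the registered family actually round-trips a nested function, one can exercise it directly through distributed's serialize()/deserialize() helpers. A minimal sketch; make_nested is a hypothetical stand-in for the real nested Cython function:

from distributed.protocol import serialize, deserialize

def make_nested():
    def nested(x):  # stand-in for the nested Cython function
        return x + 1
    return nested

# with the 'dill' family registered above, this round-trip should succeed
header, frames = serialize(make_nested(), serializers=['dill'])
fn = deserialize(header, frames)
assert fn(1) == 2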

However, when I use map() from the client, I am still unable to serialize my function; inspecting the error message shows that dumps_function() is using pickle (which will not work for nested functions) rather than my custom serializer.

Here's the error message:

Traceback (most recent call last):
  File "training_engine.py", line 2226, in training_engine.train.train_and_update
  File "accml.py", line 459, in accml.AccML.fit
  File "pipeline_selector.py", line 621, in pipeline_selector.ClassifierPipelineSelector.fit
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1779, in map
    actors=actor,
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 2590, in _graph_to_futures
    "tasks": valmap(dumps_task, dsk3),
  File "/usr/local/lib/python3.7/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(iterkeys(d), map(func, itervalues(d))))
  File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 3354, in dumps_task
    return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
  File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 3318, in dumps_function
    result = pickle.dumps(func)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 51, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 1148, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 491, in dump
    return Pickler.dump(self, obj)
  File "/usr/local/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/usr/local/lib/python3.7/pickle.py", line 535, in save
    self.save_global(obj, rv)
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 897, in save_global
    Pickler.save_global(self, obj, name=name)
  File "/usr/local/lib/python3.7/pickle.py", line 960, in save_global
    (obj, module_name, name)) from None
_pickle.PicklingError: Can't pickle <cyfunction ClassifierPipelineSelector.fit.<locals>.dask_fit at 0x7fe2b25de870>: it's not found as pipeline_selector.dask_fit
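
The last frame shows the cause: standard pickle serializes functions by reference (module plus qualified name), so a function defined inside another function cannot be looked up at load time. cloudpickle can serialize nested pure-Python functions by value, but for Cython cyfunctions like dask_fit above it falls back to the by-reference path and fails. A plain-pickle sketch of the same failure mode, with dill succeeding where pickle fails:

import pickle
import dill

def outer():
    def nested(x):
        return x + 1
    return nested

fn = outer()

try:
    # pickle stores functions by reference, so a nested function
    # cannot be found under an importable name
    pickle.dumps(fn)
except (pickle.PicklingError, AttributeError) as e:
    print(e)  # e.g. Can't pickle local object 'outer.<locals>.nested'

fn2 = dill.loads(dill.dumps(fn))  # dill serializes the function by value
assert fn2(1) == 2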

I would like to request a feature to use the custom serializer, as set on the Client object.
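
In the meantime, a possible workaround, sketched under the assumption that dill can serialize the cyfunction (the registered family above already relies on this): dill the function on the client side and unpack it inside a module-level trampoline, which pickle can serialize by reference. The names run_dilled and inputs below are hypothetical:

import dill

def run_dilled(payload, *args, **kwargs):
    # module-level, so dumps_function() can pickle it by reference;
    # the real function travels as opaque dill bytes in the arguments
    return dill.loads(payload)(*args, **kwargs)

payload = dill.dumps(dask_fit)  # the nested function from the traceback
futures = client.map(run_dilled, [payload] * len(inputs), inputs)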

mrocklin commented 4 years ago

I would like to request a feature to use the custom serializer, as set on the Client object.

I agree that this would be useful in many cases. One would need to think about how to communicate that custom serializer to all parts of the cluster. There is also a wrinkle: many people use the serializers functionality to restrict serialization; for example, some don't want to allow pickle deserialization in the scheduler for security reasons.

I think that in the end security will probably take precedence over convenience, and you'll be asked to specify your serializer on each Worker/Scheduler when you set them up.
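
A sketch of what that per-process setup could look like with the existing --preload mechanism of dask-scheduler/dask-worker, which imports a module on startup; the module name dill_family.py is hypothetical:

# dill_family.py -- run on every process via:
#   dask-scheduler --preload dill_family.py
#   dask-worker tcp://scheduler:8786 --preload dill_family.py
import dill

from distributed.protocol.serialize import register_serialization_family

def dill_dumps(x):
    header = {'serializer': 'dill'}
    return header, [dill.dumps(x)]

def dill_loads(header, frames):
    return dill.loads(b''.join(frames))

# runs at import time, so each scheduler/worker process registers the family
register_serialization_family('dill', dill_dumps, dill_loads)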