dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Registration of custom (de)serializer is recognized by dask_loads, dask_dumps, but not when computing graph #2469

Open zbarry opened 5 years ago

zbarry commented 5 years ago

I've created an object wrapper type to carry metadata along with otherwise typical data objects such as NumPy arrays. For whatever reason, after registering the associated (de)serializers, the distributed serialization functions recognize the registration when called directly, but during execution of the computational graph the registration is ignored and pickle is used instead.

Self-contained demonstration of issue:

Must be in two separate module files for whatever reason.

main.py

from imageobject import ImageObject
from distributed.protocol.serialize import dask_loads, dask_dumps

import numpy as np

from dask.distributed import Client, LocalCluster

import dask.bag as db

def get_imageobject(*args, **kwargs):
    return ImageObject(np.zeros(6))

if __name__ == '__main__':

    # #######################
    # serialize manually

    im = get_imageobject()

    im_ser = dask_loads(*dask_dumps(im))

    print(np.all(im == im_ser))

    # #########################################
    # serialize through Dask graph execution

    cluster = LocalCluster()

    with Client(cluster) as client:
        im_ser = client.compute(
            db.from_sequence([None, None, None]).map(get_imageobject),
            sync=True
        )

    print(np.all(im == im_ser))

imageobject.py

import wrapt
import dill

from typing import Any, Dict, Tuple, List, Union
from distributed.protocol.serialize import dask_serialize, dask_deserialize

class ImageObject(wrapt.ObjectProxy):
    def __init__(self, object_to_wrap: Any):
        super().__init__(object_to_wrap)

    def serialize(self):
        print('ser')

        obj_dict = dict(
            dataobject_type=type(self),
            object_to_wrap=self.__wrapped__,
        )

        return dill.dumps(obj_dict)

    @classmethod
    def deserialize(cls, serialized_dict: Union[bytes, str]):
        print('deser')

        obj_dict = dill.loads(serialized_dict)

        dataobject_type = obj_dict['dataobject_type']
        object_to_wrap = obj_dict['object_to_wrap']

        return dataobject_type(object_to_wrap)

def dask_serialize_imageobject(dataobject: ImageObject) -> Tuple[Dict, List[bytes]]:
    header = {}
    frames = [dataobject.serialize()]

    return header, frames

def dask_deserialize_imageobject(header: Dict, frames: List[bytes]) -> ImageObject:
    return ImageObject.deserialize(frames[0])

dask_serialize.register(ImageObject)(dask_serialize_imageobject)
dask_deserialize.register(ImageObject)(dask_deserialize_imageobject)

Execution output:

/scratch/anaconda3/envs/beads/bin/python /data/dtk-pipeline/scripts/test_dask_serialization.py
ser
deser
True
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/scratch/anaconda3/envs/beads/lib/python3.7/site-packages/distributed/protocol/core.py", line 132, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/scratch/anaconda3/envs/beads/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 184, in deserialize
    return loads(header, frames)
  File "/scratch/anaconda3/envs/beads/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 81, in serialization_error_loads
    raise TypeError(msg)
TypeError: Could not serialize object of type list.
Traceback (most recent call last):
  File "/scratch/anaconda3/envs/beads/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 38, in dumps
    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
TypeError: can't pickle ImageObject objects

This error seems to just keep popping up in an endless loop.

mrocklin commented 5 years ago

The dumps function expects a message in which specially-serializable objects are called out with the to_serialize function, like the following:

from distributed.protocol import to_serialize
msg = {'x': to_serialize(123)}

Everything not highlighted with to_serialize is just sent through with normal non-fancy serialization.

Internally Dask applies to_serialize to every piece of user data that is appropriate, so normally you don't need to care.

If you're just writing a test for your serialization function, then I recommend using the distributed.protocol.serialize and distributed.protocol.deserialize functions instead of dumps/loads; they handle more complex compound objects.
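
For anyone testing this way, here is a minimal sketch of that suggestion, assuming the ImageObject class and serializer registration from imageobject.py in the original post:

from distributed.protocol import serialize, deserialize

from imageobject import ImageObject  # wrapper class and registrations from the original post
import numpy as np

im = ImageObject(np.zeros(6))

# serialize()/deserialize() are the higher-level entry points suggested above;
# they dispatch to the registered dask serializers for ImageObject
header, frames = serialize(im)
im_roundtrip = deserialize(header, frames)

print(np.all(im == im_roundtrip))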

zbarry commented 5 years ago

Hmm, not sure if I completely follow, but if I do, I should mention that my (flawed) check for serialization passes. It's the .compute() line that fails.

zbarry commented 5 years ago

I just created a fresh environment with:

conda install -c defaults -c conda-forge dask distributed
pip install dill

And my new error that pops up is:

Traceback (most recent call last):
  File "/data/dtk-pipeline/scripts/test_dask_serialization.py", line 29, in <module>
    db.from_sequence([None, None, None]).map(get_imageobject).compute(sync=True)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/dask/base.py", line 397, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/dask/multiprocessing.py", line 192, in get
    raise_exception=reraise, **kwargs)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/dask/local.py", line 501, in get_async
    raise_exception(exc, tb)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/dask/compatibility.py", line 111, in reraise
    raise exc.with_traceback(tb)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/dask/local.py", line 274, in execute_task
    result = dumps((result, id))
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/dask/multiprocessing.py", line 26, in _dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 931, in dumps
    cp.dump(obj)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 284, in dump
    return Pickler.dump(self, obj)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/pickle.py", line 771, in save_tuple
    save(element)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/pickle.py", line 816, in save_list
    self._batch_appends(obj)
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/pickle.py", line 843, in _batch_appends
    save(tmp[0])
  File "/scratch/anaconda3/envs/pipe/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
NotImplementedError: object proxy must define __reduce_ex__()

Conda env:

# Name                    Version                   Build  Channel
blas                      1.0                         mkl  
bokeh                     1.0.4                    py37_0  
bzip2                     1.0.6             h14c3975_1002    conda-forge
ca-certificates           2018.11.29           ha4d7672_0    conda-forge
certifi                   2018.11.29            py37_1000    conda-forge
click                     7.0                      py37_0  
cloudpickle               0.6.1                    py37_0  
cytoolz                   0.9.0.1          py37h14c3975_1  
dask                      1.0.0                      py_0    conda-forge
dask-core                 1.0.0                    py37_0  
dill                      0.2.8.2                   <pip>
distributed               1.25.2                py37_1000    conda-forge
docopt                    0.6.2                     <pip>
freetype                  2.9.1                h8a8886c_1  
heapdict                  1.0.0                    py37_2  
intel-openmp              2019.1                      144  
jinja2                    2.10                     py37_0  
jpeg                      9b                   h024ee3a_2  
jsonpickle                0.9.6                     <pip>
libedit                   3.1.20170329         h6b74fdf_2  
libffi                    3.2.1                hd88cf55_4  
libgcc-ng                 8.2.0                hdf63c60_1  
libgfortran-ng            7.3.0                hdf63c60_0  
libpng                    1.6.36               hbc83047_0  
libstdcxx-ng              8.2.0                hdf63c60_1  
libtiff                   4.0.9                he85c1e1_2  
locket                    0.2.0                    py37_1  
markupsafe                1.1.0            py37h7b6447c_0  
mkl                       2019.1                      144  
mkl_fft                   1.0.10           py37ha843d7b_0  
mkl_random                1.0.2            py37hd81dba3_0  
more-itertools            5.0.0                     <pip>
msgpack-python            0.5.6            py37h6bb024c_1  
munch                     2.3.2                     <pip>
ncurses                   6.1                  he6710b0_1  
numpy                     1.15.4           py37h7e9f1db_0  
numpy-base                1.15.4           py37hde5b4d6_0  
olefile                   0.46                     py37_0  
openssl                   1.0.2p            h14c3975_1002    conda-forge
packaging                 18.0                     py37_0  
pandas                    0.23.4           py37h04863e7_0  
partd                     0.3.9                    py37_0  
pillow                    5.4.1            py37h34e0f95_0  
pip                       18.1                     py37_0  
psutil                    5.4.8            py37h7b6447c_0  
py-cpuinfo                4.0.0                     <pip>
pyparsing                 2.3.0                    py37_0  
python                    3.7.1             hd21baee_1000    conda-forge
python-dateutil           2.7.5                    py37_0  
pytz                      2018.7                   py37_0  
pyyaml                    3.13             py37h14c3975_0  
readline                  7.0                  h7b6447c_5  
sacred                    0.7.4                     <pip>
setuptools                40.6.3                   py37_0  
six                       1.12.0                   py37_0  
sortedcontainers          2.1.0                    py37_0  
sqlite                    3.26.0               h7b6447c_0  
tblib                     1.3.2                    py37_0  
tk                        8.6.8                hbc83047_0  
toolz                     0.9.0                    py37_0  
tornado                   5.1.1            py37h7b6447c_0  
wheel                     0.32.3                   py37_0  
wrapt                     1.11.0                    <pip>
xz                        5.2.4                h14c3975_4  
yaml                      0.1.7                had09818_2  
zict                      0.1.3                    py37_0  
zlib                      1.2.11               h7b6447c_3  
dhirschfeld commented 5 years ago

This sounds similar to my arrow issue - see:

TL;DR: in that issue it was suggested that #2110 was the underlying cause.

zbarry commented 5 years ago

A little update for clarification regarding what the error above is.

NotImplementedError: object proxy must define __reduce_ex__() refers to an exception deliberately raised by the wrapt package when pickle tries to pickle a wrapt.ObjectProxy instance:

https://github.com/GrahamDumpleton/wrapt/issues/102

In my particular case, I've implemented __reduce_ex__() myself as a workaround to the issue posted here (noting this in case anyone else stumbles upon this thread and wants to work around a similar issue by implementing pickle-based custom serialization for their objects).
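
For reference, a minimal sketch of that workaround (the _rebuild_imageobject helper name is made up for illustration and is not part of wrapt or the original code):

import wrapt


def _rebuild_imageobject(proxy_type, wrapped):
    # Module-level helper so pickle can locate it by reference;
    # rebuilds the proxy around the unwrapped payload.
    return proxy_type(wrapped)


class ImageObject(wrapt.ObjectProxy):
    def __reduce_ex__(self, protocol):
        # Tell pickle to serialize only the wrapped object and to
        # reconstruct the proxy via the helper on load.
        return _rebuild_imageobject, (type(self), self.__wrapped__)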

milesgranger commented 5 years ago

Think this issue is related:

from dask import delayed
from dask.distributed import Client
from distributed.protocol.serialize import (
    dask_serialize,
    dask_deserialize,
    register_serialization_family,
)


class Foo:
    """Some class which **cannot** be pickled"""
    def __init__(self, bar):
        self.bar = bar

    def __setstate__(self, state):
        raise ValueError('Seriously, I cannot be pickled!')


@dask_serialize.register(Foo)
def special_serializer(x, *args, **kwargs):
    # ... magic way of serializing Foo into List[bytes]
    return {'serializer': 'special_serde'}, serialized_foo


@dask_deserialize.register(Foo)
def special_deserializer(header, frames):
    # ... magic way of deserializing into Foo
    return deserialized_foo


register_serialization_family('special_serde', special_serializer, special_deserializer)
client = Client(serializers=['dask', 'special_serde'], deserializers=['dask', 'special_serde'], processes=False)


@delayed
def some_func(_foo):
    return 1 + 1


val = some_func(Foo(2))
val.compute()

This will always raise the ValueError set in Foo. I think I have followed the example here pretty closely, swapping between extending the dask family and registering my own with register_serialization_family (or both), and trying different permutations of (de)serializers using dask, msgpack, etc.

Thanks!

calebho commented 5 years ago

@milesgranger I've been having the same problem, so after a bit of searching I believe this section in the docs implies you cannot define custom serialization for "computational tasks":

There are three kinds of messages passed through the Dask network:

  1. Small administrative messages like “Worker A has finished task X” or “I’m running out of memory”. These are always serialized with msgpack.
  2. Movement of program data, such as Numpy arrays and Pandas dataframes. This uses a combination of pickle and custom serializers and is the topic of the next section
  3. Computational tasks like f(x) that are defined and serialized on client processes and deserialized and run on worker processes. These are serialized using a fixed scheme decided on by those libraries. Today this is a combination of pickle and cloudpickle.

It's unclear to me in (3) whether the computation task refers to just f or f and its argument x. If just f, then what is responsible for serializing x? If f and x, how would one customize their serialization?

TomAugspurger commented 5 years ago

@mrocklin do you have thoughts on this? It seems like https://github.com/dask/distributed/issues/2110 is the root cause?

mrocklin commented 5 years ago

Not for the issue in the original post. That isn't sending ImageObjects from the client to the workers by passing them in as parameters as in #2110; it's creating them on workers and then passing them between workers, which should be clean as far as I know.

Maybe there's an issue in that they're wrapped in lists/tuples due to Dask Bag? I would need to check to see how we serialize lists and tuples, and whether or not we traverse into them (I wouldn't be surprised either way).

I'm personally unlikely to look into this, at least for the next few weeks.

mcguipat commented 5 years ago

@milesgranger I've been having the same problem, so after a bit of searching I believe this section in the docs implies you cannot define custom serialization for "computational tasks":

There are three kinds of messages passed through the Dask network:

  1. Small administrative messages like “Worker A has finished task X” or “I’m running out of memory”. These are always serialized with msgpack.
  2. Movement of program data, such as Numpy arrays and Pandas dataframes. This uses a combination of pickle and custom serializers and is the topic of the next section
  3. Computational tasks like f(x) that are defined and serialized on client processes and deserialized and run on worker processes. These are serialized using a fixed scheme decided on by those libraries. Today this is a combination of pickle and cloudpickle.

It's unclear to me in (3) whether the computation task refers to just f or f and its argument x. If just f, then what is responsible for serializing x? If f and x, how would one customize their serialization?

@mrocklin It'd be great to get clarification on this question. This seems to have been raised in several issues, and I have yet to find clarification as to whether or not custom serialization is intended to be applied to the arguments of a task. @milesgranger's example is quite relevant.

mrocklin commented 5 years ago

I think that applying custom serialization/deserialization to the arguments of a task makes sense in many cases. I don't think that it happens today. One would have to be careful, because there are likely common cases where this would hurt performance significantly; it may still be worth it though. Maybe you should raise an issue for this in particular so that it doesn't keep coming up in scattered places?
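
In the meantime, one possible workaround (not confirmed in this thread, and the details may vary by version) is to scatter the problematic object first so that it travels as data rather than being embedded in the task graph; the graph then only carries a small Future reference. A rough sketch, reusing the Foo class and serializer registration from the earlier comment and assuming a multi-process cluster:

from dask import delayed
from dask.distributed import Client

client = Client(serializers=['dask', 'special_serde'],
                deserializers=['dask', 'special_serde'])


@delayed
def some_func(_foo):
    return 1 + 1


# scatter sends Foo(2) through the data-movement path; the delayed call
# then closes over the Future, which resolves to the data on the worker.
foo_future = client.scatter(Foo(2))
val = some_func(foo_future)
print(val.compute())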

mcguipat commented 5 years ago

@mrocklin I've done as you suggested: https://github.com/dask/distributed/issues/2953