dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Custom Serialization for Task Args #2953

Open mcguipat opened 5 years ago

mcguipat commented 5 years ago

The arguments of a task submitted to the scheduler are currently serialized with pickle and do not go through any custom serialization (warn_dumps calls pickle.dumps directly; see https://github.com/dask/distributed/issues/2110#issuecomment-405069639). The example below demonstrates this.

        from dask import delayed
        from dask.distributed import Client
        from distributed.protocol.serialize import (
            dask_serialize, dask_deserialize, register_serialization_family)

        class Foo:
            """Some class which **cannot** be pickled"""
            def __init__(self, bar):
                self.bar = bar

            def __setstate__(self, state):
                raise ValueError('Seriously, I cannot be pickled!')

        @dask_serialize.register(Foo)
        def special_serializer(x, *args, **kwargs):
            # ... magic way of serializing Foo into a header and List[bytes]
            frames = [str(x.bar).encode()]
            return {'serializer': 'special_serde'}, frames

        @dask_deserialize.register(Foo)
        def special_deserializer(header, frames):
            # ... magic way of deserializing the frames back into a Foo
            return Foo(int(bytes(frames[0]).decode()))

        register_serialization_family('special_serde', special_serializer, special_deserializer)
        client = Client(serializers=['dask', 'special_serde'],
                        deserializers=['dask', 'special_serde'],
                        processes=False)

        @delayed
        def some_func(_foo):
            return 1 + 1

        val = some_func(Foo(2))
        val.compute()

This will always raise the ValueError set in Foo. Originally posted by @milesgranger in https://github.com/dask/distributed/issues/2469#issuecomment-457245041
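
One possible workaround, assuming the custom family is registered on the workers as well, is to move the object explicitly with client.scatter, which does go through the comm-level serialization machinery where registered families apply, and then pass the resulting future to the task. A minimal sketch:

    # Sketch of a possible workaround: scatter the object so it travels
    # through comm-level serialization (where 'special_serde' applies),
    # then hand the resulting Future to the delayed function.
    foo_future = client.scatter(Foo(2))
    val = some_func(foo_future)
    val.compute()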

jakirkham commented 5 years ago

cc @mrocklin (since it looks like you requested this issue be raised)

mrocklin commented 5 years ago

This was my response:

I think that applying custom serialization/deserialization makes sense in many cases for arguments of a task. I don't think that it happens today. I think that one would have to be careful because there are likely common cases where this would disrupt performance significantly. It may still be worth it though.

mcguipat commented 5 years ago

This was my response:

I think that applying custom serialization/deserialization makes sense in many cases for arguments of a task. I don't think that it happens today. I think that one would have to be careful because there are likely common cases where this would disrupt performance significantly. It may still be worth it though.

What specifically do you see disrupting performance significantly? Would it be the cascading lookup to detect serialization overrides on each argument? So long as there is no performance penalty when no overrides are present, it's really up to the user whether they want to incur the overhead of applying them. I would think this is possible to accomplish.
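
For illustration, here is a sketch of the kind of per-argument lookup I have in mind; the registry and helper names are hypothetical, not distributed internals:

    # Hypothetical sketch: a type-keyed override registry, so the
    # no-override case costs a single dict probe per argument.
    custom_overrides = {}  # type -> (serialize_fn, deserialize_fn)

    def maybe_apply_override(arg):
        entry = custom_overrides.get(type(arg))
        if entry is None:
            return arg                          # common case: fall through to pickle
        serialize_fn, _ = entry
        header, frames = serialize_fn(arg)      # custom path, user opted in
        return header, frames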

mrocklin commented 5 years ago

Right, so I think that we agree that there are two important cases here:

future = client.submit(func, my_big_object)  # want to serialize separately

future = client.submit(func, 123)  # don't want to serialize separately

So we would need a clear and generic way to differentiate one case from the other that works in most contexts.

We do this currently by checking nbytes(arg) I think (though I would have to check to verify).
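
For context, a rough sketch of the kind of size-based check being described; the threshold here is illustrative, not the value distributed actually uses:

    from dask.sizeof import sizeof  # dispatch-based in-memory size estimate

    BIG_ARG_THRESHOLD = 100_000  # illustrative cutoff in bytes

    def should_serialize_separately(arg):
        # Large arguments would get their own serialization pass (where
        # custom serializers could apply); small ones travel inline.
        return sizeof(arg) > BIG_ARG_THRESHOLD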

in the case where there are no overrides present

It's not entirely clear to me how to check for this. There are a few different serialization families. Also you might (?) want to handle nesting within tuples/lists/dicts.
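
Handling that nesting would presumably mean a recursive walk over the standard containers, roughly along these lines (a sketch, not distributed's actual traversal):

    def walk(obj, handle):
        # Recurse through common containers so nested custom objects are
        # still found; everything else goes to the per-type handler.
        if isinstance(obj, (list, tuple)):
            return type(obj)(walk(o, handle) for o in obj)
        if isinstance(obj, dict):
            return {k: walk(v, handle) for k, v in obj.items()}
        return handle(obj)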

We would also want to apply this uniformly across the various APIs, like submit (shown above) and also dask collections like array/delayed/dataframe.