I believe @madsbk and @jakirkham did a fair amount of work around serialization generally and specifically for HLGs. If we relaxed the requirement for, say, 1), how would this make it easier? I suppose we could remove the Serialized checking in various places -- this is probably the discussion we need to have.
As an aside, have you heard any folks express security concerns? I have not.
Are these causing havoc? Do you have some examples you can share?
At least the way this was implemented, Dask just creates dicts and lists in __dask_distributed_pack__, for example. We do call to_serialize on objects that need more serialization by Distributed. So the content created for serialization is pretty simple and doesn't leave a lot of room for things falling down.
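To make that shape concrete, here is an illustrative sketch (made-up layer fields, not the actual Layer API) of the kind of plain-container payload a __dask_distributed_pack__ implementation tends to produce, with to_serialize marking the one object that needs Distributed's richer handling:

# Illustrative sketch only: a packed-layer payload made of plain dicts/lists,
# with to_serialize() marking the object that needs Distributed's serialization.
import numpy as np
from distributed.protocol import dumps, to_serialize

user_array = np.arange(10)

packed_layer = {
    "name": "my-layer",          # plain msgpack-friendly metadata
    "npartitions": 4,
    "kwargs": to_serialize({"weights": user_array}),  # handled by dask serialization
}

# distributed's comms turn this into a list of frames; the array's data can
# travel in its own frame rather than being embedded in the msgpack payload.
frames = dumps({"op": "update-layer", "layer": packed_layer})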
That said, given we are thinking about how to make things easier, I wonder if it makes sense to have some very simple barebones framework for serialization in Dask that Distributed then extends. One advantage of this is we wouldn't need to import things like to_serialize. Another advantage is other local schedulers (using concurrent.futures-based APIs) could potentially benefit from serialization. Just a thought 🙂
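For what it's worth, a purely hypothetical sketch of what such a barebones registry in dask could look like (none of these names exist today); distributed, or a concurrent.futures-based scheduler, could register richer handlers on top of it:

# Hypothetical sketch: a tiny serializer registry that dask could own and
# distributed could extend. All names here are made up for illustration.
import pickle

_serializers = {}  # name -> (dumps, loads)

def register_serializer(name, dumps, loads):
    _serializers[name] = (dumps, loads)

def serialize(obj, preferred=("pickle",)):
    for name in preferred:
        dumps, _ = _serializers[name]
        try:
            return name, dumps(obj)
        except Exception:
            continue
    raise TypeError(f"Could not serialize object of type {type(obj)}")

def deserialize(name, payload):
    _, loads = _serializers[name]
    return loads(payload)

# dask registers a simple default; distributed could add fancier handlers.
register_serializer("pickle", pickle.dumps, pickle.loads)

name, payload = serialize([1, 2, 3])
assert deserialize(name, payload) == [1, 2, 3]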
I like these constraints, but perhaps they're causing more havoc than they're worth. It might be time to reconsider these constraints and instead allow the client and scheduler to communicate by pickle. I think that this would allow us to remove a lot of currently painful code, and accelerate high level graph work in the future.
I agree, this would remove a lot of complexity. Most importantly, it would remove all the code surrounding the partial deserialization of the HLG. I think the protocol could be boiled down to: 1) pickle the HLG while keeping host/device buffers out-of-band, 2) unpickle the HLG.
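A minimal sketch of that boiled-down protocol, using the stdlib's pickle protocol 5 out-of-band machinery (a NumPy array stands in for a host buffer here; device buffers would need extra handling):

# Sketch: pickle a graph-like payload while keeping large buffers out-of-band.
import pickle
import numpy as np

hlg_like = {"layer-0": {("x", 0): np.arange(1_000_000)}}  # stand-in for an HLG

buffers = []
header = pickle.dumps(hlg_like, protocol=5, buffer_callback=buffers.append)

# `header` stays small; `buffers` holds zero-copy PickleBuffer views that the
# comms layer could ship as separate frames.
restored = pickle.loads(header, buffers=buffers)
assert (restored["layer-0"][("x", 0)] == hlg_like["layer-0"][("x", 0)]).all()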
I think pickle is a red herring. We could allow the scheduler to un-pickle things and it wouldn't solve our problem—it would just allow us to ignore the problem by being sloppier.
Generating tasks usually requires inlining user data or arbitrary functions into the task-tuples.
This is trivial on the client; you insert the thing into the tuple, then eventually pickle the whole tuple and send it to the scheduler. It remains an opaque binary blob on the scheduler, and eventually gets un-pickled wholesale on the worker.
But if you're generating the tasks on the scheduler, you need the client to send you those user-defined things to get inserted into the graph.
Currently, HLG layers tend to wrap these things in to_serialize and return them in the ambiguously-defined __dask_distributed_pack__ interface. When received on the scheduler, they're Serialized objects: opaque binary blobs.
The problem (illustrated in detail here) is that when you stick Serialized objects into arbitrarily-nested tuples, then serialize the whole thing using a different serialization method (worker.dumps_task) or odd conditional logic, then deserialize the whole thing in the worker comms, and then _maybe_deserialize_task that, very ambiguous behavior occurs. In some cases, the nested Serialized objects come out as actual objects; in other cases, they remain Serialized objects. Since it's not written down anywhere what the expected behavior is, you don't know whether you're dealing with a bug or user error, which makes developing against the API very unproductive.
With this proposal, we'd instead just un-pickle the user-defined things on the scheduler, stick them into the tasks, and then re-serialize the entire task. It would be just like on the client. It certainly would make the logic easier!
But that's just sloppy! We'd be de-serializing things just in order to immediately re-serialize them! We already do sloppy things like this today when transferring spilled-to-disk data (see end of https://github.com/dask/distributed/issues/4424#issuecomment-829743388) and should move away from it. Sure, you might say "we can assume the data being inserted into the graph is small". That may often be true. But it's user input. Users will do things wrong. If they pass too large an argument, or an argument with some oddly slow custom pickle hooks that make an authenticated network call to your database, is it acceptable that they could lock up the scheduler and destabilize the cluster? "You shouldn't do that" isn't a good answer for stability problems; you just shouldn't be able to do that.
So I don't think the problem has much to do with pickle. The problem has to do with:
- The multiple, special-cased serialization paths for tasks (dumps_task and _deserialize), which are disjoint from normal comms serialization
- The undefined behavior around nesting Serialize and Serialized objects
- The ambiguously-defined __dask_distributed_pack__ and __dask_distributed_unpack__ interfaces. Why are they not inverses of each other (which makes it harder to test)? What are they required or allowed to return? Who is responsible for serialization, and where does it happen?
- The awkward workarounds needed to get user data and functions into scheduler-generated tasks (wrapping things in to_serialize or CallableLazyImports, even ending up with pickle.loads inside tasks because figuring out an alternative is just too annoying)

I think the currently-painful code could be resolved by the ability to embed Serialized-like objects into other objects. Something in the same spirit as pickle5 (perhaps even just pickle5), where there are two streams (the pickled stuff, and the stuff you encountered while pickling that should be handled out-of-band), and the ability for the first stream to reference things in the second stream by pointers. In our case, it might be a stream of "things to unpack on the scheduler" and "things to not unpack on the scheduler". I actually have a feeling we could leverage pickle5 to do this "partial deserialization" if we wanted.

I personally don't care much about the security, versioning, or rewriting arguments. I just care about stability and performance.
I could certainly be convinced that deserializing things on the scheduler only to immediately re-serialize them is very low overhead, and worth it for the simplification of the developer experience.
I just think it's possible to have both a simple developer experience and no unnecessary deserialization if we take the time to define the interfaces and protocols well. But I'm fine if we decide that's not worth the time.
The problem is actually less about nested serialization, and more just about the representation of a task, and the fact that tasks have special serialization logic that's disjoint from normal comms serialization.
Really the core pain point is that HLG code should emit two different representations of its tasks, depending on whether it's running locally or on the scheduler.
(callable, "keyref", literal, (subtask_callable, ["keyref"], ...))
{"function": pickled_bytes, "args": pickled_bytes, "kwargs": pickled_bytes}
and explicitly list the dependencies the tasks.To make things worse, the pickled_bytes
shouldn't contain Serialize
/Serialized
objects (or any form of nested serialization), because the un-pickling code path doesn't know about them. But the canonical way to get data from the client to the scheduler involves it arriving in a Serialized
. So there's pretty much no clean way to insert already-serialized data into the worker's representation of a task, besides pre-picking the bytes on the client in your __dask_distributed_pack__
method and basically handling serialization yourself.
And if you want to use the normal-tuple representation instead (which is also supported on the scheduler; it just gets wrapped in to_serialize), you have to contend with the fact that any nested Serialized objects within the task may or may not get unpacked properly when they reach the worker, depending on the level of nesting, order in the collection, etc. And even if you do get this to work, the serialized functions aren't cached like they are in the dict format, so it's less efficient.
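To make the two shapes concrete, here is a rough illustration of the same toy task in both forms (field names follow the example above; the exact format the scheduler and worker expect has shifted across versions, so treat this as a sketch rather than the real dumps_task output):

# Sketch: one toy task in the local tuple form and the dict-of-pickled-bytes form.
import pickle
from operator import add

# Local / low-level-graph form: keys are referenced by name inside a plain tuple.
local_task = (add, "x-0", 1)

# Scheduler/worker dict form: function/args are pre-pickled and dependencies
# are listed explicitly, roughly in the spirit of worker.dumps_task.
scheduler_task = {
    "function": pickle.dumps(add),
    "args": pickle.dumps(("x-0", 1)),
    "kwargs": pickle.dumps({}),
}
dependencies = {"x-0"}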
Jim, Mads, James and I met this morning. We listed a few requirements:
These requirements could be cleaned up. A lot of them are of the form "we shouldn't be doing this thing that we were doing" instead of saying "the solution should have such and such a property".
I think that the next step here is to see some design proposals on what we could do. Right now, most conversation is of the form "we're currently doing X, which is bad because of Y and Z". I think that these conversations are helpful in highlighting problems. We've also had them for a while now and it's probably time to move on towards constructive plans.
I'll provide a base plan, which I hope others can supplant with better plans.
We continue using non-strict messages. We use msgpack to traverse them. We label known user data with to_serialize and handle that with msgpack handlers. We accept other kinds of unknown data types with pickle (this is new!) and handle those with msgpack handlers.
This is exactly the same as the current system, except that now we allow for arbitrary objects with pickle. It's fast in the common case, easy to implement (it's mostly what we have today), and allows us to avoid all of the crap that evolved out of HighLevelLayers (which seemed to cause a cascade of serialized data within serialized data, which seems unpleasant).
On the open questions: it does not provide scheduler security, and it chooses loose over strict specification.
I think that it covers all of the requirements listed above. I think that it allows us to clean up a bunch of crap that evolved out of HLGs. It doesn't achieve the goals of better specification, but will, I think, result in a net improvement in simplicity.
I'm not pushing for this approach, but I think that it suits our needs.
So if there was a message, like the following:
msg = {
    "op": "foo",
    "data": [to_serialize(my_numpy_array)],
    "object": my_object,
}
msgpack would go through this, pack up the entire thing as it does today, but specially handle both msg["data"][0] with dask serialization (as it does today) as well as msg["object"] with pickle serialization. Maybe these live in a separate list of buffers (as happens today) or maybe they're in-lined for pickle.
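A simplified, self-contained stand-in for that behavior is sketched below: msgpack traverses the message, and a default= hook decides per object whether to apply "dask-style" handling (for to_serialize-marked data) or plain pickle (for anything else it doesn't understand). The ToSerialize marker, MyObject class, and ext codes are illustrative, not distributed's real ones.

# Illustrative sketch of the proposed msgpack + (dask serialization, pickle) split.
import pickle
import msgpack
import numpy as np

class ToSerialize:          # stand-in for distributed's to_serialize marker
    def __init__(self, data):
        self.data = data

class MyObject:             # an arbitrary user object msgpack knows nothing about
    def __init__(self, x):
        self.x = x

def _encode(obj):
    if isinstance(obj, ToSerialize):
        # Real code would hand this to dask's serializers and keep the result
        # out-of-band; here we just tag it with its own ext code.
        return msgpack.ExtType(1, pickle.dumps(obj.data))
    # Anything else msgpack can't handle gets pickled (the new part of the plan).
    return msgpack.ExtType(2, pickle.dumps(obj))

def _decode(code, data):
    return pickle.loads(data)

msg = {
    "op": "foo",
    "data": [ToSerialize(np.arange(5))],
    "object": MyObject(42),
}
packed = msgpack.packb(msg, default=_encode, use_bin_type=True)
unpacked = msgpack.unpackb(packed, ext_hook=_decode, raw=False)
assert unpacked["object"].x == 42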
So let's say that the user creates a layer
layer = MyLayer("metadata", 123, my_numpy_array)

msg = {
    "op": "update-single-layer",
    "layers": [layer],
}
We're not trying to call this thing user data, so the scheduler just gets a complete copy of it unpickled locally. This includes the numpy array, which was handled with pickle rather than Dask serialization (we don't try to traverse and blend between pickle and dask serialization).
This is a little bit dangerous because, as Gabe points out, my_numpy_array could be big or awkward in some way. My guess is that it's also not possible to both protect the scheduler from user data and also manage graph construction on the scheduler. My guess is that we'll need user data in order to construct graphs in many cases (or not, if folks can come up with a counter-proposal).
I want to reiterate that I'm not pushing for the plan above. I would like to see designs that are better than that plan. If no designs materialize in, say, a month, then I would recommend that we empower people to go ahead and implement the plan above. (it should be, I think, mostly a net improvement on what we have today).
cc @Kobzol @spirali (in case you have thoughts here given we are starting to rethink serialization a bit)
It seems this issue is no longer just about HLG layer serialization getting to use pickle, or even about HLG serialization specifically, but about the serialization system within distributed in general. Should we update the title and description accordingly? Or maybe make https://github.com/dask/distributed/issues/5581#issuecomment-1005072159 into a new issue?
I don't have enough knowledge about the inner workings of the current Dask version to talk about details, but in general, when we were working on the Dask scheduler written in Rust, there were two broad problems with the communication protocol and serialization that we have faced:
1) The serialization/protocol was too "polymorphic". I suppose this was caused by "organic growth" of the (de)serialization code over time, which is natural. Tasks were represented in multiple ways, which we mostly discovered through new Dask client code crashing our scheduler. It was sometimes difficult to work with and understand things that were serialized in a nested way. It was difficult to reconstruct the messages, since they contained instructions like "put this object into the first item of a list contained in a field named foo contained in...", which was near impossible to handle in a statically typed language; this was also why we had to slightly modify the serialization interface for our needs.
It would be nice if things were represented in a unified way (if possible), without a lot of edge cases. I haven't looked at the Dask implementation for some time, though, so it's possible that some of these points are no longer true.
2) The serialization/protocol was mostly undocumented. This is mostly an orthogonal issue to any specific serialization implementation, but if it is going to be rewritten in a fundamental way, it might be worth properly documenting its interface and potential gotchas, to simplify future efforts around cross-language Dask components and also to make it easier to understand.
In a call, @jcrist also mentioned an alternate solution: "just use pickle everywhere", both for system messages (currently using msgpack) and for user data (currently using Dask's serialization).
Short term, one way to work towards this would be to switch Dask serialization over to pickle everywhere with some custom registered pickle handlers.
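As a sketch of what "custom registered pickle handlers" can look like in general, the stdlib copyreg module lets you register a per-type reducer (this is just the generic mechanism, not necessarily how distributed would wire it up); here a hypothetical handle-like object is reduced to its address instead of being pickled by value:

# Generic mechanism only: register a custom pickle reducer for one type.
import copyreg
import pickle

class ClusterHandle:
    """Hypothetical object that should travel as a token, not by value."""
    def __init__(self, address):
        self.address = address

def _reduce_handle(handle):
    # Rebuild the handle from its address on the receiving side.
    return ClusterHandle, (handle.address,)

copyreg.pickle(ClusterHandle, _reduce_handle)

blob = pickle.dumps({"op": "register", "handle": ClusterHandle("tcp://10.0.0.1:8786")})
assert pickle.loads(blob)["handle"].address == "tcp://10.0.0.1:8786"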
From a call today it seemed like a path forward might be the following phases:
Additionally, @jcrist (I think) mentioned that we might want to opt in to pickle, similar to how we opt in to dask serialization with the to_serialize function. There might be a to_pickle function as well. This would help us have things fail quickly and add more structure.
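A minimal sketch of what such an opt-in marker could look like, mirroring to_serialize (names are hypothetical; distributed did later grow a ToPickle wrapper, but its actual API may differ from this):

# Hypothetical sketch of an opt-in pickle marker and a fail-fast packer.
import pickle

class ToPickle:
    """Mark an object as explicitly allowed to be pickled wholesale."""
    def __init__(self, data):
        self.data = data

def to_pickle(obj):
    return ToPickle(obj)

def pack(obj):
    # Only explicitly marked objects get pickled; everything else fails quickly.
    if isinstance(obj, ToPickle):
        return {"kind": "pickle", "payload": pickle.dumps(obj.data)}
    raise TypeError(f"Refusing to pickle unmarked object of type {type(obj)!r}")

def unpack(frame):
    assert frame["kind"] == "pickle"
    return pickle.loads(frame["payload"])

assert unpack(pack(to_pickle({"a": 1}))) == {"a": 1}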
I'll also say that there was increased comfort with the proposal above on the call, at least along the lines of "well it seems easy and it's clearly better than what we have now".
There was still significant interest in having something more rigid/predictable, but that seemed like it would likely take several months to design and build.
We're still open to proposals here, but right now the msgpack+(dask-serialization, pickle) approach is looking more likely.
Would it make sense to move some of the serialization logic to Dask itself? This might make it easier to handle things with msgpack + pickle there. If we need more customization, we could allow for the serialization functionality in Dask to be stubbed out and leverage that in Distributed.
Basic question: Is the current plan to allow pickle.loads to be called on the scheduler (or at least consider such behavior)? I don't personally have a strong stance on this, but I don't quite understand the consensus on this detail.
That's my understanding from @mrocklin's comments above, but perhaps he can clarify
Sorry, I should have followed up on this. Matt confirmed offline that the plan is indeed to consider allowing pickle.loads on the scheduler (if it gains us enough simplification in HLGs or otherwise). Mads' PR demonstrates one possible solution, and I intend to explore how such a design can simplify HLG packing/unpacking logic.
Note that dask#8672 demonstrates how using pickle (via #5728) would certainly help simplify the existing Layer definitions in dask/dask (the current PR removes ~350 lines of code, and we can certainly simplify further).
2024-02-28 09:25:22,819 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers. <dask.highlevelgraph.HighLevelGraph object at 0x7f934c1a4eb0>.
Traceback (most recent call last):
  File "/home/ubuntu/Mistral/new/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function generate_answer at 0x7f9528e1c820>: it's not the same object as utility.generate_answer

I am facing the above error.
Same issue. Did you get it resolved?
No, I didn't find a solution yet.
Seeing this, too:
File "/scratch/develop/venv/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 379, in serialize
raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f017fb5b790>\n 0. 139644510442944\n>')
dask/distributed versions are 2024.2.0
My problem was solved by downgrading dask/distributed to version 2023.3.2, the version before the big change in the way how HighLevelGraphs are serialized. All versions after and including 2023.4.0 fail when facing HighLevelGraphs which serialize to objects larger than 2GB (more or less). The failure is reflected by an error message like this:
Traceback (most recent call last):
File "/scratch/develop/venv/lib/python3.9/site-packages/distributed/comm/utils.py", line 34, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/scratch/develop/venv/lib/python3.9/site-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/scratch/develop/venv/lib/python3.9/site-packages/msgpack/__init__.py", line 36, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
The problem seems to be that msgpack is attempting to serialize a huge object in distributed/protocol/core.py:dumps(). That function handles the TypeError case by trying other serializers, but it cannot deal with the object-too-large case.
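For reference, the limit can be reproduced in isolation (assuming you have more than 4 GiB of RAM to spare for the test): msgpack refuses any single bin object larger than 2**32 - 1 bytes, and it raises a ValueError, which the TypeError-based fallback in dumps() never catches.

# Minimal reproduction of msgpack's per-object size limit (needs >4 GiB RAM).
import msgpack

big = b"\x00" * (2**32)  # one byte past the 2**32 - 1 limit
try:
    msgpack.packb(big, use_bin_type=True)
except ValueError as e:
    print(e)  # bytes object is too large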
The too-large-payload problem should have been addressed by https://github.com/dask/distributed/pull/8507, which was released in 2024.2.1.
Sorry, I still see the error, even with dask/distributed 2024.2.1
2024-03-28 22:15:16,208 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/scratch/focht/Issue_xgb_task_graph/venv-dasknew/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/scratch/focht/Issue_xgb_task_graph/venv-dasknew/lib/python3.10/site-packages/msgpack/__init__.py", line 36, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
The relevant code in msgpack is here; it fails because the object exceeds ITEM_LIMIT (2**32 - 1). The test for your fix #8507 only tests 2**31 + 1.
I'm getting a similar pickling issue for a HighLevelGraph with 20 layers and 21890 keys (created from an xarray apply_ufunc) - dask/distributed version 2024.5.2
TypeError Traceback (most recent call last)
Cell In[10], line 4
1 from dask.diagnostics import ProgressBar
3 with ProgressBar():
----> 4 out_ds = out_ds.compute(scheduler='processes')
6 out_ds
File ~/.conda/envs/zeus/lib/python3.10/site-packages/xarray/core/dataarray.py:1178, in DataArray.compute(self, **kwargs)
1153 """Manually trigger loading of this array's data from disk or a
1154 remote source into memory and return a new array.
1155
(...)
1175 dask.compute
1176 """
1177 new = self.copy(deep=False)
-> 1178 return new.load(**kwargs)
File ~/.conda/envs/zeus/lib/python3.10/site-packages/xarray/core/dataarray.py:1146, in DataArray.load(self, **kwargs)
1126 def load(self, **kwargs) -> Self:
1127 """Manually trigger loading of this array's data from disk or a
1128 remote source into memory and return this array.
1129
(...)
1144 dask.compute
1145 """
-> 1146 ds = self._to_temp_dataset().load(**kwargs)
1147 new = self._from_temp_dataset(ds)
1148 self._variable = new._variable
File ~/.conda/envs/zeus/lib/python3.10/site-packages/xarray/core/dataset.py:862, in Dataset.load(self, **kwargs)
859 chunkmanager = get_chunked_array_type(*lazy_data.values())
861 # evaluate all the chunked arrays simultaneously
--> 862 evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
863 *lazy_data.values(), **kwargs
864 )
866 for k, data in zip(lazy_data, evaluated_data):
867 self.variables[k].data = data
File ~/.conda/envs/zeus/lib/python3.10/site-packages/xarray/namedarray/daskmanager.py:86, in DaskManager.compute(self, *data, **kwargs)
81 def compute(
82 self, *data: Any, **kwargs: Any
83 ) -> tuple[np.ndarray[Any, _DType_co], ...]:
84 from dask.array import compute
---> 86 return compute(*data, **kwargs)
File ~/.conda/envs/zeus/lib/python3.10/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/.conda/envs/zeus/lib/python3.10/site-packages/cloudpickle/cloudpickle.py:1479, in dumps(obj, protocol, buffer_callback)
1477 with io.BytesIO() as file:
1478 cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479 cp.dump(obj)
1480 return file.getvalue()
File ~/.conda/envs/zeus/lib/python3.10/site-packages/cloudpickle/cloudpickle.py:1245, in Pickler.dump(self, obj)
1243 def dump(self, obj):
1244 try:
-> 1245 return super().dump(obj)
1246 except RuntimeError as e:
1247 if len(e.args) > 0 and "recursion" in e.args[0]:
TypeError: cannot pickle 'weakref.ReferenceType' object
@yoninachmany I assume you should receive this error message with pretty much every version of dask. At least in your example, you are using processes to schedule your computation. In this case, we've always had to serialize the graph to send it to the subprocess.
If this did work for you on another version, please open a new ticket, ideally with a reproducer.
From what I'm seeing, it looks like something in your code is using weakrefs, which cannot be serialized. I suspect this is some custom code on your end. You can probably figure out which layer is the culprit by doing something like
from distributed.protocol import dumps

for name, layer in out_ds.dask.layers.items():
    try:
        dumps(layer)
    except:
        print(f"{name} is the culprit")
Thanks for the pointer. FYI, msgpack is failing to serialize the 'Blockwise'/'MaterializedLayer' objects in the layer that vectorizes the function from xr.apply_ufunc, and the fallback, cloudpickle, can't pickle a 'weakref.ReferenceType' object. This is related to the use of metpy/pint units.
The pickle vs cloudpickle thing here is not that important. I suggest inspecting whatever function you're putting into your ufunc and making sure that the objects don't use weakref. Maybe this is a pint thing; I haven't worked much with it.
@efocht sorry for the late reply
Sorry, I still see the error, even with dask/distributed 2024.2.1
I might have a fix for this in https://github.com/dask/distributed/pull/8684 but I still need to run perf tests.
I'm going to close this issue now since this was actually completed more than a year ago in https://github.com/dask/distributed/pull/7564
I understand that there may be follow up issues with this (as we saw above) but I would ask anybody who is encountering issues to open a new one (and link back to this if you like)
Right now serialization of high level graphs is a bit of a mess. It was evolved organically and without much architectural thought. This seems to be a source of technical debt.
A lot of the challenge to serializing high level graphs is in order to keep the scheduler protected in a few ways ...
I like these constraints, but perhaps they're causing more havoc than they're worth. It might be time to reconsider these constraints and instead allow the client and scheduler to communicate by pickle. I think that this would allow us to remove a lot of currently painful code, and accelerate high level graph work in the future.
cc @quasiben and team