dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

The buffer of embedded numpy variables is deep-copied in client->scheduler comms #8608

Open crusaderky opened 2 months ago

crusaderky commented 2 months ago

import distributed
import numpy as np

if __name__ == "__main__":
    # Two in-process workers, but forced over TCP so the comms path is exercised
    with distributed.Client(n_workers=2, processes=False, protocol="tcp") as client:
        a, b = client.has_what()  # addresses of the two workers
        # Materialise data on worker a...
        x = client.submit(np.random.random, 1024, key="x", workers=[a])
        # ...and force a worker-to-worker transfer to b
        y = client.submit(lambda x: x, x, key="y", workers=[b])
        y.result()

When the buffer reaches distributed.protocol.serialize.pickle_loads, buffers[0] is a bytes object. This causes pickle_loads to deep-copy the buffer in order to honour the writeable flag of the original.
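The mechanism can be sketched with plain pickle, outside distributed: when a protocol-5 out-of-band buffer is materialised as an immutable bytes object on the receiving side, numpy reconstructs the array as read-only, so restoring the original writeable flag forces a deep copy.

```python
import pickle
import numpy as np

a = np.arange(4)  # writeable array on the "sending" side
buffers = []
frames = pickle.dumps(a, protocol=5, buffer_callback=buffers.append)

# Simulate the comms layer handing the buffer back as immutable bytes
received = [bytes(b) for b in buffers]
b2 = pickle.loads(frames, buffers=received)

# b2 is backed by the readonly bytes, so it comes back non-writeable;
# honouring the original writeable=True then requires a copy.
print(a.flags.writeable, b2.flags.writeable)
```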

To verify, add at the top of pickle_loads:

print(header["writeable"], [ensure_memoryview(b).readonly for b in buffers])

What's causing me a migraine is:

crusaderky commented 2 months ago

Ok, found the difference. The deep-copy is NOT tripped by the transfer of the task output from a to b; it's tripped by the random seed that's sent from the client to the scheduler. :facepalm:
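A quick way to see why (a sketch with plain pickle, not distributed's actual comms path): np.random.random is a bound method of the global RandomState, so pickling the callable itself embeds the generator's state array, which travels as a numpy buffer.

```python
import pickle
import numpy as np

buffers = []
# Pickling the *function* captures its RandomState instance, whose
# Mersenne Twister state array goes out-of-band under protocol 5.
frames = pickle.dumps(np.random.random, protocol=5, buffer_callback=buffers.append)
print(len(buffers))  # at least one buffer, even though no array was passed explicitly
```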

crusaderky commented 2 months ago

This issue applies to all embedded variables that are sent from the client to the scheduler, e.g.:

x = client.submit(lambda x: x + 1, np.random.random(1024), key="x", workers=[a])