dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Getting `concurrent.futures._base.CancelledError` from simple binary tree built from futures #4612

Open cjnolet opened 4 years ago

cjnolet commented 4 years ago

I'm getting the following exception from a binary tree that I'm building from futures (based largely on an example in the dask docs that uses delayed instead of futures).

Here's the function in question:

from dask.distributed import get_client, wait
from toolz import first

def tree_reduce(objs, func=sum):

    while len(objs) > 1:
        new_objs = []
        n_objs = len(objs)
        for i in range(0, n_objs, 2):
            inputs = objs[i:i + 2]
            obj = get_client().submit(func, inputs)
            new_objs.append(obj)
        wait(new_objs)
        objs = new_objs

    return first(objs)

And my reproducible test:

    n_parts = 15
    client = Client(cluster)

    a = client.scatter(range(n_parts))
    b = tree_reduce(a)
    b = b.result()

    assert(sum(range(n_parts)) == b)

The exception is intermittent and happens about 50% of the time:

self = <tornado.gen.Runner object at 0x7f2cf6ce5710>

    def run(self) -> None:
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if future is None:
                    raise Exception("No pending future")
                if not future.done():
                    return
                self.future = None
                try:
                    exc_info = None

                    try:
>                       value = future.result()
E                       concurrent.futures._base.CancelledError

I've tried doing a dask.distributed.wait after each level of the tree. I've also tried waiting for b in the reproducible example before calling result. I'm completely stumped as to why this is happening. I'm running this on a workstation with 2x V100s.

I believe I'm doing something very wrong but I can't figure out what it is.

mrocklin commented 4 years ago

Thanks for the issue, @cjnolet. I'm sorry to hear that you've had a frustrating time.

If you can provide a minimal reproducible example I'd be happy to run through it and see if I can reproduce the problem. You're really close to this with your current code, but it looks like you have a custom function func in there, it's not clear how you're creating a cluster (hopefully a bare LocalCluster() will do), and there are also missing imports. Anything you can do to make this process copy-pastable would be welcome.

You mention that you are using two V100s. Is it necessary to use GPUs to reproduce the problem? If so, I might divert you to RAPIDS folks.

cjnolet commented 4 years ago

Absolutely, @mrocklin. Actually, I am using LocalCUDACluster, but this is a simple pytest that's not storing any data on GPUs. I just verified that it is reproducible with a LocalCluster.

Oddly enough, this issue seems exclusive to the use of client.scatter. This example seems to work fine:

    from dask_cuda import LocalCUDACluster
    from dask.distributed import Client

    cluster = LocalCUDACluster(protocol="tcp")
    client = Client(cluster)

    def s(x):
        return x

    n_parts = 15
    a = [client.submit(s, i) for i in range(n_parts)]
    b = tree_reduce(a)
    b = b.result()

    print(str(b))

    assert(sum(range(n_parts)) == b)

This one seems to fail intermittently:

    from dask_cuda import LocalCUDACluster
    from dask.distributed import Client

    cluster = LocalCUDACluster(protocol="tcp")
    client = Client(cluster)

    n_parts = 15
    a = client.scatter(range(n_parts))
    b = tree_reduce(a)
    b = b.result()

    print(str(b))

    assert(sum(range(n_parts)) == b)

I have also tried using a different reduction function. The default in the tree_reduce example in the description is the Python built-in sum function, but I tried creating my own and still got the intermittent CancelledError exceptions:

    from dask_cuda import LocalCUDACluster
    from dask.distributed import Client

    cluster = LocalCUDACluster(protocol="tcp")
    client = Client(cluster)

    def add(parts): return sum(parts)

    n_parts = 15
    a = client.scatter(range(n_parts))
    b = tree_reduce(a, func=add)
    b = b.result()

    print(str(b))

    assert(sum(range(n_parts)) == b)

mrocklin commented 4 years ago

Thanks, @cjnolet. In the future, if you're able to provide a copy-pasteable example that anyone can run, that would help. For example, if you don't need LocalCUDACluster then please just don't include it in the example. That way people who don't have access to GPUs (like me) can help you without having to manually modify your example. Also, if you're able to provide something that a maintainer can just copy and paste into an IPython session, it really helps out a lot. Most of us aren't actually paid to help you with these things, and we go through a lot of issues, so anything you can do to get the time expectation on our end to under a minute is very welcome.

I tried cobbling together a copy-pastable example from your comments above:

from dask.distributed import get_client, wait, Client, LocalCluster
from toolz import first

def tree_reduce(objs, func=sum):

    while len(objs) > 1:
        new_objs = []
        n_objs = len(objs)
        for i in range(0, n_objs, 2):
            inputs = objs[i:i + 2]
            obj = get_client().submit(func, inputs)
            new_objs.append(obj)
        wait(new_objs)
        objs = new_objs

    return first(objs)

cluster = LocalCluster()
client = Client(cluster)

def add(parts): return sum(parts)

n_parts = 15
a = client.scatter(range(n_parts))
b = tree_reduce(a, func=add)
b = b.result()

print(str(b))

assert(sum(range(n_parts)) == b)

Unfortunately it seems to work fine on my machine. I ran this ten times and each time it was fine.

So I think that the next step here is for you to try to provide a copy-pasteable example that a maintainer can run that fails.

(Sorry for the github issue lecture. It's been a busy day.)

cjnolet commented 4 years ago

My apologies, @mrocklin. Thank you for being patient with me.

Here's a slightly modified script that fails for me every time. It seems the number of workers is important. (In my tests I have only 2 workers, and in our environment it's not always trivial to increase the number of workers.)

from dask.distributed import get_client, wait, Client, LocalCluster

from toolz import first

#import pytest

def tree_reduce(objs, func=sum):

    while len(objs) > 1:
        new_objs = []
        n_objs = len(objs)
        for i in range(0, n_objs, 2):
            inputs = objs[i:i + 2]
            obj = get_client().submit(func, inputs)
            new_objs.append(obj)
        wait(new_objs)
        objs = new_objs

    return first(objs)

#@pytest.mark.parametrize("n_parts", [1, 2, 5, 10, 15])
def test(n_parts):

    client = Client(cluster)

    def add(parts): return sum(parts)

    a = client.scatter(range(n_parts))
    b = tree_reduce(a, func=add)
    b = b.result()

    print(str(b))
    assert(sum(range(n_parts)) == b)

if __name__ == "__main__":

    cluster = LocalCluster(n_workers=2)

    for i in range(25):

        for n_parts in [1, 2, 5, 10, 15]:
            test(n_parts)

mrocklin commented 4 years ago

Ah, that's perfect. Thank you, @cjnolet. I'm able to reproduce. I'll try to take a look at this this weekend.

mrocklin commented 4 years ago

Here is a simpler failure. I've removed the tree reduction.

from dask.distributed import Client, LocalCluster

async def test(n_parts, client):
    a = await client.scatter(range(n_parts), hash=True)
    future = client.submit(sum, a)
    await future

async def f():
    async with Client(n_workers=2, processes=False, asynchronous=True) as client:
        for i in range(25):
            for n_parts in [1, 2, 5, 10, 15]:
                print(n_parts)
                await test(n_parts, client)

if __name__ == "__main__":
    import asyncio
    asyncio.get_event_loop().run_until_complete(f())

I think that the problem here has to do with the scattered futures, which get deleted and recreated in quick succession. There is probably some subtle race condition in how the client and scheduler are counting references. @jacobtomlinson , this might be something that you enjoy digging into. It should give some understanding of internals, and should help out other NVIDIA folks.

cjnolet commented 4 years ago

Thanks @mrocklin! I verified that your new script fails every time when n_parts=2.

Our CI always executes our tests against the bleeding edge versions of dask and distributed. It is currently blocked by the sudden presence of CancelledError in several of our algorithms. It seems that these errors started about a week ago and we're seeing the issue even in tests / code that haven't changed recently.

In addition to the CancelledError, I'm also seeing some strange subsequent errors in dask array/cudf from_delayed() saying that an instance of cudf.DataFrame was expected but a Future was found instead. Sometimes I'm able to fix these errors by not having a future.result() overwrite a variable that was holding a reference to a future, for example:

This consistently raises a CancelledError:

X = [client.submit(...) for x in X]
X = client.compute(X, sync=True)

This errors less frequently:

X = [client.submit(...) for x in X]
X_local = client.compute(X, sync=True)

Based on the example, I'm thinking this workaround is not fixing the issue, but just lowering the chances for the potential race condition.

mrocklin commented 4 years ago

The error that you have uncovered here happens when

  1. We create some futures by scattering data (not normal computations)
  2. We delete those futures
  3. We recreate those exact same futures a millisecond later

I don't have enough information on your other failing cases to know if they are the same, unfortunately. If you're not scattering things then it might be something different. In that case I would encourage you to raise another issue with a minimal reproducible example.

quasiben commented 4 years ago

I looked at this, and there is a race condition, but I am not sure we can easily resolve it inside of dask. The race condition is that as we are scattering, we are deleting at the same time. I think there are a few options:

  1. when we scatter, wait a tick
  2. when we scatter, call transitions directly (it's not this call, but something like "scheduler, complete your tasks")
  3. ask the user to wait a tick in between scatter/submit calls (see the sketch below)
  4. uniqueness around scattered keys (this could be fairly limiting for other tasks but would avoid collisions)
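
For concreteness, option 3 in user code might look roughly like the sketch below (my own illustration rather than anything from dask itself; the LocalCluster setup and the 0.5 s pause are arbitrary, just long enough for the release of the old keys to reach the scheduler before the same values are scattered again):

import time
from dask.distributed import Client, LocalCluster

client = Client(LocalCluster(n_workers=2))

data = client.scatter(list(range(15)), hash=True)  # hash=True: keys derived from the values
print(client.submit(sum, data).result())

del data          # release the scattered futures on the client
time.sleep(0.5)   # "wait a tick" so the release reaches the scheduler first

data = client.scatter(list(range(15)), hash=True)  # same values, therefore same keys as before
print(client.submit(sum, data).result())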

mrocklin commented 4 years ago

Dask should be robust to this. We shouldn't rely on users to be careful here.

I think that someone needs to look deeply at how we currently count references and send task-released messages around. I think that @jacobtomlinson would be a good person for this if he has time.

mrocklin commented 4 years ago

In other words, this isn't something that we want to work around or avoid. This is something that we should dive into. Diving into it is hard, though, and will require some previous experience with the internals of the distributed scheduler.

jacobtomlinson commented 4 years ago

I am looking at this now, although I'm feeling a little in at the deep end with it. Some pointers to where I should start looking would be appreciated.

mrocklin commented 4 years ago

Yeah, it's non-trivial. I would probably start by printing out or otherwise recording every time the client and scheduler think that the task has been created or cancelled. I think that by looking through that list you'll eventually be able to spot something fishy.

jacobtomlinson commented 4 years ago

I've created a further simplified reproducer.

from dask.distributed import Client

async def f():
    async with Client(
        n_workers=1, processes=False, asynchronous=True, silence_logs=False
    ) as client:

        d = [0, 1]
        a = await client.scatter(d, hash=True)
        print(await client.submit(sum, a))

        del a

        d = [1, 2]
        a = await client.scatter(d, hash=True)
        print(await client.submit(sum, a))

if __name__ == "__main__":
    import asyncio

    asyncio.get_event_loop().run_until_complete(f())

If we scatter some data and delete it, but then immediately scatter some more data with a common value and try to operate on it, we see the CancelledError.

This is because of the time delay between the client deleting a future and the worker actually deleting the data: the data is recreated during that window, and the delayed deletion then removes the freshly re-created copy.

I feel like the logic here is reasonable. If the user deletes some data it should be deleted. If they recreate it again then they pay a performance penalty compared with not deleting it.

Perhaps one solution to this would be to introduce some reference counting for keys on the worker. When a key is created in Worker.update_data a count should be incremented. And then when it is deleted in Worker.delete_data it should be decremented and deleted only if the count gets to zero.

I'll start working on a PR for this.
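
For illustration only, the reference-counting idea might look something like the toy sketch below (my own sketch, not the actual Worker code; the method names simply mirror the update_data/delete_data hooks mentioned above):

from collections import defaultdict

class ToyWorkerStorage:
    """Toy model of per-key reference counting on a worker."""

    def __init__(self):
        self.data = {}
        self.ref_counts = defaultdict(int)

    def update_data(self, key, value):
        # Every (re)creation of a key bumps its reference count.
        self.data[key] = value
        self.ref_counts[key] += 1

    def delete_data(self, key):
        # Only drop the value once all outstanding references are released.
        self.ref_counts[key] -= 1
        if self.ref_counts[key] <= 0:
            self.data.pop(key, None)
            self.ref_counts.pop(key, None)

storage = ToyWorkerStorage()
storage.update_data("x", 1)   # first scatter
storage.update_data("x", 1)   # re-scatter before the release lands
storage.delete_data("x")      # late release: count drops to 1, data survives
assert "x" in storage.data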

dkhokhlov commented 3 years ago

I'm getting the same concurrent.futures._base.CancelledError on a LocalCluster. I am using client.run() to run a function on all workers in sync mode. The error is intermittent, reproducing in about 1 in 10 runs. What is the nature of this error?

13:03:59 Traceback (most recent call last):
13:03:59   File "xxxxxxxxxx/lib/python3.7/site-packages/distributed/comm/core.py", line 298, in _
13:03:59     write = await asyncio.wait_for(comm.write(local_info), 1)
13:03:59   File "xxxxxxxxxx/lib/python3.7/asyncio/tasks.py", line 435, in wait_for
13:03:59     await waiter
13:03:59 concurrent.futures._base.CancelledError

latest dask release from conda

martindurant commented 3 years ago

Moved to distributed. This seems to still be an issue.

mvashishtha commented 1 year ago

@jacobtomlinson thank you for looking into the bug.

This seems like a really serious bug. Right now it's causing tests I'm adding to Modin on dask to fail often (but not always). Dask users should be able to delete data with a given value and re-store data with the same value.

Regarding "I'll start working on a PR for this": is there any workaround? What happened to #3641?

mvashishtha commented 1 year ago

Following up on my earlier comment about the Modin tests failing often (but not always): it turns out I should have been scattering my objects with hash=False. Once I started doing that, I worked around the bug.

Still, the bug with hash=True seems pretty bad.
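
In code terms, the workaround amounts to something like the sketch below (my own sketch, assuming a plain local cluster; with hash=False the scattered futures get random keys instead of content-derived ones, so deleting and immediately re-scattering an equal value can no longer collide on the same key):

from distributed import Client

client = Client(n_workers=2)

for i in range(25):
    # With the default hash=True this loop intermittently hits CancelledError;
    # with hash=False every scatter produces fresh keys, so the delete /
    # re-scatter collision cannot occur.
    a = client.scatter(list(range(15)), hash=False)
    print(client.submit(sum, a).result())
    del a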

jacobtomlinson commented 1 year ago

Yeah there hasn't been much activity here for a couple of years. #3641 wasn't an ideal solution and we ended up not going with it.

It's interesting that not many folks are pinging the issue or giving a 👍; maybe most folks are working around it or not running into it. I'm guessing this is why it hasn't been resolved yet.

cc @grainger @gjoseph92 @crusaderky @hendrikmakait in case they have any interest in this

gjoseph92 commented 1 year ago

I just tried @jacobtomlinson's reproducer from https://github.com/dask/distributed/issues/4612#issuecomment-604417345 and it still fails.

2022-08-01 15:54:39,440 - distributed.scheduler - INFO - State start
2022-08-01 15:54:39,443 - distributed.scheduler - INFO - Clear task state
2022-08-01 15:54:39,444 - distributed.scheduler - INFO -   Scheduler at: inproc://192.168.0.16/3895/1
2022-08-01 15:54:39,444 - distributed.scheduler - INFO -   dashboard at:            localhost:8787
2022-08-01 15:54:39,454 - distributed.worker - INFO -       Start worker at: inproc://192.168.0.16/3895/4
2022-08-01 15:54:39,454 - distributed.worker - INFO -          Listening to:         inproc192.168.0.16
2022-08-01 15:54:39,454 - distributed.worker - INFO -          dashboard at:         192.168.0.16:50082
2022-08-01 15:54:39,454 - distributed.worker - INFO - Waiting to connect to: inproc://192.168.0.16/3895/1
2022-08-01 15:54:39,454 - distributed.worker - INFO - -------------------------------------------------
2022-08-01 15:54:39,454 - distributed.worker - INFO -               Threads:                         16
2022-08-01 15:54:39,454 - distributed.worker - INFO -                Memory:                  32.00 GiB
2022-08-01 15:54:39,454 - distributed.worker - INFO -       Local Directory: /var/folders/rs/wdnmv5lj02x7sh19rg3nyfyr0000gn/T/dask-worker-space/worker-rpvr7b4y
2022-08-01 15:54:39,454 - distributed.worker - INFO - -------------------------------------------------
2022-08-01 15:54:39,468 - distributed.scheduler - INFO - Register worker <WorkerState 'inproc://192.168.0.16/3895/4', name: 0, status: init, memory: 0, processing: 0>
2022-08-01 15:54:39,469 - distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.0.16/3895/4
2022-08-01 15:54:39,469 - distributed.core - INFO - Starting established connection
2022-08-01 15:54:39,470 - distributed.worker - INFO -         Registered to: inproc://192.168.0.16/3895/1
2022-08-01 15:54:39,470 - distributed.worker - INFO - -------------------------------------------------
2022-08-01 15:54:39,470 - distributed.core - INFO - Starting established connection
2022-08-01 15:54:39,473 - distributed.scheduler - INFO - Receive client connection: Client-89bac764-11e4-11ed-8f37-acde48001122
2022-08-01 15:54:39,474 - distributed.core - INFO - Starting established connection
1
2022-08-01 15:54:39,686 - distributed.scheduler - INFO - User asked for computation on lost data, sum-7b273f79c27b78e8b6f23bc2fa389c28
2022-08-01 15:54:39,698 - distributed.scheduler - INFO - Remove client Client-89bac764-11e4-11ed-8f37-acde48001122
2022-08-01 15:54:39,698 - distributed.scheduler - INFO - Remove client Client-89bac764-11e4-11ed-8f37-acde48001122
2022-08-01 15:54:39,699 - distributed.scheduler - INFO - Close client connection: Client-89bac764-11e4-11ed-8f37-acde48001122
2022-08-01 15:54:39,701 - distributed.worker - INFO - Stopping worker at inproc://192.168.0.16/3895/4
2022-08-01 15:54:39,702 - distributed.scheduler - INFO - Remove worker <WorkerState 'inproc://192.168.0.16/3895/4', name: 0, status: closing, memory: 0, processing: 0>
2022-08-01 15:54:39,703 - distributed.core - INFO - Removing comms to inproc://192.168.0.16/3895/4
2022-08-01 15:54:39,703 - distributed.scheduler - INFO - Lost all workers
2022-08-01 15:54:39,703 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-6adb788a-61cb-4125-8b34-e95b17dde3cd Address inproc://192.168.0.16/3895/4 Status: Status.closing
2022-08-01 15:54:39,705 - distributed.scheduler - INFO - Scheduler closing...
2022-08-01 15:54:39,705 - distributed.scheduler - INFO - Scheduler closing all comms
Traceback (most recent call last):
  File "/Users/gabe/dev/dask-playground/reuse-futures.py", line 23, in <module>
    asyncio.get_event_loop().run_until_complete(f())
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/Users/gabe/dev/dask-playground/reuse-futures.py", line 17, in f
    print(await client.submit(sum, a))
  File "/Users/gabe/dev/distributed/distributed/client.py", line 295, in _result
    raise exception
concurrent.futures._base.CancelledError: sum-7b273f79c27b78e8b6f23bc2fa389c28

Notice the "User asked for computation on lost data" message in the middle: https://github.com/dask/distributed/blob/4b89e26c044da2bd5ceb9367568a27911589cdf7/distributed/scheduler.py#L3885-L3896

That's where the CancelledError is coming from as well.

My gut feeling here is that this is mostly a client-side issue, not one with the scheduler or worker. I haven't looked at this for more than two minutes, so this is just a guess. But it feels familiar, given how the client gets reference-counting wrong in a concurrent setting, especially when passing from blocking to event-loop code. A guess:

  1. When the del a statement completes, the futures are not actually deleted or dereferenced. At least two event loop cycles still have to pass before that happens on the client. For them to actually be released on the scheduler, there's then the wait for a BatchedSend interval to pass, network time, the scheduler handling the request, etc.
  2. client-releases-keys (the client->scheduler message) happens over BatchedSend, which ensures message ordering. scatter happens over RPC, which is not ordered. So it's probable, on a single machine like this, that the second scatter RPC call will reach the scheduler before the first client-releases-keys message.
  3. scatter on the scheduler doesn't do much validation. It doesn't catch the fact that it's been told to scatter a key that it already knows about, which should be a red flag.
  4. So basically, the second scatter runs, then the first client-releases-keys arrives and deletes the key that we just re-scattered (this is the key problem; the order of these got flipped because they use different RPC mechanisms with different orderings)
  5. submit(sum, a) is guaranteed to happen after client-releases-keys, since they both use batched comms. So at that point, a has just been deleted from the scheduler, then you try to make a new task that depends on it, hence the "User asked for computation on lost data".
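
To make the suspected ordering concrete, here is a toy asyncio model of steps 2-4 (purely illustrative, none of this is Dask code: the release travels on a delayed, ordered path while scatters are handled immediately, so a re-scatter can overtake the release of the very same key):

import asyncio

scheduler_state = {}   # key -> value held by the toy "scheduler"

async def batched_release(key, delay=0.05):
    # Ordered, batched path: the message waits for the next flush before it is applied.
    await asyncio.sleep(delay)
    scheduler_state.pop(key, None)

async def rpc_scatter(key, value):
    # Separate RPC path: applied immediately, independent of the batched stream.
    scheduler_state[key] = value

async def main():
    await rpc_scatter("x", "data")                        # first scatter
    release = asyncio.create_task(batched_release("x"))   # client-side "del a"
    await rpc_scatter("x", "data")                        # immediate re-scatter, same key
    await release                                         # late release removes the new copy
    assert "x" not in scheduler_state                     # -> "computation on lost data"

asyncio.run(main())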

maxbane commented 1 year ago

I think I have another (simpler?) reproducer. Posted in #3703, but maybe I should have posted here instead since this issue has more recent activity and they seem to be duplicates.

from dask import distributed
cluster = distributed.LocalCluster(
    n_workers=4, threads_per_worker=2, host="0.0.0.0",
    scheduler_port=0, dashboard_address=':0'
)
client = distributed.Client(cluster)

def nullity(x):
    return None

def identity(x, y):
    return x

for i in range(100):
    # after a few iterations, we'll die with either CancelledError or KilledWorker
    print(f"Iteration {i}")
    y = client.submit(
        nullity,
        client.scatter("smoochies")
    ).result()

    client.submit(
        identity,
        client.scatter("smoochies"),
        y
    ).result()

fjetter commented 1 year ago

I can confirm that @gjoseph92's analysis is correct and this is an ordering issue. I see two options:

  1. Scatter uses the batched comm as well and follows ordering. I'm actually not sure how bad an idea it is to use the batched comm for the entire data stream; maybe that's less bad than it sounds. However, it is also entirely possible to just split the scatter process into two steps. We're doing this already for the direct path (see https://github.com/dask/distributed/blob/aca9a5e75ceb0c66faa6dc6db2f6bfd40ab2e8ee/distributed/client.py#L2400-L2406): when communication to workers is allowed, we scatter to the workers and, once they all have their data, we let the scheduler know the data exists via the update_data call. Note that update_data is also flawed because it still uses an unordered pooled RPC instead of an ordered batched stream, i.e. if we changed update_data to a batched message, this would likely work for direct. We could implement the same thing for indirect by using the same logic but proxying over the scheduler. (A toy illustration follows after this list.)

  2. We could introduce entity tags to the futures and sign every client-wants/releases message, similar to what https://github.com/dask/distributed/pull/7463 introduces for worker<->scheduler communication.
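
As a purely illustrative toy (again, not Dask code), the ordering fix behind option 1 boils down to putting the data announcement on the same FIFO as the releases, so a re-scatter can no longer be overtaken by an earlier release:

from collections import deque

scheduler_state = {}
ordered_stream = deque()   # single FIFO shared by releases and data announcements

def client_release(key):
    ordered_stream.append(("release", key, None))

def client_scatter(key, value):
    # Phase 1 (not modeled): ship the bytes directly to a worker.
    # Phase 2: announce the key on the ordered stream instead of an unordered RPC.
    ordered_stream.append(("update_data", key, value))

def scheduler_drain():
    while ordered_stream:
        op, key, value = ordered_stream.popleft()
        if op == "update_data":
            scheduler_state[key] = value
        else:
            scheduler_state.pop(key, None)

client_scatter("x", "data")
client_release("x")
client_scatter("x", "data")   # re-scatter after the release, in program order
scheduler_drain()
assert scheduler_state == {"x": "data"}   # ordering preserved, the key is not lost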

fjetter commented 1 year ago

Decoupling scatter_to_workers from update_data as suggested above in 1) indeed resolves the race that triggers the CancelledError, but it introduces a similar race condition on the worker side resulting in data loss. I think by implementing something like 2) we can avoid this entire class of race conditions. This would require modifications to client_releases_keys and client_desires_keys but, IIUC, not on the worker side.

ofk123 commented 4 months ago

@mvashishtha, would you say hash=False is a safe workaround? I see that setting client.scatter(..., hash=False) does avoid the CancelledError, at least in the few attempts I tried on our current workflow, which is similar to the MRE below. I would like to be sure about potential flaws.

Reproducer:

import numpy as np
from distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

def function1(x):
    return x*2

def function2(x):
    return x*3

def run1(client, arr):
    arr_sc = client.scatter(arr)
    list_of_submitted_tasks = []
    for _ in range(100):
        submitted = client.submit(function1, arr_sc, pure=False)
        list_of_submitted_tasks.append(submitted)
    gathered = client.gather(list_of_submitted_tasks)
    del arr_sc, list_of_submitted_tasks, submitted

def run2(client, arr):
    arr_sc = client.scatter(arr)
    list_of_submitted_tasks = []
    for _ in range(100):
        submitted = client.submit(function2, arr_sc, pure=False)
        list_of_submitted_tasks.append(submitted)
    gathered = client.gather(list_of_submitted_tasks)
    del arr_sc, list_of_submitted_tasks, submitted

for i in range(30):
    arr = np.array([1., 2., 3.])*i
    run1(client, arr)
    run2(client, arr)

Erroring with:

---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Cell In[86], line 28
     26 arr = np.array([1., 2., 3.])*i
     27 run1(client, arr)
---> 28 run2(client, arr)

Cell In[86], line 22, in run2(client, arr)
     20     submitted = client.submit(function2, arr_sc, pure=False)
     21     list_of_submitted_tasks.append(submitted)
---> 22 gathered = client.gather(list_of_submitted_tasks)
     23 del arr_sc, list_of_submitted_tasks, submitted

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:2291, in Client.gather(self, futures, errors, direct, asynchronous)
   2289 else:
   2290     local_worker = None
-> 2291 return self.sync(
   2292     self._gather,
   2293     futures,
   2294     errors=errors,
   2295     direct=direct,
   2296     local_worker=local_worker,
   2297     asynchronous=asynchronous,
   2298 )

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:339, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    337     return future
    338 else:
--> 339     return sync(
    340         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    341     )

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:406, in sync(loop, func, callback_timeout, *args, **kwargs)
    404 if error:
    405     typ, exc, tb = error
--> 406     raise exc.with_traceback(tb)
    407 else:
    408     return result

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:379, in sync.<locals>.f()
    377         future = asyncio.wait_for(future, callback_timeout)
    378     future = asyncio.ensure_future(future)
--> 379     result = yield future
    380 except Exception:
    381     error = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/tornado/gen.py:769, in Runner.run(self)
    766 exc_info = None
    768 try:
--> 769     value = future.result()
    770 except Exception:
    771     exc_info = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:2155, in Client._gather(self, futures, errors, direct, local_worker)
   2153     else:
   2154         raise exception.with_traceback(traceback)
-> 2155     raise exc
   2156 if errors == "skip":
   2157     bad_keys.add(key)

CancelledError: function2-170f5763-5a5a-4d36-8257-5ec2d782149f

My current workaround is to return the future in run1(), and reuse it as an argument in run2(). Something like:

def run1(client, arr, return_scattered=None):
    arr_sc = client.scatter(arr)
    list_of_submitted_tasks = []
    for _ in range(100):
        submitted = client.submit(function1, arr_sc, pure=False)
        list_of_submitted_tasks.append(submitted)
    gathered = client.gather(list_of_submitted_tasks)
    if return_scattered:
        del list_of_submitted_tasks, submitted
        return arr_sc
    del arr_sc, list_of_submitted_tasks, submitted

def run2(client, arr, scattered=None):
    if not scattered:
        arr_sc = client.scatter(arr)
    else:
        arr_sc = scattered
    list_of_submitted_tasks = []
    for _ in range(100):
        submitted = client.submit(function2, arr_sc, pure=False)
        list_of_submitted_tasks.append(submitted)
    gathered = client.gather(list_of_submitted_tasks)
    del arr_sc, list_of_submitted_tasks, submitted

for i in range(30):
    arr = np.array([1., 2., 3.])*i
    arr_sc = run1(client, arr, return_scattered=True)
    run2(client, arr, scattered=arr_sc)

But it would be nice to avoid the return.