dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Error transitioning task from 'processing' to 'memory' when using annotations #5937

Open jacobtomlinson opened 2 years ago

jacobtomlinson commented 2 years ago

What happened:

Tasks which are currently persisting cause annotated tasks to fail.

What you expected to happen:

Annotated and non-annotated tasks should run concurrently.

Minimal Complete Verifiable Example:

Optional: Create a new environment with the latest Dask

$ conda create -n daskwait -c conda-forge dask distributed ipython -y

$ conda activate daskwait

$ ipython
import dask
import dask.array as da
from dask.distributed import LocalCluster, Client, Scheduler, Nanny, wait

# Create a scheduler
scheduler = await Scheduler()

# Launch a worker with no resources
nanny = await Nanny(scheduler.address)

# Launch a worker with 1 FOO resource
foo_nanny = await Nanny(scheduler.address, resources={"FOO": 1})

# Connect a client
client = await Client(scheduler.address, asynchronous=True)

# Create some data
arr = da.random.random((10_000, 10_000), chunks=(1000, 1000)).persist()

# Use the annotated workers for compute
with dask.annotate(resources={'FOO': 1}):
    result = arr.mean().persist()

await wait(result)
distributed.scheduler - ERROR - 'FOO'
Traceback (most recent call last):
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2845, in transition_processing_memory
    _remove_from_processing(self, ts)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 8021, in _remove_from_processing
    state.release_resources(ts, ws)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 3479, in release_resources
    ws._used_resources[r] -= required
KeyError: 'FOO'
distributed.scheduler - ERROR - Error transitioning "('random_sample-7038b7ac935ddd67754c4d9b22877b64', 5, 0)" from 'processing' to 'memory'
Full traceback

```python-traceback
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.51.100.43:41979
distributed.scheduler - INFO - dashboard at: :8787
distributed.nanny - INFO - Start Nanny at: 'tcp://10.51.100.43:42819'
distributed.worker - INFO - Start worker at: tcp://10.51.100.43:43427
distributed.worker - INFO - Listening to: tcp://10.51.100.43:43427
distributed.worker - INFO - dashboard at: 10.51.100.43:33969
distributed.worker - INFO - Waiting to connect to: tcp://10.51.100.43:41979
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 12
distributed.worker - INFO - Memory: 93.04 GiB
distributed.worker - INFO - Local Directory: /home/jtomlinson/Scratch/dask-worker-space/worker-4eyrd3zl
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.51.100.43:43427
distributed.worker - INFO - Registered to: tcp://10.51.100.43:41979
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.nanny - INFO - Start Nanny at: 'tcp://10.51.100.43:38997'
distributed.worker - INFO - Start worker at: tcp://10.51.100.43:33137
distributed.worker - INFO - Listening to: tcp://10.51.100.43:33137
distributed.worker - INFO - dashboard at: 10.51.100.43:35407
distributed.worker - INFO - Waiting to connect to: tcp://10.51.100.43:41979
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 12
distributed.worker - INFO - Memory: 93.04 GiB
distributed.worker - INFO - Local Directory: /home/jtomlinson/Scratch/dask-worker-space/worker-wpq_e4ft
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.51.100.43:33137
distributed.worker - INFO - Registered to: tcp://10.51.100.43:41979
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-9e96453c-a160-11ec-940c-80e82ccdc37c
distributed.core - INFO - Starting established connection
distributed.scheduler - ERROR - 'FOO'
Traceback (most recent call last):
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2845, in transition_processing_memory
    _remove_from_processing(self, ts)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 8021, in _remove_from_processing
    state.release_resources(ts, ws)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 3479, in release_resources
    ws._used_resources[r] -= required
KeyError: 'FOO'
distributed.scheduler - ERROR - Error transitioning "('random_sample-7038b7ac935ddd67754c4d9b22877b64', 5, 0)" from 'processing' to 'memory'
Traceback (most recent call last):
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2277, in _transition
    recommendations, client_msgs, worker_msgs = func(key, *args, **kwargs)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2845, in transition_processing_memory
    _remove_from_processing(self, ts)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 8021, in _remove_from_processing
    state.release_resources(ts, ws)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 3479, in release_resources
    ws._used_resources[r] -= required
KeyError: 'FOO'
distributed.core - ERROR - 'FOO'
Traceback (most recent call last):
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/core.py", line 586, in handle_stream
    handler(**merge(extra, msg))
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 5466, in handle_task_finished
    r: tuple = self.stimulus_task_finished(key=key, worker=worker, **msg)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 4905, in stimulus_task_finished
    r: tuple = parent._transition(key, "memory", worker=worker, **kwargs)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2277, in _transition
    recommendations, client_msgs, worker_msgs = func(key, *args, **kwargs)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2845, in transition_processing_memory
    _remove_from_processing(self, ts)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 8021, in _remove_from_processing
    state.release_resources(ts, ws)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 3479, in release_resources
    ws._used_resources[r] -= required
KeyError: 'FOO'
distributed.scheduler - INFO - Remove worker
distributed.core - INFO - Removing comms to tcp://10.51.100.43:43427
distributed.utils - ERROR - 'FOO'
[same KeyError: 'FOO' traceback, entered via log_errors (utils.py:681) -> add_worker (scheduler.py:4516) -> handle_worker (scheduler.py:5607) -> handle_stream (core.py:586) and the frames above]
distributed.core - ERROR - Exception while handling op register-worker
[same KeyError: 'FOO' traceback, entered via handle_comm (core.py:520) -> add_worker -> handle_worker -> handle_stream and the frames above]
tornado.application - ERROR - Exception in callback functools.partial(. at 0x7fc6d1304280>, exception=KeyError('FOO')>)
[same KeyError: 'FOO' traceback, entered via tornado's _run_callback (ioloop.py:741) -> tcpserver.py:331 -> distributed/comm/tcp.py:530 -> handle_comm and the frames above]
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.batched - INFO - Batched Comm Closed Scheduler local=tcp://10.51.100.43:45970 remote=tcp://10.51.100.43:41979>
Traceback (most recent call last):
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/batched.py", line 93, in _background_send
    nbytes = yield self.comm.write(
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/comm/tcp.py", line 247, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Unexpected worker completed task. Expected: None, Got: , Key: ('random_sample-7038b7ac935ddd67754c4d9b22877b64', 5, 0)
distributed.scheduler - ERROR - 'NoneType' object has no attribute 'address'
Traceback (most recent call last):
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2810, in transition_processing_memory
    worker_msgs[ts._processing_on.address] = [
AttributeError: 'NoneType' object has no attribute 'address'
distributed.scheduler - ERROR - Error transitioning "('random_sample-7038b7ac935ddd67754c4d9b22877b64', 5, 0)" from 'processing' to 'memory'
Traceback (most recent call last):
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2277, in _transition
    recommendations, client_msgs, worker_msgs = func(key, *args, **kwargs)
  File "/home/jtomlinson/miniconda3/envs/rapids-22.02/lib/python3.8/site-packages/distributed/scheduler.py", line 2810, in transition_processing_memory
    worker_msgs[ts._processing_on.address] = [
AttributeError: 'NoneType' object has no attribute 'address'
distributed.utils - ERROR - 'NoneType' object has no attribute 'address'
[same AttributeError traceback, entered via log_errors (utils.py:681) -> add_worker (scheduler.py:4450) -> _transition and the frames above]
distributed.core - ERROR - Exception while handling op register-worker
[same AttributeError traceback, entered via handle_comm (core.py:520) -> add_worker (scheduler.py:4450) -> _transition and the frames above]
```

Anything else we need to know?:

If I add a wait on the data generation before moving to the annotated section, things work. But this means we have a synchronisation point here, which isn't ideal.

...

# Create some data
arr = da.random.random((10_000, 10_000), chunks=(1000, 1000)).persist()

await wait(arr)

# Use the annotated workers for compute
with dask.annotate(resources={'FOO': 1}):
    result = arr.mean().persist()

await wait(result)

Environment:

Cluster Dump State:

quasiben commented 2 years ago

I'm able to reproduce this, but not when I manually create the cluster and separately connect with a client.

quasiben commented 2 years ago

I think what's happening here is that the annotation is initially applied correctly to the dask layers for mean (@rjzamora showed me a few tricks!). We can see that with the following:

In [42]: arr = da.random.random((10_000, 10_000), chunks=(1000, 1000))
    ...:
    ...: # Use the annotated workers for compute
    ...: with dask.annotate(resources={"FOO": 1}):
    ...:     result = arr.mean()

In [43]: result.dask
Out[43]:
HighLevelGraph with 6 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x10de05e80>
 0. random_sample-c79caaeb508f0375f8313ede30300804
 1. mean_chunk-eeca2912866909c9659bb48928748826
 2. mean_combine-partial-f96108f16300ce4e31953f304bf89e8a
 3. mean_combine-partial-74c4e5e4ebe3ca4d290d1ee469004c03
 4. mean_combine-partial-28ce6f709a0bde77d0d9e92006055c52
 5. mean_agg-aggregate-a395f045817315d8e7bb1b32626d0da6

In [44]: from dask.utils_test import hlg_layer

In [45]: hlg_layer(result.dask, "random_sample").annotations

In [46]: hlg_layer(result.dask, "mean_").annotations
Out[46]: {'resources': {'FOO': 1}}

Note that I've removed the persist calls. Above, the annotations are correctly applied only to the mean_-prefixed layers and not to the random_sample layer. When the persist() calls are added back, we essentially have only one layer:

In [50]: arr = da.random.random((10_000, 10_000), chunks=(1000, 1000)).persist()
    ...:
    ...: # Use the annotated workers for compute
    ...: with dask.annotate(resources={"FOO": 1}):
    ...:     result = arr.mean().persist()

In [51]: result.dask
Out[51]:
HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1844bf1f0>
 0. mean_agg-aggregate-188659ca8917dd3c92677f087bd12817

With only one layer and dependencies on random_sample, I think the packing infrastructure will apply the annotation everywhere: https://github.com/dask/dask/blob/5bd2bbabe4923869ef6d95585d93f36684b5e29c/dask/highlevelgraph.py#L1051-L1067

In the linked code, self.get_all_external_keys() will return a set of keys that includes both random_sample and mean_* keys, so the annotation {"FOO": 1} will be applied globally.
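As a rough illustration of that behaviour (the helper name and key strings below are hypothetical, not the actual HighLevelGraph packing API), a "global" annotation ends up attached to every external key, so layers that were never annotated still pick it up:

```python
# Hypothetical sketch only: expand_global_annotations and the key strings
# are illustrative, not dask's real packing code.
def expand_global_annotations(global_annotations: dict, all_external_keys: set) -> dict:
    # Every key in the graph gets the same annotation value.
    return {
        name: {str(key): value for key in all_external_keys}
        for name, value in global_annotations.items()
    }

packed = expand_global_annotations(
    {"resources": {"FOO": 1}},
    {"('random_sample-...', 5, 0)", "('mean_agg-aggregate-...',)"},
)
# Both the random_sample and the mean_agg keys now carry resources={'FOO': 1},
# even though only the mean layers were built inside the dask.annotate block.
```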

@gjoseph92 or @madsbk, does this seem correct to you? I think this is a bug, or at least something we want to avoid.

gjoseph92 commented 2 years ago

I'm surprised the annotation even made it to the scheduler without optimization.fuse.active: False (https://github.com/dask/dask/issues/7036). I'd have expected the fusion that turns a multi-layer HLG into a single materialized mean_agg-aggregate layer to drop the annotations entirely, not apply them to every task; I guess I'm wrong.

Regardless of whatever HLGs are doing, there's clearly a scheduler bug. That KeyError should never be able to happen, no matter how messed up the annotations are. For release_resources to fail in this way, I think that means consume_resources was never called for that worker.
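Purely to illustrate that failure mode, here is a standalone sketch (a hypothetical helper, not distributed's actual release_resources implementation or a proposed patch) of the guard that would make the release step tolerant; the underlying bug is still that the resource was apparently never consumed for that worker:

```python
def release_resources_safely(resource_restrictions: dict, used_resources: dict) -> None:
    """Decrement a worker's used resources, tolerating resources that were
    never consumed -- the situation that triggers the KeyError above."""
    for resource, required in (resource_restrictions or {}).items():
        # Guard instead of raising KeyError: 'FOO' when the resource was
        # never recorded as consumed on this worker.
        if resource in used_resources:
            used_resources[resource] -= required

# Example: the task claims FOO, but FOO was never consumed on this worker.
used = {}                                   # analogue of ws._used_resources
release_resources_safely({"FOO": 1}, used)  # no KeyError; `used` stays {}
```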

Taking a skim through the work-stealing code, I don't see anywhere in stealing.py where worker resources get consumed when a task is stolen onto a worker. We check that resources are available, but I don't see anywhere that we actually use them up. @fjetter, is this correct? Does work-stealing not properly consume worker resources?

Even so, given that you have only 1 worker with the FOO resource, how could a task requiring FOO get stolen onto that worker—there's nowhere else it could run!

Haven't tried running your example yet, just reading code; we'll need to look into this more later.

quasiben commented 2 years ago

It is surprising that it's not getting removed, given https://github.com/dask/dask/issues/7036 -- I had forgotten that was the case.

I don't think this is a work-stealing issue, as the layers are incorrectly annotated before the graph gets to the scheduler, right? You can see the graph being incorrectly manipulated on the client side before it reaches the scheduler (I checked this with breakpoints in the client and in dask/highlevelgraph.py).

jacobtomlinson commented 2 years ago

Even so, given that you have only 1 worker with the FOO resource, how could a task requiring FOO get stolen onto that worker—there's nowhere else it could run!

I think this is just a result of me condensing the problem into an MRE. In my actual setup I had 2 workers with the annotation, and 10 without it.

rjzamora commented 2 years ago

Just a note that I was able to reproduce this, and the problem goes away when you move the persist outside the dask.annotate(resources={'FOO': 1}) block. When persist is in this block, the annotation is treated as a global annotation (not just a layer-specific annotation).
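In other words, a minimal illustration of the workaround described above (only the position of persist changes):

```python
import dask
import dask.array as da

arr = da.random.random((10_000, 10_000), chunks=(1000, 1000)).persist()

# Build the graph inside the annotation block...
with dask.annotate(resources={"FOO": 1}):
    result = arr.mean()

# ...but persist outside it, so the annotation stays layer-specific
# rather than being picked up as a global annotation.
result = result.persist()
```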

gjoseph92 commented 2 years ago

When persist is in this block, the annotation is treated as a global annotation

That's really interesting. Is that intentional, or maybe an HLG bug where, during the optimization process, a final materialized HLG layer gets instantiated, which automatically picks up the config.set(annotations=...) and assigns them to the whole materialized layer?

In my actual setup I had 2 workers with the annotation

That would make much more sense, and fits my theory about a work-stealing bug.


I think there are two orthogonal problems here:

  1. Something related to HLG annotations in dask/dask. That's what @quasiben was more focused on.
  2. distributed.scheduler.release_resources should never be failing with KeyError, regardless of what annotations dask/dask throws at it. This is a dask/distributed bug. It may happen to be triggered by whatever's going on with HLG annotations, but I believe it's a separate bug. And given @jacobtomlinson's update, I think I can see a way it could be caused by work-stealing.
rjzamora commented 2 years ago

That's really interesting. Is that intentional, or maybe a HLG bug where during the optimization process, a final materialized HLG layer gets instantiated, which automatically picks up the config.set(annotations=...) and assigns them to the whole materialized layer?

This feels like a bug, but it actually seems to be the "expected" behavior (and I don't think it is related to optimization). When you call persist, the graph eventually makes its way to _graph_to_futures on the client. As Ben pointed out, the config annotations (which will currently include resources={"FOO": 1}) are merged into the global HLG annotations here. These global annotations are then packed with every layer in dask_distributed_pack.
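A quick way to see why the timing of persist matters (a small check, assuming dask.annotate exposes the annotation through dask.config as described above):

```python
import dask

with dask.annotate(resources={"FOO": 1}):
    # Inside the block the annotation lives in the global config, so any
    # graph that gets packed/persisted here sees it as a global annotation.
    print(dask.config.get("annotations", {}))   # {'resources': {'FOO': 1}}

# Outside the block the config annotation is gone, so a later persist only
# sees the layer-level annotations attached when the graph was built.
print(dask.config.get("annotations", {}))       # {}
```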