Note: it's been observed that there's occasional overlap between Computation objects. This should not happen, and needs to be treated as a blocker to this ticket.
Here's an example Coiled cluster w/ overlapping computations, though it may be an odd case: https://cloud.coiled.io/clusters/194048/overview?account=dask-engineering
Here's the relevant code: https://github.com/dask/distributed/blob/b83c05572e366fe7b1723c1a3a3957ab53a2f27b/distributed/scheduler.py#L4322-L4327
If the cluster is idle, compute() and submit() create a new Computation. Otherwise, they attach to the last one chronologically. The easiest way to get overlapping computations is to have a task transition from memory to processing/queued/no-worker, e.g. in case of worker crash. When it does, it will retain its original Computation, while in the meantime other computations may have been started.
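Paraphrasing the linked logic as a sketch (not the verbatim source; `_get_computation` is a hypothetical name):

```python
# Paraphrased sketch of the linked logic in distributed/scheduler.py.
# `self` is the Scheduler; `_get_computation` is a hypothetical name.
def _get_computation(self) -> Computation:
    if self.total_occupancy > 1e-9 and self.computations:
        # Cluster is busy: reuse the latest Computation, chronologically
        return self.computations[-1]
    # Cluster is idle: start a brand new Computation
    computation = Computation()
    self.computations.append(computation)
    return computation
```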
@fjetter @hendrikmakait how much do you trust `self.total_occupancy <= 1e-9` as an indicator of "the cluster is idle"?
Can you think of other use cases where the cluster becomes idle, then there's a brand new compute() call, and after that an old task resumes accruing activity?
@ntabris @hendrikmakait I don't think we have anything on our dashboard highlighting hard worker restarts (e.g. not graceful shutdowns, not calls to client.restart()) and/or recomputed tasks?
The worker crash use case is a bit of a problem: on the one hand, adding to a computation that is no longer the latest is the correct thing to do in this case; on the other hand, we don't have this kind of information in the metrics coming from the heartbeat, so we can only attach to the latest computation.
Another use case for overlapping computations:
```python
>>> x = client.submit(lambda: 1, key="x", resources={"XXX": 1})
>>> y = client.submit(lambda: 1, key="y")
>>> client.cluster.scheduler.computations
deque([<Computation cb97f633-68e3-45d5-9684-3bc670b357ae: Tasks: no-worker: 1>,
       <Computation 0562da4e-71a2-4573-80dc-c9a10abedfdd: Tasks: memory: 1>],
      maxlen=100)
```
Again, this logic is sound, but it is not compatible with metrics apportionment.
Yet another way to get overlapping computations: tasks are associated with computations through their task group. A user could manually submit tasks that are detected as the same task group, despite being submitted separately. This shouldn't happen in dask.array or dask.dataframe.
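A minimal sketch of this scenario, assuming tuple keys of the form `("x", i)` are mapped to task group `"x"`:

```python
# Two independently submitted tasks that land in the same task group "x"
# (assuming tuple keys ("x", i) are grouped by their first element):
f0 = client.submit(lambda: 1, key=("x", 0))
f0.result()
# The cluster is now idle, so the next call starts a new Computation,
# yet the new task joins the pre-existing task group "x":
f1 = client.submit(lambda: 2, key=("x", 1))
```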
> I don't think we have anything on our dashboard highlighting hard worker restarts (e.g. not graceful shutdowns, not calls to client.restart()) and/or recomputed tasks?
Not really, no. What are the ways I can tell these occurred?
None of these are nice or intuitive; we could use a dedicated Prometheus counter.
Hm, okay, yeah, I don't really want to rely much on indirect signals like that. Those would be pretty hard to query on.
@fjetter @ntabris @hendrikmakait
This conversation has become very fragmented, so I'll recap everything that has been said so far here.
As an intermediate Dask user, I want to see fine performance metrics broken down by computation [a].
[a] Here, a computation is what a novice to intermediate dask user would intuitively intend as a monolithic group of work; which means

- `compute()`, or a call to `persist()` immediately followed by `wait()`; or
- `submit()` calls, followed by a `wait()`, `gather()`, or `as_completed()` to wait for all futures; or
- `compute()` and/or `persist()` calls, which are awaited at the end all together. The most typical practical use case here is when a user calls `persist()` halfway through the graph definition, doesn't wait for it, continues defining the graph on top of it, and finally calls `compute()` on the final collection.

We are currently sending from the workers to the scheduler this information through the heartbeat:
- `execute`, broken down by task prefix and activity (e.g. unspill, task-cpu, etc.)
- `gather_dep` and `get_data`, broken down by activity only
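To make that concrete, a hypothetical sketch of the shape of these metrics as they travel in the heartbeat (the actual keys and activities in distributed may differ):

```python
# Hypothetical shape of the fine performance metrics in the heartbeat:
heartbeat_metrics = {
    # execute: broken down by (task prefix, activity, unit)
    ("execute", "inc", "thread-cpu", "seconds"): 12.3,
    ("execute", "inc", "disk-read", "seconds"): 0.4,
    # gather_dep / get_data: broken down by (activity, unit) only
    ("gather-dep", "network", "seconds"): 1.8,
    ("get-data", "serialize", "seconds"): 0.2,
}
```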
On the scheduler, we have `Computation` objects, which are aggregations of `TaskGroup`s. At the moment, every time the scheduler receives a `compute()`, `persist()`, or `submit()` call from the client,

- if the cluster is idle, it creates a new `Computation` and attaches the new TaskGroups to it;
- otherwise, it reuses the latest `Computation` and appends the new TaskGroups to it.

Naively, one would think that by construction `Computation` objects never overlap, as there must be a moment of complete quiet on the cluster in order to change to a new one.
I believe there are at least three use cases which cause computations to overlap:

1. a task transitions from memory back to processing/queued/no-worker (e.g. after a worker crash) and retains its original Computation while newer computations have started in the meantime;
2. a task lingers in no-worker (e.g. waiting for resources) while an unrelated call starts a new Computation, as in the example above;
3. separately submitted tasks are detected as the same task group, which then straddles two Computations.
The phenomenon of overlapping computations is an obstacle to apportioning fine performance metrics to `Computation` objects:
- Metrics for `execute` are currently grouped by task prefix. We could easily change them to be grouped by TaskGroup, at the cost of extra verbosity. However, metrics reach the scheduler via the heartbeat, which may arrive after the TaskGroup has been forgotten. So we'd need to move them from the heartbeat to the batched comms, which would substantially increase the worker->scheduler chatter, as we'd have no other option but to send them in disaggregated form.
- Metrics for `gather_dep` and `get_data` can't even be automatically assigned to a TaskGroup, due to bundling. I am aware there are some heuristics that pick an arbitrary task of the bundle, but I've always considered them an ugly hack, so I'm not terribly happy to use them for this new feature.
- Finally, and most importantly, a key piece of the metrics is the idle time. At the moment you can calculate it by hand as `idle time = end-to-end elapsed time as seen by the client * number of threads - sum(execute metrics)`, but it would be both straightforward and very useful to bake it into the metrics themselves, so that it can be finely broken down.
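For concreteness, a worked example of the by-hand calculation (all numbers are made up):

```python
elapsed = 120.0        # end-to-end elapsed time as seen by the client, in seconds
nthreads = 8           # total number of threads on the cluster
execute_total = 830.0  # sum of all `execute` metrics across workers, in seconds
idle = elapsed * nthreads - execute_total  # 120 * 8 - 830 = 130 thread-seconds
```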
The moment you have 2+ computations active at the same time, they will steal time from each other. You can trivially apportion idle time to "whatever one computation we're currently busy with right now" but you can't apportion it to a TaskGroup.
I don't have a measure of how much idle time we currently have in coiled/benchmarks. I expect it to be frequently substantial, and heavily use case-dependent.
We plan to introduce a context manager (#7795) which will artificially glue everything together in a single computation. It should be noted that the main motive behind it is to tag computations; the gluing effect is just a nice side-effect. I don't think we should rely on users to add this context manager to their code.
@fjetter suggests we should scrap the scheduler-side machinery that reuses a `Computation` if the cluster is not idle. Instead, we should add an ID on the client side, with a somewhat rapidly expiring timeout which is reset at every call to `compute()`, `persist()`, or `submit()`. Once the timeout expires, any further call will be apportioned to a new Computation.
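A minimal sketch of the client-side idea, assuming a hypothetical 30s timeout and helper class (neither was decided in this thread):

```python
import time
import uuid

TIMEOUT = 30.0  # hypothetical expiry; the actual value was not decided

class ComputationIdTracker:
    """Mint a fresh computation id once the cluster-facing calls go quiet."""

    def __init__(self) -> None:
        self._id: str | None = None
        self._last_call = 0.0

    def current_id(self) -> str:
        # Called at every compute()/persist()/submit(); reuses the previous
        # id unless more than TIMEOUT seconds passed since the last call.
        now = time.monotonic()
        if self._id is None or now - self._last_call > TIMEOUT:
            self._id = uuid.uuid4().hex
        self._last_call = now
        return self._id
```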
This design would make multitenancy scenarios more readable, at the cost of having massive amounts of "idle" time which can only be explained by looking at all the other computations going on at the same time. It would also, in my opinion, be problematic when the "multitenancy" is actually a single (human) user invoking get_client()
from a seceded task - which I suspect may be a fairly common use case. We could group parent and child together by propagating the "computation id" from original client to worker_client.
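For reference, the pattern I mean is the following (a minimal sketch using the public get_client/secede API):

```python
from distributed import get_client, secede

def parent(n: int) -> int:
    # Runs on a worker; fans out subtasks through its own client connection.
    client = get_client()
    futures = client.map(lambda i: i + 1, range(n))
    secede()  # leave the worker's thread pool while waiting on the subtasks
    return sum(client.gather(futures))

# Driver side: the subtasks look like a separate "tenant" unless the
# computation id is propagated from the original client.
# result = client.submit(parent, 10).result()
```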
Just throwing around a thought: Should all this detailed and analytics-focused functionality even live on the scheduler? The scheduler is both an operational component and the most prone to becoming a bottleneck in large-scale scenarios. Since what we are trying to do here is some mix of collecting and processing traces/metrics, a dedicated component might make more sense here. This could potentially help us address some of the complications we face with the current state.
@fjetter, @hendrikmakait, @ntabris and I had an extensive offline discussion about this. The emerging consensus is that Computations are not fit for purpose - neither for fine performance metrics nor for anything else - and should be scrapped and replaced.
This conversation continues on https://github.com/dask/distributed/issues/7860
This ticket will now be closed; the fine performance metrics will be apportioned to Spans instead.
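For context, a minimal sketch of what tagging work with a Span looks like (assuming the span context manager discussed in #7860; the API may differ):

```python
from distributed import Client, span

client = Client()

with span("etl-pipeline"):
    # Everything submitted inside this block is tagged with the same span,
    # so fine performance metrics can be accumulated per span.
    fut = client.submit(lambda: 1)
    fut.result()
```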
Computation objects are commonly used by third-party Scheduler plugins (e.g. Coiled Analytics) to visualize data. They can be crudely defined as everything that happened to the cluster between two moments when it was idle. As there should be only one active at any given time, it makes perfect sense to apportion the metrics to the Computation objects, in addition to the Scheduler's global ever-increasing counter.