dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

User-defined spans #7860

Closed · crusaderky closed this 1 year ago

crusaderky commented 1 year ago

Dask users expect some sort of grouping of tasks that is more granular than the lifetime of a whole cluster but more coarse than TaskGroup. Currently, this is delivered through Computation objects, which (AFAIK) are used by third parties only (such as Coiled).

A Computation is defined as the union of all TaskGroups that were created since the cluster last became idle. Unlike TaskGroups, which are forgotten as soon as all of their tasks are forgotten, Computations are retained semi-indefinitely in a long fixed-size deque (and retain a strong reference to all the forgotten TaskGroups).

Computations fall short of their intended goal:

I propose scrapping the Computation class and replacing it with a Span class. What follows is an early design draft and an invitation to discussion.

High level design

Information can be retrieved from Spans by SchedulerPlugins, as happens today for Computations. Built-in use of Spans (e.g. by the Bokeh dashboard) is out of scope for the time being.

Usage example

import dask
import dask.array as da
from distributed import Client

client = Client()

@dask.span("base_collection")  # proposed API: usable as a decorator
def base_collection():
    a = da.random.random(...)  # array size elided in the original
    return a + 1

with dask.span("my_workflow"):  # proposed API: usable as a context manager
    a = base_collection()
    with dask.span("step2"):  # spans nest: "step2" is a child of "my_workflow"
        b = a.sum()
    c = b * 2

c.compute()

Low level design

dask.span()

The new context manager dask.span is a simple wrapper around dask.annotate. A new annotation, span, is a tuple of all span tags entered so far: dask.span appends the new tag to the end of it on enter and removes it from the end on exit.
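A minimal sketch of how dask.span could wrap dask.annotate, relying on dask.annotate's existing semantics (annotations apply to collections created inside the block and are restored on exit); the module-level tag stack is an implementation detail of this sketch, not settled design:

from contextlib import contextmanager

import dask

_tags: list[str] = []

@contextmanager
def span(tag: str):
    _tags.append(tag)  # append the new tag on enter
    try:
        # Everything created in this block carries the full tag tuple
        with dask.annotate(span=tuple(_tags)):
            yield
    finally:
        _tags.pop()  # remove it from the end on exit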

In the above example, layers will have the following annotations (with the client id prepended, as in the fine performance metrics example below):

layers created inside base_collection(): span=(<client id>, "my_workflow", "base_collection")
b = a.sum(): span=(<client id>, "my_workflow", "step2")
c = b * 2: span=(<client id>, "my_workflow")

In the unhappy event that optimization strips away all task annotations, either the client or the scheduler will re-add the client id. This is a temporary hack - annotations should never be stripped to begin with.

Server side

Everything about spans can be encapsulated in a module that is separate from distributed.scheduler.

This module defines a new SchedulerPlugin, SpanPlugin, and a standalone class Span.

SpanPlugin contains the structures

Each Span object contains links to its direct children plus arbitrary metadata.
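A minimal sketch of the two classes; all attribute names here are illustrative assumptions rather than a settled API:

from __future__ import annotations

from dataclasses import dataclass, field

from distributed.diagnostics.plugin import SchedulerPlugin

@dataclass
class Span:
    id: tuple[str, ...]  # e.g. ("<client id>", "my_workflow", "step2")
    children: list[Span] = field(default_factory=list)  # direct children only
    metadata: dict = field(default_factory=dict)  # e.g. source code snippets

class SpanPlugin(SchedulerPlugin):
    def __init__(self) -> None:
        self.spans: dict[tuple[str, ...], Span] = {}  # every span, keyed by full id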

When update_graph() receives a snippet of source code, the SpanPlugin posts it to all leaf Spans that are attached to the tasks.

Unlike Computation objects, Spans don't link to TaskGroups. Instead, individual tasks can contribute to them through SpanPlugin.transition(). This allows recording, e.g.:
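A hedged sketch of such recording, extending the SpanPlugin above; the key_to_span mapping and the recorded field are assumptions of this sketch:

from distributed.diagnostics.plugin import SchedulerPlugin

class SpanPlugin(SchedulerPlugin):
    def __init__(self) -> None:
        self.spans = {}  # span id -> Span, as above
        self.key_to_span = {}  # task key -> span id (hypothetical mapping)

    def transition(self, key, start, finish, *args, **kwargs):
        span_id = self.key_to_span.get(key)
        if span_id is None:
            return
        span = self.spans[span_id]
        if start == "processing" and finish == "memory":
            # e.g. count the tasks completed within each span
            span.metadata["n_done"] = span.metadata.get("n_done", 0) + 1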

These pieces of information should each be separated into

Fine performance metrics

On workers, execute fine performance metrics (https://github.com/dask/distributed/issues/7665#issuecomment-1492194340) shall change

from {("execute", prefix, activity, unit): value} to {("execute", span, prefix, activity, unit): value}

e.g. from {("execute", "random", "thread-cpu", "seconds"): 12.3} to {("execute", (<client id>, "my_workflow", "base_collection"), "random", "thread-cpu", "seconds"): 12.3}

This won't substantially change the cardinality of the data, unless the workflow creates and destroys many clients for some weird reason.

When metrics are transferred from worker to scheduler through the heartbeat,

A @property on the Span objects allows recursively aggregating the metrics from children and grandchildren on the fly.
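A sketch of that property; _own_metrics (the metrics recorded directly against this span) is a hypothetical attribute name:

from collections import Counter

class Span:
    def __init__(self):
        self.children: list["Span"] = []
        self._own_metrics: dict = {}  # hypothetical per-span metrics store

    @property
    def cumulative_metrics(self) -> dict:
        # This span's own metrics plus, recursively, its descendants'
        out = Counter(self._own_metrics)
        for child in self.children:
            out.update(child.cumulative_metrics)
        return dict(out)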

Note: the heartbeat may arrive when all tasks that point to a Span have already been forgotten.

Lifecycle

Spans are modest in size and in most cases can just be retained indefinitely. This is potentially problematic for long-lived servers which receive connections from many short-lived clients.

To solve this problem, we could define an expiration timeout for each Span, e.g. 6 hours, that starts ticking when the last task belonging to it is forgotten by the scheduler. When a Span is forgotten, this should also clear all matching fine performance metrics from the scheduler and the workers.
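For illustration, the expiry test could look as follows, assuming the scheduler stamps a hypothetical last_task_forgotten_at attribute when a span's last task is forgotten:

import time

SPAN_TTL = 6 * 3600  # e.g. 6 hours, in seconds

def is_expired(span) -> bool:
    # A span expires only once all of its tasks have been forgotten
    # and the timeout has elapsed since then
    return (
        span.last_task_forgotten_at is not None
        and time.monotonic() - span.last_task_forgotten_at > SPAN_TTL
    )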

Deprecation cycle

We would like to retire Computation objects. Who uses them today? What kind of deprecation cycle do we need to adopt?

Implementation steps

(to be replaced with links to PRs / sub-issues)

CC @fjetter @hendrikmakait @ntabris

hendrikmakait commented 1 year ago

Thanks for writing this up, @crusaderky. I'm really looking forward to adding spans! Given that we were not very happy with the term Computation, I'd like to bike-shed the term Span at some point. I'm not sure if I love or hate it due to its conceptual proximity to ordinary spans and traces in observability.

fjetter commented 1 year ago

This module defines a new SchedulerPlugin, SpanPlugin, and a standalone class Span.

I suggest not doing this as a plugin. Specifically, I would expect that we have to change the signature of update_graph for this to work, e.g.

update_graph(..., span: Span | dict, ...)

At this stage I would not include this in the plugin API (the signatures are not identical). I also expect the changes to tasks/spans/etc. to be somewhat entangled with the current TaskState creation logic, such that the plugin API may get in the way.

Unlike Computation objects, Spans don't link to TaskGroups. Instead, individual tasks can contribute to them through SpanPlugin.transition(). This allows recording, e.g.:

We haven't talked about this. I suggest not doing this in the first iteration and not doing anything with transitions. At this point I cannot tell how useful all of this additional information truly is, and I suggest keeping it simple at first.

Lifecycle

Again, something I would defer until later. I agree that we might want to clean them up eventually, but we're not doing this right now for Computations and we may not want to do it for Spans either. At this point I find it premature to deal with this.


Implementation steps

[dask/dask] dask.span context manager

It might be easier to start with this in dask.distributed. So far, I don't see the value in dask/dask beyond the import statement. We may want to change things along the way, and keeping it in the same repo might be easier for now.

SpanPlugin and Span classes; post notable events to them

I suggest just starting with the Span and not thinking about the plugin right now. Let's keep the steps here simple, please. Introducing the span, initializing it on the client, and submitting it to the scheduler will already be some work.

propagate span context to tasks (to be used by worker clients)

Unless this actually causes problems, I suggest delaying this as well. This is just a small fraction of use cases, and I'd rather be fast with getting the basic infrastructure set up than dealing with worker_clients.


Overall, I would strongly encourage us to take very small steps. There is still a lot of uncertainty, so we shouldn't specify that much this early.

ntabris commented 1 year ago

A span is defined by the client using a context manager

I feel like a lot of the "easy way to define a higher-level intelligible unit of work" could also be useful for instrumenting library code, so I'd be inclined to have a design that allows spans to be defined by the client, but also allows spans to be defined (say) inside dask code. If we want to leave that open, that probably affects some implementation decisions.
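For example, a library could tag its own unit of work without user involvement; this assumes dask.span ends up importable from library code, and "xgboost-train" and fit() are made-up names:

import dask

def fit(model, data):
    with dask.span("xgboost-train"):  # span defined inside library code
        ...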

jacobtomlinson commented 1 year ago

Sounds great! I can imagine a couple of use cases where I would find this useful.

  1. Tracking stages in a workload. For example, if I have a suite of benchmarks, it would be nice to track where each one starts/ends.
  2. Tracking smaller stages within a computation. Perhaps something like trials in an XGBoost HPO workload. I think this is what @ntabris is suggesting.

I love the idea of nesting them.

I would be keen to chat further about how we can expose this data. Could we show this on the Dask dashboard? Can we export this to Grafana? Could we include something in a performance report?

ntabris commented 1 year ago

@jacobtomlinson yes, those are exactly use cases that we're thinking about. Another related potential use is being able to attribute the "fine metrics" (that @crusaderky has been working on) to user-intelligible units of work.


crusaderky commented 1 year ago

Following further discussion:

Implementation in #7882.

crusaderky commented 1 year ago

I would like to set the definition-of-done of this issue to:

I'd like to keep these out of scope for now and open them as follow-up issues:

@ntabris Could you play with #7885 and confirm everything is as you expected for the purpose of integrating with the Coiled dashboard?

hendrikmakait commented 1 year ago

The definition-of-done seems mostly fine to me. I think I'd prefer to deprecate computations once

Propagate span context to tasks (to be used by worker clients)

has been implemented. This feels like the one additional step needed to make spans feel coherent across the board. I don't care about the deprecation/propagation being part of the DoD though.

crusaderky commented 1 year ago

Unless you explicitly set a span, though, worker clients will add tasks to the currently running default span - there's no regression from computations.
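For context, a minimal worker-client task; under the proposal, the work it submits would join the currently running default span unless one is set explicitly:

from distributed import worker_client

def task(x):
    # Work submitted from within a task joins the default span by default
    with worker_client() as client:
        return client.submit(lambda v: v + 1, x).result()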

hendrikmakait commented 1 year ago

True, there's no regression, but to me, deprecating computations means promoting spans instead. For this, I'd prefer spans to be coherently propagated for a smooth UX. We could also promote default spans only; that'd be fine for me as well.

crusaderky commented 1 year ago

Follow-ups: