dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Spans with distinct names get merged #7975

Open ntabris opened 1 year ago

ntabris commented 1 year ago

If I do this, I get two spans:

import time

import distributed
from distributed.spans import span

client = distributed.Client()

def double(x):
    return x * 2

with span("double-3"):
    client.submit(double, 3)

time.sleep(5)  # wait a bit

with span("double-5"):
    client.submit(double, 5)

But if I don't wait a bit and just run this code all at once, I get a single span named double-3. That doesn't seem right to me.

I noticed this because I was trying to submit tasks in a loop so I could make them distinct spans, like so:

futures = []
for i in range(10):
    with span(f"double-{i}"):
        f = client.submit(double, i)
    futures.append(f)

But of course that fails in the same way: I get a single double-0 span.

(I'm trying to figure out how to map a function over some data and have each of those tasks be a distinct span.)

ntabris commented 1 year ago

@crusaderky I think this isn't the desired behavior. But if it is the desired behavior, I find this confusing.

crusaderky commented 1 year ago

This is a known issue: both tasks fall into the same TaskGroup.

This is a known limitation that we consciously accepted to keep code complexity down and development velocity up. We have no plans to fix it.

The workaround is to manually specify task keys so that the part before the first dash (if any) is different:

def double(x):
    return x * 2

with span("double-3"):
    f3 = client.submit(double, 3, key="d3")
with span("double-5"):
    f5 = client.submit(double, 5, key="d5")

The above snippet will create two separate spans.

If you omit the key, both tasks will end up in TaskGroup double, which in turn will be associated to the first-seen span double-3.
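To make the grouping rule concrete, here is a minimal, hypothetical sketch of the prefix logic. The real implementation lives in dask (see `dask.utils.key_split`) and is considerably more elaborate; the `prefix` helper below is a stand-in for illustration only:

```python
def prefix(key: str) -> str:
    """Hypothetical simplification of dask's key-prefix rule:
    a task's group name is the part of its key before the first dash."""
    return key.split("-")[0]

# Without explicit keys, both submissions produce keys like
# "double-<token>", so they share the group "double", and that
# group is attached to the first-seen span ("double-3").
assert prefix("double-3abc") == prefix("double-5def") == "double"

# With explicit keys "d3" and "d5" there is no dash, the prefixes
# differ, and the tasks land in separate groups -> separate spans.
assert prefix("d3") != prefix("d5")
```

This is why the workaround above chooses keys whose text before the first dash differs.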

crusaderky commented 1 year ago

(I'm trying to figure out how to map a function over some data and have each of those tasks be a distinct span.)

The driving idea behind spans is that task prefixes and, even worse, task groups are too fine-grained for most users, and that dask should offer a coarser aggregation that can be easily mapped to the client-side workflow.

map() creates a series of tasks from an iterable of inputs. These tasks are, by design, in the same TaskGroup and cannot be in separate spans. This is conceptually identical to having a dask.dataframe, dask.array, or dask.bag split over different partitions and calling an embarrassingly parallel function on it.
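As a rough illustration of why mapped tasks cannot be split (simplified, not distributed's actual code), keys produced by mapping one function all share a single prefix, so they collapse into one group no matter how many spans you open:

```python
from collections import defaultdict

def prefix(key: str) -> str:
    # Hypothetical simplification of dask's key-prefix rule
    return key.split("-")[0]

# Keys shaped like those a mapped "double" function would produce
keys = [f"double-{i}" for i in range(3)]

groups = defaultdict(list)
for key in keys:
    groups[prefix(key)].append(key)

# All three tasks fall into the single group "double",
# which in turn is attached to a single span.
assert list(groups) == ["double"]
```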

Neither spans nor fine performance metrics were designed to offer a granularity of a single task, and coercing them to do so would cause a multiple order-of-magnitude increase in the amount of data they produce.

ntabris commented 1 year ago

Okay, thanks for the explanations.

I think I expect(ed) spans to be a way to define custom groupings of activity. That's not what they are, and I think what they are instead is hard to understand for someone like me who doesn't usually think about dask internals.

crusaderky commented 1 year ago

I think I expect(ed) spans to be a way to define custom groupings of activity. That's not what they are

It's exactly what it is. It's just not meant to give you a granularity of a single task.