dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Unique keys runs slow with time #5965

Open ericman93 opened 2 years ago

ericman93 commented 2 years ago

I have a DAG that I want to run in parallel multiple times, each time with different input parameters. Using the same keys for all of the parallel runs doesn't work, because the graph runs only once, with the first run's parameters. My solution was to generate a GUID as the key for each task. The problem is that using GUIDs as keys creates a huge number of tasks, and the scheduler becomes slower with time. Using a pool of keys solved the issue, but that isn't the solution I want.

Once a task finishes, I don't care about it anymore, so if deleting the keys is an option and would solve the issue, that would be great.

import asyncio
from random import randint
from uuid import uuid4
import time
from dask.distributed import Client

def add(x, y):
    return x + y

async def do_once(n):
    def get_key(name):
        # return name  # doesn't work: the graph runs only once
        # return f'{name}_{uuid4().hex}'  # becomes slow after a few minutes
        return f'{name}_{n}'  # works well

    async with Client(address='0.0.0.0:8786', asynchronous=True) as client:
        n_key = get_key('n')
        x_key = get_key('x')
        y_key = get_key('y')
        z_key = get_key('z')
        ran = randint(0, 100)

        dsk = {
            n_key: n,
            x_key: (add, n_key, 1),
            y_key: (add, n_key, ran),
            z_key: (add, x_key, y_key)
        }

        futures = client.get(dsk, keys=[z_key], sync=False)
        res = await futures[0]

        assert res == ((n + 1) + (n + ran)), f"{res} != {((n + 1) + (n + ran))}"

async def main():
    while True:
        start = time.time()
        futures = [do_once(j) for j in range(20)]
        await asyncio.gather(*futures)

        print(f'iteration took {time.time() - start}')

asyncio.run(main())


ericman93 commented 2 years ago

I was looking at the code and saw that when tasks are added, they are grouped by prefix (the key is split on -), so I changed the get_key method to

    def get_key(name):
        return f'{name}-{n}'

and got much better results.
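
To make the effect of the separator concrete, here is a minimal sketch of prefix grouping. `toy_prefix` is a hypothetical, deliberately simplified stand-in and not the scheduler's actual key-splitting logic; it only assumes, as described above, that the part of a key before the `-` is treated as the group name.

```python
from uuid import uuid4


def toy_prefix(key: str) -> str:
    """Hypothetical simplification: the group is everything before the first '-'."""
    return key.split("-", 1)[0]


names = ["n", "x", "y", "z"]

# 100 runs with underscore-separated unique keys: there is no '-' to split on,
# so every key ends up as its own group.
underscore_keys = [f"{name}_{uuid4().hex}" for _ in range(100) for name in names]

# 100 runs with hyphen-separated unique keys: keys collapse into one group per name.
hyphen_keys = [f"{name}-{uuid4().hex}" for _ in range(100) for name in names]

underscore_groups = {toy_prefix(k) for k in underscore_keys}
hyphen_groups = {toy_prefix(k) for k in hyphen_keys}

print(len(underscore_groups))  # one group per key
print(len(hyphen_groups))      # one group per task name
```

Under this assumption, the number of groups grows with every run when the separator is `_`, while it stays fixed when the separator is `-`.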

But can I clear the stats somehow? Let's say the scheduler has been running for 4 days; I don't want the old stats.

fjetter commented 2 years ago
  1. The tasks should all be deleted as soon as you close your internal client (i.e. leave the context manager) or the future instances are garbage collected
  2. Instead of writing your own graph, I would suggest using Client.submit

e.g.

async with Client(address='0.0.0.0:8786', asynchronous=True) as client:
    ran = randint(0, 100)
    f1 = client.submit(add, n, 1)
    f2 = client.submit(add, n, ran)
    f3 = client.submit(add, f1, f2)
    res = await f3

    assert res == ((n + 1) + (n + ran)), f"{res} != {((n + 1) + (n + ran))}"

Do you have a specific reason why you are building the graph yourself? The submit/map API is much more user friendly.

This will re-use keys since, by default, we assume functions to be pure, i.e. side-effect free, deterministic and therefore cacheable. If this is not true, use the keyword pure=False for all your submit calls and we will generate unique keys for you.
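
The difference between the two modes can be sketched without a cluster. `toy_key` below is a hypothetical helper written for illustration only; it is not the library's real tokenization, and it only mimics the idea that a pure call gets a deterministic key while an impure call gets a fresh one.

```python
import hashlib
from uuid import uuid4


def add(x, y):
    return x + y


def toy_key(func, *args, pure=True):
    """Hypothetical key builder, not the library's real tokenization."""
    if pure:
        # Same function and arguments always hash to the same key,
        # so a repeated call can be served from the earlier result.
        payload = repr((func.__name__, args)).encode()
        suffix = hashlib.sha1(payload).hexdigest()[:16]
    else:
        # A fresh random suffix makes every call a distinct task.
        suffix = uuid4().hex[:16]
    return f"{func.__name__}-{suffix}"


print(toy_key(add, 1, 2) == toy_key(add, 1, 2))
print(toy_key(add, 1, 2, pure=False) == toy_key(add, 1, 2, pure=False))
```

With a real client, the equivalent switch is the `pure=False` keyword mentioned above, e.g. `client.submit(add, n, 1, pure=False)`.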

ericman93 commented 2 years ago

Should the tasks be deleted from the client, or from the scheduler?

I used a graph because I wanted to build it only once and reuse it each time with different parameters. In that case, I would just change the value of the n key in the dict and run it. Another issue I had is that when a function is submitted it starts running immediately, and I wanted all functions to start together.
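
The reuse pattern described above can be sketched as a small factory that stamps out the same graph shape with per-run keys. `build_graph` and `toy_get` are hypothetical names introduced for this illustration; `toy_get` is a tiny local evaluator for checking the result and is not how the scheduler executes graphs.

```python
from uuid import uuid4


def add(x, y):
    return x + y


def build_graph(n, ran):
    """Return (graph, output_key) with keys that are unique to this run."""
    run_id = uuid4().hex
    n_key, x_key, y_key, z_key = (f"{name}-{run_id}" for name in "nxyz")
    dsk = {
        n_key: n,
        x_key: (add, n_key, 1),
        y_key: (add, n_key, ran),
        z_key: (add, x_key, y_key),
    }
    return dsk, z_key


def toy_get(dsk, key):
    """Hypothetical recursive evaluator, for local checking only."""
    task = dsk[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # String arguments that name another key are resolved recursively.
        resolved = [
            toy_get(dsk, a) if isinstance(a, str) and a in dsk else a
            for a in args
        ]
        return func(*resolved)
    return task


dsk, z_key = build_graph(5, 7)
print(toy_get(dsk, z_key))
```

Against a running scheduler, the returned pair would be passed to `client.get(dsk, keys=[z_key], sync=False)` as in the original example.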

My use case has changed and I'm now building the DAG every time, so I'll try going back to submit. Are there performance differences between graph and submit? My tasks are not pure by the way.

gjoseph92 commented 2 years ago

One thing to notice: looks like you're opening 20 different clients at once, then submitting the "same" (same keys) operation from each of them. That should work, it's just unusual.

Could you share the results of your iteration took statement? What kind of increase in runtime are we talking about here?

the scheduler becomes slower with time

feels related to https://github.com/dask/distributed/issues/5960. We've noticed this exact behavior before (re-running the same operation is slower and slower each time) but never figured out why: https://github.com/dask/distributed/issues/4987#issuecomment-877473670.

Are there performance differences between graph and submit?

Graph will actually be a tiny bit more efficient, since it's one network call instead of 4. Using dask.delayed would be the more common thing here though; writing a graph by hand and calling client.get like you're doing is supported, just unusual.

My tasks are not pure by the way

In that case, giving them new keys every time would be the semantically correct thing to do.

so I change get_key method

There's no method named get_key exactly; are you talking about key_split or key_split_group?

ericman93 commented 2 years ago

Could you share the results of your iteration took statement? What kind of increase in runtime are we talking about here?

~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
0 minutes
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
iteration took 0.23832011222839355
iteration took 0.18767070770263672
iteration took 0.25737690925598145
iteration took 0.2599179744720459
iteration took 0.201430082321167
iteration took 0.20307374000549316
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
3 minutes
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
iteration took 0.267427921295166
iteration took 0.3653082847595215
iteration took 0.5170252323150635
iteration took 0.6326770782470703
iteration took 0.758781909942627
iteration took 0.640618085861206
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
6 minutes
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
iteration took 2.01904296875
iteration took 2.1777050495147705
iteration took 2.5627808570861816
iteration took 2.125898838043213
iteration took 2.4273059368133545
iteration took 2.8562822341918945
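
For a quick summary of the numbers above, the following sketch averages each group of timings and computes the slowdown between the first and last group. The values are copied from the output above; the variable names are only for illustration.

```python
from statistics import mean

# Seconds per iteration, grouped by minutes elapsed since the start of the run.
timings = {
    0: [0.23832011222839355, 0.18767070770263672, 0.25737690925598145,
        0.2599179744720459, 0.201430082321167, 0.20307374000549316],
    3: [0.267427921295166, 0.3653082847595215, 0.5170252323150635,
        0.6326770782470703, 0.758781909942627, 0.640618085861206],
    6: [2.01904296875, 2.1777050495147705, 2.5627808570861816,
        2.125898838043213, 2.4273059368133545, 2.8562822341918945],
}

averages = {minute: mean(values) for minute, values in timings.items()}
slowdown = averages[6] / averages[0]

for minute, avg in averages.items():
    print(f"{minute} min: {avg:.3f} s per iteration")
print(f"slowdown after 6 minutes: {slowdown:.1f}x")
```

In other words, an iteration takes roughly ten times longer after six minutes than at the start.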

One thing to notice: looks like you're opening 20 different clients at once, then submitting the "same" (same keys) operation from each of them. That should work, it's just unusual.

I'm generating a new key each time, but with the same prefix; generating the same key was just a test. Should I create the client once and reuse it?

There's no method named get_key exactly; are you talking about key_split or key_split_group?

I meant the get_key from my example

ericman93 commented 2 years ago

Continuing the discussion from the Dask forum:

A few questions about the dashboard:

image

All of my EnrichmentStep tasks can run in parallel. I understand that they are running on the same worker as an optimization, but can I run them in parallel? Maybe if I have more workers per dask-worker and change the scheduler to use threads, they will run more in parallel? (I have 1 worker per pod.)

By the way, I see a lot of unmanaged memory:

image