dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Workers in distributed cluster are not using all the cores #7846

Open · dev-foa opened this issue 1 year ago

dev-foa commented 1 year ago

Describe the issue: When running a computation on a distributed cluster, Dask does not appear to use all the cores of the worker machines. This became clear when running the same workload with the different configurations described in the table below: with the total core count held fixed, increasing cpu_per_worker (and correspondingly reducing the number of workers) consistently increases the runtime.

| items | threads | workers | cpu_per_worker | cores | time_in_seconds |
|------:|--------:|--------:|---------------:|------:|----------------:|
| 10000 | 100 | 100 | 1 | 100 | 266.85 |
| 10000 | 100 | 50 | 2 | 100 | 484.46 |
| 10000 | 100 | 25 | 4 | 100 | 1060.81 |
| 10000 | 200 | 200 | 1 | 200 | 159.74 |
| 10000 | 200 | 100 | 2 | 200 | 272.17 |
| 10000 | 200 | 50 | 4 | 200 | 564.24 |
| 10000 | 400 | 400 | 1 | 400 | 111.77 |
| 10000 | 400 | 200 | 2 | 400 | 162.48 |
| 10000 | 400 | 100 | 4 | 400 | 300.91 |
| 10000 | 800 | 800 | 1 | 800 | 98.66 |
| 10000 | 800 | 400 | 2 | 800 | 110.5 |
| 10000 | 800 | 200 | 4 | 800 | 180.23 |
| 10000 | 800 | 100 | 8 | 800 | 275.8 |
| 10000 | 800 | 50 | 16 | 800 | 526.02 |

The computation creates a dataframe from a map function and then runs some aggregations on the resulting dataframe.

Minimal Complete Verifiable Example:


import time

import dask
import dask.dataframe as dd
from dask_cloudprovider.aws import FargateCluster
from distributed import Client


def get_agg_1(df):
    df['ab'] = df['a'] + df['b']
    df['a_b'] = df['a'] * df['b']
    df['c'] = df['ab'] / df['a_b']
    agg_r = df.groupby(['col1']).agg({'a': 'sum', 'b': 'mean', 'c': 'mean'})
    return agg_r

def get_agg_2(df):
    df['pq'] = df['p'] + df['q']
    df['p_q'] = df['p'] * df['q']
    df['r'] = df['pq'] / df['p_q']
    agg_r = df.groupby(['col2']).agg({'p': 'sum', 'q': 'mean', 'r': 'mean'})
    return agg_r

def get_agg_3(df):
    df['xy'] = df['x'] + df['y']
    df['x_y'] = df['x'] * df['y']
    df['z'] = df['xy'] / df['x_y']
    agg_r = df.groupby(['col3']).agg({'x': 'sum', 'y': 'mean', 'z': 'mean'})
    return agg_r

# map_function, items, workers and cpu are defined by the real workload.
with FargateCluster(
    scheduler_cpu=16384,
    scheduler_mem=32768,
    image='IMAGE_URL',
    n_workers=workers,
    worker_cpu=1024 * cpu,
    worker_mem=2048 * cpu,
    scheduler_timeout='10 minutes'
) as cluster:
    start_time = time.time()
    with Client(cluster) as client:
        # Build the dataframe from map_function, derive the three aggregations,
        # and compute everything in a single call so the graphs share work.
        results = dd.from_map(map_function, items)
        agg_1 = get_agg_1(results)
        agg_2 = get_agg_2(results)
        agg_3 = get_agg_3(results)
        final_results = dask.compute(results, agg_1, agg_2, agg_3)
    print('time taken', time.time() - start_time)

Anything else we need to know?:

The map_function takes around 5-6 seconds for each item
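For illustration only, a hypothetical stand-in for map_function could look like the sketch below. It is not the actual workload; it just shows the expected shape of the output (a pandas DataFrame with the columns used by the aggregations above) and spends roughly five seconds per item:

```python
# Hypothetical stand-in for map_function (not the real workload): it spends
# roughly five seconds per item and returns a pandas DataFrame with the
# columns referenced by the aggregation functions above.
import time
import numpy as np
import pandas as pd

def map_function(item):
    time.sleep(5)  # placeholder for the ~5-6 seconds of real per-item work
    n = 1_000
    rng = np.random.default_rng(item)
    return pd.DataFrame({
        'col1': rng.integers(0, 10, n),
        'col2': rng.integers(0, 10, n),
        'col3': rng.integers(0, 10, n),
        'a': rng.random(n), 'b': rng.random(n),
        'p': rng.random(n), 'q': rng.random(n),
        'x': rng.random(n), 'y': rng.random(n),
    })

items = list(range(10_000))
```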

Environment:

jacobtomlinson commented 1 year ago

Thanks for raising this @dev-foa. I wonder if this is similar to what we are seeing in https://github.com/dask/dask/issues/10291

fjetter commented 1 year ago

@dev-foa can you please provide a runnable example showing this problem? What you are seeing may very well depend on the details of what map_function is doing, what data you are generating etc.

Also, the code you are providing does not seem to be runnable; there appear to be a couple of syntax errors in it.

I also suggest trying to reproduce this issue with a LocalCluster instead of a FargateCluster.
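As a rough illustration of that suggestion, a local reproduction could be structured along these lines (a sketch only; map_function, items and the aggregation helpers stand for whatever the real workload uses, and the worker/thread counts are the knobs to vary):

```python
# Sketch of a local reproduction: keep the total thread count fixed while
# varying how the threads are split across workers, then compare runtimes.
import time

import dask
import dask.dataframe as dd
from distributed import Client, LocalCluster

for n_workers, threads_per_worker in [(8, 1), (4, 2), (2, 4)]:
    with LocalCluster(n_workers=n_workers,
                      threads_per_worker=threads_per_worker) as cluster:
        with Client(cluster) as client:
            start_time = time.time()
            results = dd.from_map(map_function, items)
            dask.compute(results, get_agg_1(results),
                         get_agg_2(results), get_agg_3(results))
            print(n_workers, threads_per_worker,
                  'time taken', time.time() - start_time)
```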

See also https://matthewrocklin.com/minimal-bug-reports for suggestions on how to produce such a minimal example.