eriknw / afar

Run code on a Dask cluster via a context manager or IPython magic

When using Afar one worker stops getting tasks. #32

Open ncclementi opened 2 years ago

ncclementi commented 2 years ago

I was running a workflow using Afar on Coiled, and I noticed that at some point one worker in the Afar version stopped receiving tasks. In the task stream of the performance reports, the last thread in the Afar version stops getting tasks, while in the non-afar version this doesn't happen. Is this expected behavior? What is actually happening here?

Note: the data is public, so this should work as a reproducible example.

Workflow without afar:

import dask.dataframe as dd
from dask.distributed import performance_report

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    split_row_groups=True,
    engine="pyarrow",
)

with performance_report(filename="read_pq_groupby_mean_CPU_pyarrow.html"):
    ddf.groupby('name').x.mean().compute()

Link to performance report

Workflow with afar:

%%time
# (assumes `import afar` and `from afar import remotely` ran in an earlier cell)
with afar.run, remotely:
    ddf_cpu = dd.read_parquet(
        "s3://coiled-datasets/timeseries/20-years/parquet",
        storage_options={"anon": True, "use_ssl": True},
        split_row_groups=True,
        engine="pyarrow",
    )

    res = ddf_cpu.groupby('name').x.mean().compute()

with performance_report(filename="read_pq_groupby_mean_CPU_pyarrow_afar.html"):
    res.result()

Link to performance report

eriknw commented 2 years ago

I just love those performance reports!

This does look suspicious--thanks for the report. I don't yet have a plausible explanation for why some threads would stall. The simplest "I have no idea what's going on, but let's take a stab at a solution anyway" thing to try would be to change the Lock to an RLock in afar/_printing.py.
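For concreteness, the swap would look roughly like this (print_lock is a placeholder name; the actual lock in afar/_printing.py may be named differently):

import threading

# A plain Lock deadlocks if the same thread tries to acquire it twice;
# an RLock is re-entrant, so nested acquisition by one thread succeeds.
print_lock = threading.RLock()  # was: threading.Lock()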

I'll try to investigate this further tonight.

eriknw commented 2 years ago

I can reproduce the issue.

I have ruled out some of the weird bits of afar, such as the locks around updating builtins.print and the custom channel used to send messages from the worker to the client. It's really not clear to me what else in afar would even be relevant to causing issues.
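(For reference, the channel in question follows distributed's pub/sub pattern; this is a generic sketch with an illustrative channel name, not afar's actual code:)

from distributed import Pub, Sub

def task():
    # runs on a worker: publish a message to a named channel
    Pub('afar-messages').put('hello from the worker')

# on the client: subscribe to the same channel, then submit the task
sub = Sub('afar-messages')
# future = client.submit(task); print(sub.get(timeout=5))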

run_afar is a long-running task that runs other tasks. This could be the source of the issue. Aha! To test this, I just reproduced the issue by running the following (without afar):

def run():
    ddf_cpu = dd.read_parquet(
        "s3://coiled-datasets/timeseries/20-years/parquet",
        storage_options={"anon": True, "use_ssl": True},
        split_row_groups=True,
        engine="pyarrow",
    )
    return ddf_cpu.groupby('name').x.mean().compute()

# client is the existing dask.distributed Client; submitting run() creates
# one long-running task that itself launches and waits on other tasks
with performance_report(filename="read_pq_groupby_mean_CPU_pyarrow_run.html"):
    client.submit(run).result()

So, I don't think afar is to blame. Maybe this is related to https://github.com/dask/distributed/issues/5332
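If tasks-launching-tasks is indeed the problem, one documented mitigation in distributed (a sketch only; not something afar does today) is worker_client, which secedes the submitting task from the worker's thread pool while it waits on the nested computation:

import dask.dataframe as dd
from distributed import worker_client

def run():
    # worker_client() secedes this task from the worker's thread pool and
    # rejoins on exit, so the worker thread stays free while we wait
    with worker_client() as client:
        ddf_cpu = dd.read_parquet(
            "s3://coiled-datasets/timeseries/20-years/parquet",
            storage_options={"anon": True, "use_ssl": True},
            split_row_groups=True,
            engine="pyarrow",
        )
        return client.compute(ddf_cpu.groupby('name').x.mean()).result()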