coiled / feedback

A place to provide Coiled feedback

KilledWorker error #64

Closed hugobowne closed 3 years ago

hugobowne commented 4 years ago

hi team!

for the training I'm doing next week, I'm trying to spin up a Coiled cluster. It works!

But it looks like I'm killing workers :/

I created a cluster config (tried others also):

# Control the resources of your cluster by creating a new cluster configuration
coiled.create_cluster_configuration(
    name="my-cluster-config",
    worker_memory="16 GiB",
    worker_cpu=4,
    scheduler_memory="8 GiB",
    scheduler_cpu=2,
    software="my-software-env",
)

where my-software-env is as in the binder in this repo (https://github.com/coiled/data-science-at-scale/).

when I execute df.to_parquet("yellow_tripdata_2019-coiled.parq") at the bottom of this notebook, it throws the following:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<timed eval> in <module>

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   3947         from .io import to_parquet
   3948 
-> 3949         return to_parquet(self, path, *args, **kwargs)
   3950 
   3951     @derived_from(pd.DataFrame)

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, ignore_divisions, partition_on, storage_options, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
    506         if compute_kwargs is None:
    507             compute_kwargs = dict()
--> 508         out = out.compute(**compute_kwargs)
    509     return out
    510 

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2710                     should_rejoin = False
   2711             try:
-> 2712                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2713             finally:
   2714                 for f in futures.values():

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1983             else:
   1984                 local_worker = None
-> 1985             return self.sync(
   1986                 self._gather,
   1987                 futures,

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    831             return future
    832         else:
--> 833             return sync(
    834                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    835             )

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/opt/anaconda3/envs/data-science-at-scale/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1848                             exc = CancelledError(key)
   1849                         else:
-> 1850                             raise exception.with_traceback(traceback)
   1851                         raise exc
   1852                     if errors == "skip":

KilledWorker: ("('read-csv-rename-dbdc4c74c6971cc3b4065c512dd1a615', 54)", <Worker 'tls://10.1.12.247:46755', name: hugobowne-678-worker-8-b0a898, memory: 0, processing: 32>)
mrocklin commented 4 years ago

This might be helpful to explain the situation: https://stackoverflow.com/questions/46691675/what-do-killedworker-exceptions-mean-in-dask

I recommend looking at the logs from workers to see if something is going on there.

jrbourbeau commented 4 years ago

Thanks for raising an issue @hugobowne! Looking at the notebook you linked, I noticed the DataFrame you're saving to disk with df.to_parquet("yellow_tripdata_2019-coiled.parq") originates from reading data on S3 with df = dd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv", ...). However, s3fs, the library Dask uses to access files on S3, isn't included in the software environment. I recommend adding s3fs to the list of dependencies in your environment.yml file and trying again.
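For illustration, a minimal environment.yml along those lines might look like the following. The exact package list is an assumption; keep whatever your existing file already has and just add s3fs:

```yaml
# environment.yml — illustrative; your actual dependency list will differ
name: my-software-env
channels:
  - conda-forge
dependencies:
  - dask
  - distributed
  - pandas
  - pyarrow
  - s3fs  # lets Dask read and write "s3://..." paths
```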

Also, if you're interested in further reading, the docs have a page on why workers get killed: https://distributed.dask.org/en/latest/killed.html
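That page also covers the scheduler's allowed-failures setting. As a sketch (the value 10 here is arbitrary, and this must be set before the client is created): if workers are dying for reasons unrelated to any one task, raising the threshold gives tasks more chances before the scheduler gives up with KilledWorker:

```python
import dask

# By default, a task that was running on 3 workers that died is marked
# suspicious, and the next failure raises KilledWorker. Raising the limit
# tolerates more transient worker deaths (e.g. spot interruptions).
dask.config.set({"distributed.scheduler.allowed-failures": 10})
```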

asos-ashtonhills commented 4 years ago

Hi, I am running into the same error. I investigated the worker logs and found what might be the source of the problem: ModuleNotFoundError: No module named 'adlfs'.

However, adlfs is definitely listed in my requirements.txt file, and when I create the software environment the logs report that adlfs-0.3.1 was installed successfully. Can anyone explain why the module isn't being found? I am creating my cluster and environment like so:

cluster = coiled.Cluster(
    n_workers=25,
    configuration="coiled/default-py37",
    name="feature-store-cluster-50",
)

coiled.create_software_environment(
    name="coiled-feature-store",
    pip="requirements.txt",
)
jrbourbeau commented 4 years ago

@ashtonhills95 it looks like you've created a software environment with adlfs installed, but haven't yet created a cluster configuration that uses it. In the snippet you posted, the coiled.Cluster you're creating uses configuration="coiled/default-py37", which does not include adlfs — that's why you're running into ModuleNotFoundError: No module named 'adlfs'.

Something like:


import coiled

coiled.create_software_environment(
    name="coiled-feature-store",
    pip="requirements.txt",
)

coiled.create_cluster_configuration(
    name="coiled-feature-store",
    software="coiled-feature-store",
    # there are other inputs here you can also adjust
)

cluster = coiled.Cluster(
    n_workers=25,
    configuration="coiled-feature-store",
    name="feature-store-cluster",
)

will create your software environment with adlfs and a cluster configuration that uses it, then spin up a cluster using the new configuration. Hope that helps! Let me know if you're still running into issues.

FabioRosado commented 3 years ago

We have recently added an article that explains this exception and offers guidance on what to do when you encounter KilledWorker. I believe we can now close this issue.