cesium-ml / cesium

Machine Learning Time-Series Platform

How to use dask? #300

Closed arnabbiswas1 closed 3 years ago

arnabbiswas1 commented 3 years ago

I am trying to use cesium.featurize.featurize_time_series(), but from the documentation it is not clear how to use Dask with it.

If I execute this function, will the library start a local Dask cluster and distribute the feature generation over the Dask workers? Or do I need to pass the address of a remote or local Dask scheduler (e.g. "tcp://127.0.0.1:44065") as the value of the "scheduler" parameter?

Could you please clarify?

stefanv commented 3 years ago

Hi Arnab,

Thanks for your question!

By default, the time series will be featurized in parallel using the dask.threaded scheduler; other approaches, including serial and distributed ones, can be used by passing a different dask scheduler as the scheduler argument to featurize_time_series.

For example, say you have a dask scheduler running on tcp://192.168.0.10:8786 (this is what I get when running dask-scheduler on my machine). You can then connect to the scheduler with:

from dask.distributed import Client
c = Client('tcp://192.168.0.10:8786')

And then call the featurization as follows:

fset_cesium = featurize.featurize_time_series(
    times=eeg["times"],
    values=eeg["measurements"],
    errors=None,
    features_to_use=features_to_use,
    scheduler=c,
)

The scheduler argument is passed through to dask. Instead of a distributed client, you can also specify "threads" or "processes", for example (see the dask docs).
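To illustrate the pass-through, here is a hedged sketch using plain dask rather than cesium's internals; extract_features below is a hypothetical stand-in for cesium's per-series featurization:

```python
import dask
from dask import delayed

@delayed
def extract_features(ts):
    # Hypothetical stand-in for cesium's per-series feature computation.
    return {"mean": sum(ts) / len(ts), "amplitude": max(ts) - min(ts)}

series = [[1.0, 2.0, 3.0], [4.0, 6.0, 8.0]]
tasks = [extract_features(ts) for ts in series]

# "threads" selects dask's threaded scheduler (cesium's default);
# "processes" or a distributed Client could be substituted here.
results = dask.compute(*tasks, scheduler="threads")
```

Whatever you hand to featurize_time_series as scheduler ends up playing the same role as the scheduler argument in dask.compute above.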

(The above is based on the gallery example from the Cesium docs.)

Let me know if that helps!

arnabbiswas1 commented 3 years ago

Appreciate your quick reply.

As per my understanding, c below is not the dask scheduler, but a Dask Client connected to a scheduler.

from dask.distributed import Client
c = Client('tcp://192.168.0.10:8786')

The name of the parameter (scheduler) made it confusing. In fact, I tried to pass the following as the value of the parameter:

As expected, both didn't work.

Maybe updating the documentation and adding a few examples covering the different possible use cases would be helpful. That would also enable quicker adoption of cesium.

Note: This understanding is based on the Dask architecture diagram.
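For reference, per the dask documentation the string values that dask's own scheduler argument accepts include "threads", "processes", and "synchronous". A quick sketch of the last one, which is handy for debugging:

```python
import dask
from dask import delayed

@delayed
def inc(x):
    return x + 1

# The single-threaded "synchronous" scheduler runs everything in the
# calling thread, so exceptions surface with ordinary tracebacks.
(result,) = dask.compute(inc(41), scheduler="synchronous")
```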

arnabbiswas1 commented 3 years ago

Some more updates.

I have been able to start using the Dask distributed scheduler with a local client and a local cluster (client = Client()), but it fails after some time with the following error message (the Dask local cluster shuts down). I have not debugged the issue, so I don't know where exactly it's going wrong.

/home/arnabb/anaconda3/envs/py_k/lib/python3.8/site-packages/distributed/worker.py:3856: UserWarning: Large object of size 38.54 MiB detected in task graph: 
  (['featurize_single_ts-9e17c5543f8d9553544d32e248d ... fc1027fbcd71'])
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
distributed.core - ERROR - 'tcp://127.0.0.1:41139'
Traceback (most recent call last):
  File "/home/arnabb/anaconda3/envs/py_k/lib/python3.8/site-packages/distributed/core.py", line 575, in handle_stream
    handler(**merge(extra, msg))
  File "/home/arnabb/anaconda3/envs/py_k/lib/python3.8/site-packages/distributed/scheduler.py", line 5332, in handle_release_data
    ws: WorkerState = parent._workers_dv[worker]
KeyError: 'tcp://127.0.0.1:41139'
distributed.utils - ERROR - 'tcp://127.0.0.1:41139'
Traceback (most recent call last):
  File "/home/arnabb/anaconda3/envs/py_k/lib/python3.8/site-packages/distributed/utils.py", line 637, in log_errors
    yield
  File "/home/arnabb/anaconda3/envs/py_k/lib/python3.8/site-packages/distributed/scheduler.py", line 4315, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/home/arnabb/anaconda3/envs/py_k/lib/python3.8/site-packages/distributed/scheduler.py", line 5435, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/home/arnabb/anaconda3/envs/py_k/lib/python3.8/site-packages/distributed/core.py", line 575, in handle_stream
    handler(**merge(extra, msg))
  File "/home/arnabb/anaconda3/envs/py_k/lib/python3.8/site-packages/distributed/scheduler.py", line 5332, in handle_release_data
    ws: WorkerState = parent._workers_dv[worker]
KeyError: 'tcp://127.0.0.1:41139'
distributed.core - ERROR - Exception while handling op register-worker
...........
..........
KeyError: 'tcp://127.0.0.1:42759'
distributed.worker - WARNING - Heartbeat to scheduler failed
distributed.worker - WARNING - Heartbeat to scheduler failed

I tried with scheduler="processes" and that worked.
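Also, the UserWarning in the log above suggests a pattern worth trying: scatter large inputs to the workers ahead of time so they are not embedded in the task graph. A minimal sketch of generic dask.distributed usage (not cesium-specific):

```python
from dask.distributed import Client

# processes=False keeps the cluster in-process (threads only), which is
# enough to demonstrate the scatter-then-submit pattern.
client = Client(processes=False)

big_data = list(range(100_000))          # stand-in for a large time series

big_future = client.scatter(big_data)    # data now lives on the cluster
future = client.submit(len, big_future)  # task references the future, not the data
n = future.result()

client.close()
```

Whether cesium can take advantage of pre-scattered data through its scheduler parameter is something I have not verified.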

stefanv commented 3 years ago

Sorry, I'm not sure why your distributed tasks aren't running; if you send me a small self-contained example I can see if it runs on my machine.

W.r.t. the docs, arguably this notion of client vs. scheduler needs to be clarified in the dask docs (since we simply pass our scheduler argument through to dask). Still, we would be happy to merge an improvement to our documentation. Would you be willing to submit a PR?

arnabbiswas1 commented 3 years ago

Sorry for the late reply.

I am really sorry, but I will not be able to work on a PR.

stefanv commented 3 years ago

I hope you were able to resolve your issue, @arnabbiswas1. If not, feel free to open another issue.