dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Asyncio progress reporting #1024

Open kszucs opened 7 years ago

kszucs commented 7 years ago

I'd like to send progress reports through a websocket. Currently I can't find a nice way to achieve that.

Could we implement an interface similar to the following?

from distributed.diagnostics import Progress

future = client.compute(...)

async for processed_key, completeness in Progress(future):
    logging.info(processed_key)
    websocket.send(completeness)

Somewhat related to https://github.com/dask/distributed/issues/1020

mrocklin commented 7 years ago

If you have many futures then as_completed may work for you. See http://distributed.readthedocs.io/en/latest/api.html#distributed.client.as_completed

It also supports the __aiter__ protocol.
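
As a rough illustration of the loop shape requested above, here is a minimal sketch that uses plain asyncio tasks as stand-ins for Dask futures. The names report_progress and work are hypothetical helpers, not part of distributed, and in a real application the collected reports would be sent over the websocket instead.

```python
import asyncio


async def report_progress(tasks):
    """Yield (task_name, fraction_done) each time a task finishes.

    Hypothetical helper: asyncio tasks stand in for Dask futures here.
    """
    pending = set(tasks)
    total = len(pending)
    done_count = 0
    while pending:
        # Wake up as soon as at least one pending task has completed.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            done_count += 1
            yield task.get_name(), done_count / total


async def work(delay):
    # Placeholder workload standing in for a real computation.
    await asyncio.sleep(delay)
    return delay


async def main():
    tasks = [
        asyncio.create_task(work(0.01 * i), name=f"task-{i}")
        for i in range(1, 5)
    ]
    reports = []
    async for name, completeness in report_progress(tasks):
        # A real consumer might do: await websocket.send(str(completeness))
        reports.append((name, completeness))
    return reports


reports = asyncio.run(main())
```

This only tracks the tasks it is handed, so it does not by itself answer the case of a single future sitting on top of a larger graph.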

kszucs commented 7 years ago

I've tried that. The same "bad yield" problem occurs when I use it on top of the asyncio runner.

I also need the actual completeness percentage. How about an async iterable version of distributed.diagnostics.progress?

mrocklin commented 7 years ago

Is this different from as_completed combined with some counting and division by the total number of futures?
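
The counting-and-division idea can be sketched as follows. This is an illustration under assumptions, not code from distributed: with_completeness and square are hypothetical names, and concurrent.futures stands in for a Dask client so the example runs without a cluster. Since distributed.as_completed also yields futures as they finish, it could plausibly be passed in the same position.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def with_completeness(completed, total):
    """Pair each finished future with the fraction of work done so far.

    Hypothetical helper: `completed` is any iterator that yields futures
    in completion order, and `total` is the number of futures submitted.
    """
    for count, future in enumerate(completed, start=1):
        yield future, count / total


def square(x):
    # Placeholder task standing in for real work.
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(square, n) for n in range(4)]
    fractions = [
        fraction
        for _, fraction in with_completeness(as_completed(futures), len(futures))
    ]
```

The fraction counts only the futures passed in, so intermediate tasks behind a single final future are not reflected.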

kszucs commented 7 years ago

Your question implies it is not.

# dask's ProgressBar
from dask.diagnostics import ProgressBar

with ProgressBar():
    a_delayed_value.compute()

However, what if I have just one future that is the tip of a more complicated DAG?

mrocklin commented 7 years ago

OK, that makes sense. You might be able to copy something in distributed/diagnostics/progressbar.py

kszucs commented 7 years ago

Well, I guess this depends on https://github.com/dask/distributed/pull/1021

kszucs commented 7 years ago

Is the recent GroupProgress related to this issue? Could you give me a hint on how to implement this?

mrocklin commented 7 years ago

@kszucs these links may be helpful:

If this isn't sufficient, let me know.