dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Performance Report crashes compute midway #3858

Closed KrishanBhasin closed 4 years ago

KrishanBhasin commented 4 years ago

What happened: Computing a Dask DataFrame fails partway through, with an error raised by performance_report(). The error raised is:

```python
~/.conda/envs/dc/lib/python3.7/site-packages/distributed/scheduler.py in performance_report()
   5081         timespent = defaultdict(int)
   5082         for d in task_stream:
-> 5083             for x in d["startstops"]:
   5084                 timespent[x["action"]] += x["stop"] - x["start"]
   5085         tasks_timings = ""

KeyError: 'startstops'
```

What you expected to happen: The performance report not to fail, or for it to fail without impacting the computation.

Minimal Complete Verifiable Example:

I have yet to craft one, as my use case involves creating a large Dask graph and submitting it all at once. I will keep trying to produce one, but I thought it might be helpful to file this early.
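
For reference, the calling pattern (lifted from the stack trace below) looks roughly like this. The setup lines (`Client()`, `read_parquet`) are stand-ins, so this alone will not reproduce the failure; only the `with performance_report(...)` block matches my real code:

```python
import dask.dataframe as dd
from dask.distributed import Client, performance_report

client = Client()                     # connect to the distributed scheduler
ddf = dd.read_parquet("input_data/")  # stand-in for the much larger graph in my real workload

# The computation itself runs; the KeyError is raised by performance_report()
# when the context manager exits and asks the scheduler for the report.
with performance_report("my_report.html"):
    ddf.to_csv("output_csvs")
```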

It looks like part of the timing information collected in https://github.com/dask/distributed/pull/3822 results in a lookup into a dict for a key that does not exist (see the error above).

Unfortunately I don't know enough about distributed's internals to say whether every task-stream dictionary should include a `startstops` key.

I'd be happy to contribute a PR that adds a safety check for the `startstops` key, if that is the correct fix here.

Error stacktrace (I'm doubtful this adds more useful information, but I have included it for completeness):

```python
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-...> in <module>
      1 # Similar code can be written to write the dataframe to CSV or to Parquet.
      2 with performance_report("my_report.html"):
----> 3 ddf.to_csv("output_csvs")

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/client.py in __exit__(self, typ, value, traceback)
   4735         except Exception:
   4736             code = ""
-> 4737         get_client().sync(self.__aexit__, type, value, traceback, code=code)
   4738
   4739

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    822         else:
    823             return sync(
--> 824                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    825             )
    826

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

~/.conda/envs/dc/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/client.py in __aexit__(self, typ, value, traceback, code)
   4721             code = ""
   4722         data = await get_client().scheduler.performance_report(
-> 4723             start=self.start, code=code
   4724         )
   4725         with open(self.filename, "w") as f:

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    804             name, comm.name = comm.name, "ConnectionPool." + key
    805             try:
--> 806                 result = await send_recv(comm=comm, op=key, **kwargs)
    807             finally:
    808                 self.pool.reuse(self.addr, comm)

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    603         if comm.deserialize:
    604             typ, exc, tb = clean_exception(**response)
--> 605             raise exc.with_traceback(tb)
    606         else:
    607             raise Exception(response["text"])

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/core.py in handle_comm()
    457                 result = asyncio.ensure_future(result)
    458                 self._ongoing_coroutines.add(result)
--> 459                 result = await result
    460             except (CommClosedError, CancelledError) as e:
    461                 if self.status == "running":

~/.conda/envs/dc/lib/python3.7/site-packages/distributed/scheduler.py in performance_report()
   5081         timespent = defaultdict(int)
   5082         for d in task_stream:
-> 5083             for x in d["startstops"]:
   5084                 timespent[x["action"]] += x["stop"] - x["start"]
   5085         tasks_timings = ""

KeyError: 'startstops'
```

Anything else we need to know?:

Environment:

mrocklin commented 4 years ago

cc @quasiben

It looks like it's possible for a task to arrive without any known compute/transfer times. Perhaps this comes about if the task is already computed? I'm not sure. My guess is that this can be resolved by replacing `d["startstops"]` with `d.get("startstops", [])`.
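
Applied to the aggregation loop from the traceback, that would look roughly like the sketch below (the sample `task_stream` entries are hypothetical, just to show the behaviour):

```python
from collections import defaultdict

# Hypothetical task-stream entries: one with timing info, one without.
task_stream = [
    {"startstops": [{"action": "compute", "start": 0.0, "stop": 1.5}]},
    {},  # a task that arrived without any known compute/transfer times
]

timespent = defaultdict(int)
for d in task_stream:
    # d.get(...) with a default skips entries lacking "startstops"
    # instead of raising KeyError.
    for x in d.get("startstops", []):
        timespent[x["action"]] += x["stop"] - x["start"]

print(dict(timespent))  # {'compute': 1.5}
```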

Is this a change that you would be interested in making @KrishanBhasin ?

quasiben commented 4 years ago

Thanks for the report @KrishanBhasin and for the ping @mrocklin. I've submitted a PR with the fix. @KrishanBhasin, if you have time, can you test whether it resolves your issue?

KrishanBhasin commented 4 years ago

Yep, it just completed without errors, thanks for fixing it!

I was hoping this would be another excuse for me to contribute a PR, but you beat me to it 😂