RS-DAT / JupyterDaskOnSLURM

RuntimeError when number of workers changes #59

Open · Simon-van-Diepen opened this issue 11 months ago

Simon-van-Diepen commented 11 months ago

When the number of workers changes during the processing of large jobs (>500,000 tasks), the following RuntimeError sometimes appears and the computation stops:

RuntimeError                              Traceback (most recent call last)
Cell In[61], line 2
      1 if check_step(6):
----> 2     computed_statistics = dask.compute(all_statistics, traverse=True)[0]

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:595, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    592     keys.append(x.__dask_keys__())
    593     postcomputes.append(x.__dask_postcompute__())
--> 595 results = schedule(dsk, keys, **kwargs)
    596 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/client.py:3227, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3225         should_rejoin = False
   3226 try:
-> 3227     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3228 finally:
   3229     for f in futures.values():

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/client.py:2361, in Client.gather(self, futures, errors, direct, asynchronous)
   2359 else:
   2360     local_worker = None
-> 2361 return self.sync(
   2362     self._gather,
   2363     futures,
   2364     errors=errors,
   2365     direct=direct,
   2366     local_worker=local_worker,
   2367     asynchronous=asynchronous,
   2368 )

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    349     return future
    350 else:
--> 351     return sync(
    352         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    353     )

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
    416 if error:
    417     typ, exc, tb = error
--> 418     raise exc.with_traceback(tb)
    419 else:
    420     return result

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/utils.py:391, in sync.<locals>.f()
    389         future = wait_for(future, callback_timeout)
    390     future = asyncio.ensure_future(future)
--> 391     result = yield future
    392 except Exception:
    393     error = sys.exc_info()

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/client.py:2224, in Client._gather(self, futures, errors, direct, local_worker)
   2222         exc = CancelledError(key)
   2223     else:
-> 2224         raise exception.with_traceback(traceback)
   2225     raise exc
   2226 if errors == "skip":

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/optimization.py:992, in __call__()
    990 if not len(args) == len(self.inkeys):
    991     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 992 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/core.py:151, in get()
    149 for key in toposort(dsk):
    150     task = dsk[key]
--> 151     result = _execute_task(task, cache)
    152     cache[key] = result
    153 result = _execute_task(out, cache)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/core.py:121, in _execute_task()
    117     func, args = arg[0], arg[1:]
    118     # Note: Don't assign the subtask results to a variable. numpy detects
    119     # temporaries by their reference count and can execute certain
    120     # operations in-place.
--> 121     return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):
    123     return arg

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/dataframe/methods.py:357, in assign()
    355 df = df.copy(deep=bool(deep))
    356 for name, val in pairs.items():
--> 357     df[name] = val
    358 return df

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/pandas/core/frame.py:3950, in __setitem__()
   3947     self._setitem_array([key], value)
   3948 else:
   3949     # set column
-> 3950     self._set_item(key, value)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/pandas/core/frame.py:4143, in _set_item()
   4133 def _set_item(self, key, value) -> None:
   4134     """
   4135     Add series to DataFrame in specified column.
   4136 
   (...)
   4141     ensure homogeneity.
   4142     """
-> 4143     value = self._sanitize_column(value)
   4145     if (
   4146         key in self.columns
   4147         and value.ndim == 1
   4148         and not is_extension_array_dtype(value)
   4149     ):
   4150         # broadcast across multiple columns if necessary
   4151         if not self.columns.is_unique or isinstance(self.columns, MultiIndex):

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/pandas/core/frame.py:4871, in _sanitize_column()
   4869 if is_list_like(value):
   4870     com.require_length_match(value, self.index)
-> 4871 return sanitize_array(value, self.index, copy=True, allow_2d=True)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/pandas/core/construction.py:580, in sanitize_array()
    576         subarr = _try_cast(data, dtype, copy)
    578 elif hasattr(data, "__array__"):
    579     # e.g. dask array GH#38645
--> 580     data = np.array(data, copy=copy)
    581     return sanitize_array(
    582         data,
    583         index=index,
   (...)
    586         allow_2d=allow_2d,
    587     )
    589 else:

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/array/core.py:1701, in __array__()
   1700 def __array__(self, dtype=None, **kwargs):
-> 1701     x = self.compute()
   1702     if dtype and x.dtype != dtype:
   1703         x = x.astype(dtype)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:310, in compute()
    286 def compute(self, **kwargs):
    287     """Compute this dask collection
    288 
    289     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    308     dask.compute
    309     """
--> 310     (result,) = compute(self, traverse=False, **kwargs)
    311     return result

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:583, in compute()
    580 if not collections:
    581     return args
--> 583 schedule = get_scheduler(
    584     scheduler=scheduler,
    585     collections=collections,
    586     get=get,
    587 )
    589 dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
    590 keys, postcomputes = [], []

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:1398, in get_scheduler()
   1394     # else:  # try to connect to remote scheduler with this name
   1395     #     return get_client(scheduler).get
   1397 if config.get("scheduler", None):
-> 1398     return get_scheduler(scheduler=config.get("scheduler", None))
   1400 if config.get("get", None):
   1401     raise ValueError(get_err_msg)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:1373, in get_scheduler()
   1371 elif scheduler in ("dask.distributed", "distributed"):
   1372     if not client_available:
-> 1373         raise RuntimeError(
   1374             f"Requested {scheduler} scheduler but no Client active."
   1375         )
   1376     from distributed.worker import get_client
   1378     return get_client().get

RuntimeError: Requested dask.distributed scheduler but no Client active.
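
The last frames of the traceback show a lazy dask Array being converted with `np.array(...)` inside `dask.dataframe.methods.assign`, i.e. `Array.__array__` calls `compute()` from within a worker task. That nested compute requests the `dask.distributed` scheduler and fails when the task runs on a worker that cannot find an active Client, which seems to be what happens after the worker pool changes. The actual notebook code is not shown in this issue, so the snippet below is only a hypothetical sketch (the data and column names are made up) of that pattern and of two ways to keep the lazy Array out of the per-partition tasks.

```python
import dask.array as da
import dask.dataframe as dd
import pandas as pd

# Hypothetical stand-ins for the real data; only the shape of the problem matters.
pdf = pd.DataFrame({"x": range(1_000)})
ddf = dd.from_pandas(pdf, npartitions=10)        # 10 partitions of 100 rows
stat = da.random.random((1_000,), chunks=100)    # a lazy per-row statistic

# Pattern suggested by the traceback: a lazy Array reaches pandas inside the
# assign task, so np.array(...) -> Array.__array__ -> compute() runs on the
# worker and needs an active Client there.

# Alternative 1: compute small results on the client first, then assign plain values.
stat_mean = float(stat.mean().compute())
ddf = ddf.assign(stat_mean=stat_mean)            # plain scalar, no nested compute

# Alternative 2: keep it lazy, but align the Array with the DataFrame as a
# dask Series, so the assignment stays a graph-level operation.
stat_series = dd.from_dask_array(stat, index=ddf.index)
ddf = ddf.assign(stat=stat_series)
```

For the worker-loss side of it, passing `retries` to the compute call (e.g. `dask.compute(all_statistics, traverse=True, retries=3)`) lets the distributed scheduler re-run tasks that died with a worker, though I am not sure it helps once this particular RuntimeError has already been raised inside a task.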