microsoft / PlanetaryComputerExamples

Examples of using the Planetary Computer
MIT License
369 stars 181 forks source link

unable to initialize dask_cuda LocalCUDACluster #163

Closed dylanrstewart closed 2 years ago

dylanrstewart commented 2 years ago

Using GPU-PyTorch profile, unable to initialize LocalCUDACluster.

It does not fail on import: from dask.distributed import Client from dask_cuda import LocalCUDACluster

It fails on initializing the Cluster: cluster = LocalCUDACluster(threads_per_worker=4)

Full error message:

distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/PlanetaryComputerExamples/tutorials/dask-worker-space/worker-o6h81bfq', purging
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.nanny - ERROR - Failed while trying to start worker process: Set changed size during iteration
Task exception was never retrieved
future: <Task finished name='Task-22' coro=<_wrap_awaitable() done, defined at /srv/conda/envs/notebook/lib/python3.8/asyncio/> exception=RuntimeError('Set changed size during iteration')>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 338, in start
    response = await self.instantiate()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 421, in instantiate
    result = await self.process.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 698, in start
    msg = await self._wait_until_connected(uid)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 817, in _wait_until_connected
    raise msg["exception"]
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/PlanetaryComputerExamples/tutorials/dask-worker-space/worker-fz13r6ye', purging
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.nanny - ERROR - Failed while trying to start worker process: Set changed size during iteration
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f338725d970>>, <Task finished name='Task-21' coro=<SpecCluster._correct_state_internal() done, defined at /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/deploy/> exception=RuntimeError('Set changed size during iteration')>)
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/", line 741, in _run_callback
    ret = callback()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/", line 765, in _discard_future_result
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/deploy/", line 363, in _correct_state_internal
    await w  # for tornado gen.coroutine support
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 338, in start
    response = await self.instantiate()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 421, in instantiate
    result = await self.process.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 698, in start
    msg = await self._wait_until_connected(uid)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 817, in _wait_until_connected
    raise msg["exception"]
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/PlanetaryComputerExamples/tutorials/dask-worker-space/worker-zyrogkjh', purging
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.nanny - ERROR - Failed while trying to start worker process: Set changed size during iteration
Task exception was never retrieved
future: <Task finished name='Task-47' coro=<_wrap_awaitable() done, defined at /srv/conda/envs/notebook/lib/python3.8/asyncio/> exception=RuntimeError('Set changed size during iteration')>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 338, in start
    response = await self.instantiate()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 421, in instantiate
    result = await self.process.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 698, in start
    msg = await self._wait_until_connected(uid)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 817, in _wait_until_connected
    raise msg["exception"]
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/PlanetaryComputerExamples/tutorials/dask-worker-space/worker-jpr2r_q4', purging
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in},
  File "/srv/conda/envs/notebook/lib/python3.8/", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.nanny - ERROR - Failed while trying to start worker process: Set changed size during iteration
RuntimeError                              Traceback (most recent call last)
Input In [1], in <module>
      1 from dask.distributed import Client
      2 from dask_cuda import LocalCUDACluster
----> 4 cluster = LocalCUDACluster(threads_per_worker=4)
      5 client = Client(cluster)
      6 print(f"/proxy/{client.scheduler_info()['services']['dashboard']}/status")

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask_cuda/, in LocalCUDACluster.__init__(self, CUDA_VISIBLE_DEVICES, n_workers, threads_per_worker, memory_limit, device_memory_limit, data, local_directory, protocol, enable_tcp_over_ucx, enable_infiniband, enable_nvlink, enable_rdmacm, ucx_net_devices, rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, jit_unspill, log_spilling, worker_class, **kwargs)
    359 self.cuda_visible_devices = CUDA_VISIBLE_DEVICES
    360 self.scale(n_workers)
--> 361 self.sync(self._correct_state)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/deploy/, in Cluster.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    256     return future
    257 else:
--> 258     return sync(self.loop, func, *args, **kwargs)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in sync(loop, func, callback_timeout, *args, **kwargs)
    330 if error[0]:
    331     typ, exc, tb = error[0]
--> 332     raise exc.with_traceback(tb)
    333 else:
    334     return result[0]

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in sync.<locals>.f()
    313     if callback_timeout is not None:
    314         future = asyncio.wait_for(future, callback_timeout)
--> 315     result[0] = yield future
    316 except Exception:
    317     error[0] = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/, in
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/deploy/, in SpecCluster._correct_state_internal(self)
    361     for w in workers:
    362         w._cluster = weakref.ref(self)
--> 363         await w  # for tornado gen.coroutine support
    364 self.workers.update(dict(zip(to_open, workers)))

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in Server.__await__.<locals>._()
    277             raise TimeoutError(
    278                 "{} failed to start in {} seconds".format(
    279                     type(self).__name__, timeout
    280                 )
    281             )
    282     else:
--> 283         await self.start()
    284         self.status = Status.running
    285 return self

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in Nanny.start(self)
    335     await self.plugin_add(plugin=plugin, name=name)
    337"        Start Nanny at: %r", self.address)
--> 338 response = await self.instantiate()
    339 if response == Status.running:
    340     assert self.worker_address

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in Nanny.instantiate(self, comm)
    419 else:
    420     try:
--> 421         result = await self.process.start()
    422     except Exception:
    423         await self.close()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in WorkerProcess.start(self)
    696     return self.status
    697 try:
--> 698     msg = await self._wait_until_connected(uid)
    699 except Exception:
    700     self.status = Status.failed

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in WorkerProcess._wait_until_connected(self, uid)
    813 if "exception" in msg:
    814     logger.error(
    815         "Failed while trying to start worker process: %s", msg["exception"]
    816     )
--> 817     raise msg["exception"]
    818 else:
    819     return msg

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in run()
    885 """
    886 Try to start worker and inform parent of outcome.
    887 """
    888 try:
--> 889     await worker
    890 except Exception as e:
    891     logger.exception("Failed to start worker")

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in _()
    277             raise TimeoutError(
    278                 "{} failed to start in {} seconds".format(
    279                     type(self).__name__, timeout
    280                 )
    281             )
    282     else:
--> 283         await self.start()
    284         self.status = Status.running
    285 return self

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in start()
   1497 await asyncio.gather(
   1498     *(self.plugin_add(plugin=plugin) for plugin in self._pending_plugins)
   1499 )
   1500 self._pending_plugins = ()
-> 1502 await self._register_with_scheduler()
   1504 self.start_periodic_callbacks()
   1505 return self

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in _register_with_scheduler()
   1179 = "Worker->Scheduler"
   1180 comm._server = weakref.ref(self)
   1181 await comm.write(
   1182     dict(
   1183         op="register-worker",
   1184         reply=False,
   1185         address=self.contact_address,
   1187         keys=list(,
   1188         nthreads=self.nthreads,
   1190         nbytes={
   1191             ts.key: ts.get_nbytes()
   1192             for ts in self.tasks.values()
   1193             # Only if the task is in memory this is a sensible
   1194             # result since otherwise it simply submits the
   1195             # default value
   1196             if ts.state == "memory"
   1197         },
-> 1198         types={k: typename(v) for k, v in},
   1199         now=time(),
   1200         resources=self.total_resources,
   1201         memory_limit=self.memory_limit,
   1202         local_directory=self.local_directory,
   1203         services=self.service_ports,
   1204         nanny=self.nanny,
   1205         pid=os.getpid(),
   1206         versions=get_versions(),
   1207         metrics=await self.get_metrics(),
   1208         extra=await self.get_startup_information(),
   1209     ),
   1210     serializers=["msgpack"],
   1211 )
   1212 future =["msgpack"])
   1214 response = await future

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/, in <dictcomp>()
   1179 = "Worker->Scheduler"
   1180 comm._server = weakref.ref(self)
   1181 await comm.write(
   1182     dict(
   1183         op="register-worker",
   1184         reply=False,
   1185         address=self.contact_address,
   1187         keys=list(,
   1188         nthreads=self.nthreads,
   1190         nbytes={
   1191             ts.key: ts.get_nbytes()
   1192             for ts in self.tasks.values()
   1193             # Only if the task is in memory this is a sensible
   1194             # result since otherwise it simply submits the
   1195             # default value
   1196             if ts.state == "memory"
   1197         },
-> 1198         types={k: typename(v) for k, v in},
   1199         now=time(),
   1200         resources=self.total_resources,
   1201         memory_limit=self.memory_limit,
   1202         local_directory=self.local_directory,
   1203         services=self.service_ports,
   1204         nanny=self.nanny,
   1205         pid=os.getpid(),
   1206         versions=get_versions(),
   1207         metrics=await self.get_metrics(),
   1208         extra=await self.get_startup_information(),
   1209     ),
   1210     serializers=["msgpack"],
   1211 )
   1212 future =["msgpack"])
   1214 response = await future

File /srv/conda/envs/notebook/lib/python3.8/, in __iter__()
    742 def __iter__(self):
--> 743     for key in self._mapping:
    744         yield (key, self._mapping[key])

RuntimeError: Set changed size during iteration
TomAugspurger commented 2 years ago

Hmm strange. I am bumping the container images in version in Once I've validated that things are OK in staging then I'll update prod as well.

TomAugspurger commented 2 years ago

This should be fixed by the updated images released to the hub today. LMK if you're still seeing it.