Open · jrbourbeau opened this issue 3 years ago
Got this too: https://github.com/dask/distributed/pull/4937/checks?check_run_id=2888140312#step:10:2213
The following traceback, seemingly leading to a Nanny failure and timeout (and subsequently 99% of the time being spent in GC?!), is interesting:

```python
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
```
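For context (my illustration, not part of the report): the Nanny launches the worker in a separate process, and with the spawn start method the child has to unpickle whatever state the parent sends it, including semaphore-backed locks. A minimal, self-contained sketch of that rebuild step (hypothetical names; it does not reproduce the failure, it just shows where `SemLock._rebuild` comes in):

```python
# Sketch of the mechanism (hypothetical, not a reproduction of the CI failure):
# with the "spawn" start method the parent pickles any multiprocessing
# primitives it passes to the child, and the child rebuilds them while
# unpickling in multiprocessing/spawn.py via SemLock._rebuild, which reopens
# the named semaphore the parent created. If that semaphore is already gone
# when the child starts, the rebuild fails with FileNotFoundError.
import multiprocessing as mp


def child(lock):
    # The lock received here was rebuilt from its pickled state in _main().
    with lock:
        print("child acquired the lock")


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    lock = ctx.Lock()  # backed by a named semaphore
    p = ctx.Process(target=child, args=(lock,))
    p.start()  # lock is pickled to the child and rebuilt via SemLock._rebuild
    p.join()
```

In the CI failure the rebuild apparently ran after the backing semaphore was already gone, hence the `FileNotFoundError`.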
I've seen the GC message in some other "flaky" tests as well, I believe.
What's also interesting is that we only see one error log of `Failed to start gen_cluster:`
although this section should be retried up to 60 times. Considering that the nanny times out after about 10s, I would've expected to see at least two cluster start failures (the gen_cluster timeout is 30s), so the GC might actually be blocking here?
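If that suspicion is right, the behaviour is easy to mimic (hypothetical, with made-up durations): anything synchronous, such as a long GC pass, that blocks the event loop thread also prevents the retry coroutine from waking up from `await asyncio.sleep(1)`, so only the first failure ever gets logged and the 30s timeout can't fire either:

```python
import asyncio
import time


async def retry_loop():
    # Stand-in for gen_cluster's start_cluster retry loop (up to 60 attempts).
    for attempt in range(60):
        print(f"attempt {attempt}")
        await asyncio.sleep(1)


def blocking_work():
    # Stand-in for a long, synchronous gc.collect() pass on the loop thread.
    time.sleep(5)


async def main():
    task = asyncio.create_task(retry_loop())
    await asyncio.sleep(0.1)  # let the first attempt run
    blocking_work()           # event loop is stalled for 5 seconds here
    task.cancel()             # only "attempt 0" was ever printed


asyncio.run(main())
```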
Both linked failures also only happen on py3.8, but that may be a coincidence.
This has been showing up in several CI builds lately (for example https://github.com/dask/distributed/runs/2814505063).
Traceback:
```python
=================================== FAILURES ===================================
__________________________ test_AllProgress_lost_key ___________________________

    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

            async def coro():
                with dask.config.set(config):
                    s = False
                    for _ in range(60):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster: "
                                f"{e.__class__.__name__}: {e}; retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == Status.closed)
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)

                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)

                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]

                    try:
                        start = time()
                        while time() < start + 60:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()

                return result

>           result = loop.run_sync(
                coro, timeout=timeout * 2 if timeout else timeout
            )

distributed/utils_test.py:966:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:120: in __exit__
    next(self.gen)
distributed/utils_test.py:1557: in clean
    del thread_state.on_event_loop_thread
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:120: in __exit__
    next(self.gen)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    @contextmanager
    def check_instances():
        Client._instances.clear()
        Worker._instances.clear()
        Scheduler._instances.clear()
        SpecCluster._instances.clear()
        Worker._initialized_clients.clear()
        # assert all(n.status == "closed" for n in Nanny._instances), {
        #     n: n.status for n in Nanny._instances
        # }
        Nanny._instances.clear()
        _global_clients.clear()
        Comm._instances.clear()

        yield

        start = time()
        while set(_global_clients):
            sleep(0.1)
            assert time() < start + 10

        _global_clients.clear()

        for w in Worker._instances:
            with suppress(RuntimeError):  # closed IOLoop
                w.loop.add_callback(w.close, report=False, executor_wait=False)
                if w.status == Status.running:
                    w.loop.add_callback(w.close)
        Worker._instances.clear()

        start = time()
        while any(c.status != "closed" for c in Worker._initialized_clients):
            sleep(0.1)
            assert time() < start + 10
        Worker._initialized_clients.clear()

        for i in range(5):
            if all(c.closed() for c in Comm._instances):
                break
            else:
                sleep(0.1)
        else:
            L = [c for c in Comm._instances if not c.closed()]
            Comm._instances.clear()
            print("Unclosed Comms", L)
            # raise ValueError("Unclosed Comms", L)

>       assert all(
            n.status == Status.closed or n.status == Status.init for n in Nanny._instances
        ), {n: n.status for n in Nanny._instances}
E       AssertionError: {
```