ratt-ru / pfb-imaging

Preconditioned forward/backward clean algorithm
MIT License

Leaky semaphores in jones2col worker #50

Closed: landmanbester closed this issue 3 years ago

landmanbester commented 3 years ago

I seem to be running into a leaky semaphore issue. I haven't been able to simplify the reproducer yet, but I thought maybe @JSKenyon or @sjperkins might have some ideas. It happens when I try to apply a QuartiCal gain table to a column of all ones in a measurement set. It seems to run fine when the number of correlations in the MS matches that in the gain table, but in this case the MS has 4 correlations while the gain table only has 2 (i.e. I only solved for correlations 0 and 3). I get the following when I try to run the jones2col worker:

(pfbenv) bester@oates:~/projects/ESO137/pfb$ pfb jones2col -ms ../msdir/1557347448_J1939_6342_scan1and3_chan950to1050.ms -mc CORRECTED_DATA -gt ../qcal/complex_cballmodel.qc/gains.qc -o j2col -ntpw 4 -nthreads 4
INFO      16:34:01 - J2COL              | Initialising client with LocalCluster.
INFO      16:34:02 - J2COL              | Input Options:
INFO      16:34:02 - J2COL              |                             ms = ['../msdir/1557347448_J1939_6342_scan1and3_chan950to1050.ms']
INFO      16:34:02 - J2COL              |                 mueller_column = CORRECTED_DATA
INFO      16:34:02 - J2COL              |                     gain_table = ../qcal/complex_cballmodel.qc/gains.qc
INFO      16:34:02 - J2COL              |                output_filename = j2col
INFO      16:34:02 - J2COL              |            nthreads_per_worker = 4
INFO      16:34:02 - J2COL              |                       nthreads = 4
INFO      16:34:02 - J2COL              |                           acol = None
INFO      16:34:02 - J2COL              |                      compareto = None
INFO      16:34:02 - J2COL              |                   host_address = None
INFO      16:34:02 - J2COL              |                       nworkers = 1
INFO      16:34:02 - J2COL              |                      nvthreads = 1
INFO      16:34:02 - J2COL              |                      mem_limit = 532

/usr/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
distributed.nanny - WARNING - Restarting worker

This is on oates, but I get exactly the same error locally. Suggestions on what could be going wrong, or how to debug this, would be much appreciated. In the meantime I will try to simplify the reproducer. This is on the stokes_products branch here
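For context, a minimal numpy sketch (hypothetical shapes, not the pfb/QuartiCal code) of the correlation mismatch described above: a diagonal gain solved for correlations 0 and 3 has 2 gain terms where the MS visibilities carry 4 correlations, so the shapes have to be mapped explicitly rather than broadcast.

```python
import numpy as np

nrow, nchan = 10, 8
vis = np.ones((nrow, nchan, 4), dtype=np.complex128)   # MS column with 4 correlations
gain = np.ones((nrow, nchan, 2), dtype=np.complex128)  # gains solved for corr 0 and 3 only

# A naive diagonal application only touches the outer correlations;
# the off-diagonal correlations (1 and 2) are left untouched.
corrected = vis.copy()
corrected[..., 0] *= gain[..., 0] * np.conj(gain[..., 0])
corrected[..., 3] *= gain[..., 1] * np.conj(gain[..., 1])
```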

sjperkins commented 3 years ago

I haven't seen that before. Have you run DDFacet or Cubical on the same machine with semaphores left open?

landmanbester commented 3 years ago

Nope, and I definitely haven't run either on my laptop, which gives the same error.

landmanbester commented 3 years ago

Interestingly, if I don't do

client.run_on_scheduler(install_plugin)

I get more verbose output, viz.

(pfb) LAPTOP-ED4N50Q2 :: ESO137/subsets/pfb » pfb jones2col -ms ../1557347448_J1939_6342_scan1and3_chan950to1050.ms -mc CORRECTED_DATA -gt ../qcal/complex_cballmodel.qc/gains.qc -o j2col -ntpw 4 -nthreads 4 1 ↵
INFO      11:01:15 - J2COL              | Initialising client with LocalCluster.
INFO      11:01:17 - J2COL              | Input Options:
INFO      11:01:17 - J2COL              |                             ms = ['../1557347448_J1939_6342_scan1and3_chan950to1050.ms']
INFO      11:01:17 - J2COL              |                 mueller_column = CORRECTED_DATA
INFO      11:01:17 - J2COL              |                     gain_table = ../qcal/complex_cballmodel.qc/gains.qc
INFO      11:01:17 - J2COL              |                output_filename = j2col
INFO      11:01:17 - J2COL              |            nthreads_per_worker = 4
INFO      11:01:17 - J2COL              |                       nthreads = 4
INFO      11:01:17 - J2COL              |                           acol = None
INFO      11:01:17 - J2COL              |                      compareto = None
INFO      11:01:17 - J2COL              |                   host_address = None
INFO      11:01:17 - J2COL              |                       nworkers = 1
INFO      11:01:17 - J2COL              |                      nvthreads = 1
INFO      11:01:17 - J2COL              |                      mem_limit = 18
/usr/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
distributed.scheduler - ERROR - Couldn't gather keys {"('write~CORRECTED_DATA-1557347448_J1939_6342_scan1and3_chan950to1050.ms-752fc201421bb0287e87a1e3b2f58a87', 0, 0, 0)": ['tcp://127.0.0.1:36339'], "('write~CORRECTED_DATA-1557347448_J1939_6342_scan1and3_chan950to1050.ms-175285bf673ed9d1de8bc36c7379847f', 0, 0, 0)": ['tcp://127.0.0.1:36339']} state: ['waiting', 'waiting'] workers: ['tcp://127.0.0.1:36339']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:36339'], ('write~CORRECTED_DATA-1557347448_J1939_6342_scan1and3_chan950to1050.ms-752fc201421bb0287e87a1e3b2f58a87', 0, 0, 0)
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:36339'], ('write~CORRECTED_DATA-1557347448_J1939_6342_scan1and3_chan950to1050.ms-175285bf673ed9d1de8bc36c7379847f', 0, 0, 0)
NoneType: None
distributed.client - WARNING - Couldn't gather 2 keys, rescheduling {"('write~CORRECTED_DATA-1557347448_J1939_6342_scan1and3_chan950to1050.ms-752fc201421bb0287e87a1e3b2f58a87', 0, 0, 0)": ('tcp://127.0.0.1:36339',), "('write~CORRECTED_DATA-1557347448_J1939_6342_scan1and3_chan950to1050.ms-175285bf673ed9d1de8bc36c7379847f', 0, 0, 0)": ('tcp://127.0.0.1:36339',)}
distributed.nanny - WARNING - Restarting worker
/usr/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
distributed.nanny - WARNING - Restarting worker
/usr/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
distributed.nanny - WARNING - Restarting worker
/usr/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
distributed.nanny - WARNING - Restarting worker
/usr/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
distributed.nanny - WARNING - Restarting worker
distributed.core - ERROR - None
Traceback (most recent call last):
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/core.py", line 575, in handle_stream
    handler(**merge(extra, msg))
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5010, in client_releases_keys
    self.transitions(recommendations)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 7054, in transitions
    self.send_all(client_msgs, worker_msgs)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5536, in send_all
    w = stream_comms[worker]
KeyError: None
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5243, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/core.py", line 575, in handle_stream
    handler(**merge(extra, msg))
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5010, in client_releases_keys
    self.transitions(recommendations)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 7054, in transitions
    self.send_all(client_msgs, worker_msgs)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5536, in send_all
    w = stream_comms[worker]
KeyError: None

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/core.py", line 502, in handle_comm
    result = await result
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5245, in add_client
    self.remove_client(client=client)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5273, in remove_client
    keys=[ts._key for ts in cs._wants_what], client=cs._client_key
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5010, in client_releases_keys
    self.transitions(recommendations)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 7054, in transitions
    self.send_all(client_msgs, worker_msgs)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5536, in send_all
    w = stream_comms[worker]
KeyError: None
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f31c41c8dd0>, <Task finished coro=<BaseTCPListener._handle_stream() done, defined at /home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/comm/tcp.py:493> exception=KeyError(None)>)
Traceback (most recent call last):
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5243, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/core.py", line 575, in handle_stream
    handler(**merge(extra, msg))
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5010, in client_releases_keys
    self.transitions(recommendations)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 7054, in transitions
    self.send_all(client_msgs, worker_msgs)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5536, in send_all
    w = stream_comms[worker]
KeyError: None

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/tornado/tcpserver.py", line 331, in <lambda>
    gen.convert_yielded(future), lambda f: f.result()
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/comm/tcp.py", line 510, in _handle_stream
    await self.comm_handler(comm)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/core.py", line 502, in handle_comm
    result = await result
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5245, in add_client
    self.remove_client(client=client)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5273, in remove_client
    keys=[ts._key for ts in cs._wants_what], client=cs._client_key
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5010, in client_releases_keys
    self.transitions(recommendations)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 7054, in transitions
    self.send_all(client_msgs, worker_msgs)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/scheduler.py", line 5536, in send_all
    w = stream_comms[worker]
KeyError: None
Traceback (most recent call last):
  File "/home/landman/venvs/pfb/bin/pfb", line 11, in <module>
    load_entry_point('pfb-clean', 'console_scripts', 'pfb')()
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/click/core.py", line 1137, in __call__
    return self.main(*args, **kwargs)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/click/core.py", line 1062, in main
    rv = self.invoke(ctx)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/click/core.py", line 1668, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/click/core.py", line 763, in invoke
    return __callback(*args, **kwargs)
  File "/home/landman/Projects/pfb-clean/pfb/workers/misc/jones2col.py", line 63, in jones2col
    return _jones2col(**args)
  File "/home/landman/Projects/pfb-clean/pfb/workers/misc/jones2col.py", line 176, in _jones2col
    dask.compute(writes)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/dask/base.py", line 568, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/client.py", line 2671, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/client.py", line 1954, in gather
    asynchronous=asynchronous,
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/client.py", line 846, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/landman/venvs/pfb/lib/python3.7/site-packages/distributed/client.py", line 1813, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('corrupt_vis_wrapper-reshape-write~CORRECTED_DATA-1557347448_J1939_6342_scan1and3_chan950to1050.ms-175285bf673ed9d1d-7e35', 0, 0, 0)", <WorkerState 'tcp://127.0.0.1:40195', name: 0, memory: 0, processing: 2>)

Not sure if that is useful information or not. Busy trying to test what happens if I use the threaded scheduler instead of a local cluster.
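For what it's worth, a minimal sketch (assuming nothing pfb-specific beyond the `writes` graph that `_jones2col` computes; a dummy array stands in for it here) of forcing the threaded scheduler instead of a LocalCluster, which takes the nanny/semaphore machinery out of the picture:

```python
import dask
import dask.array as da

# Stand-in for the graph of delayed writes built in _jones2col; a dummy
# reduction keeps the sketch runnable on its own.
writes = da.ones((1000, 64, 4), chunks=(250, 64, 4)).sum()

# Per-call override: run only this compute on the threaded scheduler.
dask.compute(writes, scheduler="threads", num_workers=4)

# Or set it for the whole session, so no LocalCluster / nanny processes are involved.
with dask.config.set(scheduler="threads"):
    dask.compute(writes)
```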

sjperkins commented 3 years ago

This looks like a known (but unsolved!) issue

https://github.com/dask/distributed/issues/1078

It seems to pop up a fair bit on other repos too

JSKenyon commented 3 years ago

Did you notice that one response was related to Zarr? I wonder if that may be the root cause in our case. I also see this occasionally in QuartiCal. I will try some experiments tomorrow.

landmanbester commented 3 years ago

> This looks like a known (but unsolved!) issue
>
> dask/distributed#1078
>
> It seems to pop up a fair bit on other repos too

Saw this, thanks. I think I may have narrowed it down. I managed to get a segfault when running with the threaded scheduler. Diving into the core file I see this:

#0  0x00007fc33041d797 in africanus::calibration::utils::corrupt_vis::corrupt_vis::$3clocals$3e::_corrupt_vis_fn$241(Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<complex128, 5, A, mutable, aligned>, Array<complex128, 5, A, readonly, aligned>) ()
[Current thread is 1 (Thread 0x7fc2eaffd700 (LWP 1964))]
(gdb) bt
#0  0x00007fc33041d797 in africanus::calibration::utils::corrupt_vis::corrupt_vis::$3clocals$3e::_corrupt_vis_fn$241(Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<complex128, 5, A, mutable, aligned>, Array<complex128, 5, A, readonly, aligned>) ()
#1  0x00007fc33041e3ab in cpython::africanus::calibration::utils::corrupt_vis::corrupt_vis::$3clocals$3e::_corrupt_vis_fn$241(Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<int, 1, C, mutable, aligned>, Array<complex128, 5, A, mutable, aligned>, Array<complex128, 5, A, readonly, aligned>) ()
#2  0x00007fc31a9ebf6c in call_cfunc (self=self@entry=0x7fc302cd0cb0, cfunc=cfunc@entry=0x7fc2e917d0a0, args=args@entry=0x7fc302bf5a60, kws=kws@entry=0x0, locals=locals@entry=0x0) at numba/_dispatcher.cpp:437
#3  0x00007fc31a9ec39c in compile_and_invoke (self=self@entry=0x7fc302cd0cb0, args=0x7fc302bf5a60, kws=0x0, locals=locals@entry=0x0) at numba/_dispatcher.cpp:460
#4  0x00007fc31a9ec6bd in Dispatcher_call (self=0x7fc302cd0cb0, args=<optimized out>, kws=<optimized out>) at numba/_dispatcher.cpp:739

So something is happening in the africanus corrupt_vis function. Busy trying to track it down.
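For the record, these are standard numba environment variables (nothing pfb-specific) that turn a silent out-of-bounds access in jitted code into a Python exception rather than a segfault; they need to be in place before numba compiles anything:

```python
import os

# Must be set before numba is imported / anything is jitted, e.g. export them
# in the shell before running `pfb jones2col ...`.
os.environ["NUMBA_BOUNDSCHECK"] = "1"      # raise IndexError on out-of-bounds access
os.environ["NUMBA_DEVELOPER_MODE"] = "1"   # developer mode: don't swallow internal errors
os.environ["NUMBA_FULL_TRACEBACKS"] = "1"  # keep full tracebacks from jitted code

import numba  # noqa: E402  (imported only after the environment is configured)
```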

landmanbester commented 3 years ago

Found it. Definitely a developer error and not a problem with dask or numba: an out-of-bounds access because I forgot to iterate through my index sets by dataset. Numba developer mode + bounds check to the rescue. Thanks guys
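For anyone hitting something similar, a purely illustrative sketch (hypothetical names, not the actual pfb code) of the class of bug described: reusing one dataset's index set for every dataset indexes past the end of the smaller arrays, which bounds checking turns into an IndexError instead of a segfault.

```python
import numpy as np

# Hypothetical per-dataset arrays and index sets of different sizes.
datasets = [np.ones(100), np.ones(60)]
index_sets = [np.arange(0, 100, 5), np.arange(0, 60, 5)]

# Buggy pattern: reusing index_sets[0] (indices up to 95) on datasets[1]
# (size 60) reads out of bounds inside jitted code.
# bad = [ds[index_sets[0]] for ds in datasets]

# Fix: iterate through the index sets together with their datasets.
subsets = [ds[idx] for ds, idx in zip(datasets, index_sets)]
```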