AstroHuntsman / huntsman-drp

Imaging pipeline tools and flow.
MIT License
0 stars 2 forks source link

`CalexpQualityMonitor` worker processes hang if `processCcdTask` fails #148

Closed danjampro closed 3 years ago

danjampro commented 3 years ago

This appears to be the case following the exception described here: #147

The CalexpQualityMonitor reports that it is running correctly but the number of processed files does not change:

2021-06-02 04:32:49.528 | INFO     | huntsman.drp.services.base:_async_monitor_status:231 - CalexpQualityMonitor status: {'status_thread': True, 'queue_thread': True, 'process_thread': True, 'processed': 57, 'failed': 57, 'input_queue': 10071, 'output_queue': 0}

An exception did get raised inside the worker process:

2021-06-01 06:29:28.211 | ERROR    | huntsman.drp.services.base:_wrap_process_func:58 - Exception while processing {'filter': 'g_band', 'dataType': 'science', 'filename': '/data/nifi/huntsman_priv/images/fields/Frb190711/2d194b0013090900/20200914T131035/20200914T140817.fits.fz', 'taiObs': '2020-09-14T14:08:21.333(UTC)', 'expId': 'PAN000_2d194b0013090900_20200914T140817', 'expTime': 1.0, 'field': 'FRB190711', 'visit': 20200914140821333, 'ccdTemp': 0.5, 'ccd': 9, 'dateObs': '2020-09-14'}: IndexError('list index out of range')

It is not clear why processing of the files has stopped.

danjampro commented 3 years ago

It may not actually be related to processCcdTask failing.

danjampro commented 3 years ago

Possibly related (after running reduction on 10 procs): EDIT: This could have been caused by me terminating the process before the code had finished running.

processCcd.charImage.measurePsf.reserve INFO: Reserved 0/9105 sources
processCcd.charImage.measurePsf INFO: Sending 9105 candidates to PSF determiner
processCcd.charImage.measurePsf.psfDeterminer WARN: NOT scaling kernelSize by stellar quadrupole moment, but using absolute value
processCcd.calibrate.deblend INFO: Deblended: of 32797 sources, 13471 were deblended, creating 49338 children, total 82135 sources
processCcd.calibrate.deblend INFO: Deblended: of 30797 sources, 12196 were deblended, creating 42776 children, total 73573 sources
processCcd.calibrate.measurement INFO: Measuring 82135 sources (32797 parents, 49338 children)
processCcd.calibrate.measurement INFO: Measuring 73573 sources (30797 parents, 42776 children)
processCcd.calibrate.applyApCorr INFO: Applying aperture corrections to 2 instFlux fields
processCcd.calibrate INFO: Copying flags from icSourceCat to sourceCat for 30656 sources
processCcd.calibrate.photoCal.match.sourceSelection INFO: Selected 71833/73573 sources
processCcd.calibrate.photoCal.match.referenceSelection INFO: Selected 1057/1057 references
processCcd.calibrate.photoCal.match INFO: Matched 725 from 71833/73573 input and 1057/1057 reference sources
processCcd.calibrate.photoCal.reserve INFO: Reserved 0/725 sources
processCcd.calibrate.photoCal INFO: Not applying color terms because config.applyColorTerms is None and data is not available and photoRefCat is provided
processCcd.calibrate.photoCal INFO: Magnitude zero point: 25.420483 +/- 0.000423 from 434 stars
processCcd.calibrate INFO: Photometric zero-point: 25.420483
processCcd.calibrate.applyApCorr INFO: Applying aperture corrections to 2 instFlux fields
processCcd.calibrate INFO: Copying flags from icSourceCat to sourceCat for 32658 sources
processCcd.calibrate.photoCal.match.sourceSelection INFO: Selected 80223/82135 sources
processCcd.calibrate.photoCal.match.referenceSelection INFO: Selected 1057/1057 references
processCcd.calibrate.photoCal.match INFO: Matched 721 from 80223/82135 input and 1057/1057 reference sources
processCcd.calibrate.photoCal.reserve INFO: Reserved 0/721 sources
processCcd.calibrate.photoCal INFO: Not applying color terms because config.applyColorTerms is None and data is not available and photoRefCat is provided
processCcd.calibrate.photoCal INFO: Magnitude zero point: 25.347488 +/- 0.000425 from 456 stars
processCcd.calibrate INFO: Photometric zero-point: 25.347488

^CProcess ForkPoolWorker-39:
Process ForkPoolWorker-31:
Process ForkPoolWorker-36:
Process ForkPoolWorker-37:
Process ForkPoolWorker-35:
Process ForkPoolWorker-32:
Process ForkPoolWorker-33:
Process ForkPoolWorker-38:
Process ForkPoolWorker-34:
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-1-fe2b48b65298> in <module>
      3 reduction = create_from_file("/opt/lsst/software/stack/huntsman-drp/config/reductions/norma.yml")
      4
----> 5 reduction.prepare(); reduction.reduce()

/opt/lsst/software/stack/huntsman-drp/src/huntsman/drp/reduction/lsst.py in reduce(self)
     50
     51         self.logger.info(f"Making calexps for {len(self.science_docs)} science images.")
---> 52         self._butler_repo.make_calexps(dataIds=dataIds, **self._calexp_kwargs)
     53
     54         self.logger.info(f"Making coadds from {len(self.science_docs)} calexps.")

/opt/lsst/software/stack/huntsman-drp/src/huntsman/drp/lsst/butler.py in make_calexps(self, dataIds, rerun, **kwargs)
    359         # Process the science frames in parallel using LSST taskRunner
    360         tasks.make_calexps(dataIds, rerun=rerun, butler_dir=self.butler_dir,
--> 361                            calib_dir=self.calib_dir, doReturnResults=False, **kwargs)
    362
    363         # Check if we have the right number of calexps

/opt/lsst/software/stack/huntsman-drp/src/huntsman/drp/lsst/tasks/__init__.py in make_calexps(dataIds, rerun, butler_dir, calib_dir, procs, clobber_config, doReturnResults, ext
ra_config, **kwargs)
    170
    171     result = utils.run_cmdline_task(HuntsmanProcessCcdTask, cmd.split(),
--> 172                                     doReturnResults=doReturnResults, **kwargs)
    173
    174     if doReturnResults:

/opt/lsst/software/stack/huntsman-drp/src/huntsman/drp/lsst/utils/task.py in run_cmdline_task(Task, args, config, log, doReturnResults, **kwargs)
     84     """
     85     results = Task.parseAndRun(args=args, config=config, log=log, doReturnResults=doReturnResults,
---> 86                                **kwargs)
     87                                                                                                                                                              [130/90948]
     88     return results

/opt/lsst/software/stack/stack/miniconda3-py37_4.8.2-cb4e2dc/Linux64/pipe_base/21.0.0+544a109665/python/lsst/pipe/base/cmdLineTask.py in parseAndRun(cls, args, config, log, doR
eturnResults)
    676
    677         taskRunner = cls.RunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
--> 678         resultList = taskRunner.run(parsedCmd)
    679
    680         try:

/opt/lsst/software/stack/stack/miniconda3-py37_4.8.2-cb4e2dc/Linux64/pipe_base/21.0.0+544a109665/python/lsst/pipe/base/cmdLineTask.py in run(self, parsedCmd)
    239                 with profile(profileName, log):
    240                     # Run the task using self.__call__
--> 241                     resultList = list(mapFunc(self, targetList))
    242             else:
    243                 log.warn("Not running the task because there is no data to process; "

/opt/lsst/software/stack/stack/miniconda3-py37_4.8.2-cb4e2dc/Linux64/pipe_base/21.0.0+544a109665/python/lsst/pipe/base/cmdLineTask.py in _runPool(pool, timeout, function, itera
ble)
     44     http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
     45     """
---> 46     return pool.map_async(function, iterable).get(timeout)
     47
     48

/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    649
    650     def get(self, timeout=None):
--> 651         self.wait(timeout)
    652         if not self.ready():
    653             raise TimeoutError

/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py in wait(self, timeout)
    646
    647     def wait(self, timeout=None):
--> 648         self._event.wait(timeout)
    649
    650     def get(self, timeout=None):

/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/threading.py in wait(self, timeout)
    550             signaled = self._flag
    551             if not signaled:
--> 552                 signaled = self._cond.wait(timeout)                                                                                                           [87/90948]
    553             return signaled
    554

/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/threading.py in wait(self, timeout)
    298             else:
    299                 if timeout > 0:
--> 300                     gotit = waiter.acquire(True, timeout)
    301                 else:
    302                     gotit = waiter.acquire(False)

KeyboardInterrupt:

In [2]: Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
KeyboardInterrupt
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
KeyboardInterrupt
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
KeyboardInterrupt
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
KeyboardInterrupt
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/opt/lsst/software/stack/conda/miniconda3-py37_4.8.2/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt