askap-vast / vast-pipeline

This repository holds the code of the Radio Transient detection pipeline for the VAST project.
https://vast-survey.org/vast-pipeline/
MIT License

Dask/Pandas regression: IndexError raised during new source analysis #631

Closed: marxide closed this issue 2 years ago

marxide commented 2 years ago

Pipeline runs have been failing with pandas>=1.4.0: an IndexError is raised during the new source analysis.

From my investigations, it seems that since pandas 1.4.0, Dask passes empty DataFrames to the apply function during a groupby-apply. This appears to be related to the number of partitions in the Dask DataFrame: if the number of partitions is greater than the number of unique groups, the apply function is given an empty DataFrame, on which the .iloc[0] lookup fails.
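The partition-count effect can be illustrated without Dask. The sketch below is a simplified model, not Dask's actual shuffle implementation: the hypothetical helper `split_by_group` sends every row of a group to one partition chosen from a hash of the group key, so only as many partitions as there are groups can hold data and the rest must be empty. The column names and values are made up for illustration.

```python
import pandas as pd


def split_by_group(df: pd.DataFrame, key: str, npartitions: int) -> list:
    """Hypothetical model of a hash shuffle: all rows of a group go to the
    same partition, chosen from a hash of the group key."""
    codes = pd.util.hash_pandas_object(df[key], index=False).to_numpy() % npartitions
    return [df[codes == i] for i in range(npartitions)]


# Two unique groups spread over eight partitions (e.g. one per CPU core).
df = pd.DataFrame(
    {
        "source": [1, 1, 2, 2],
        "img_diff_rms_path": ["a.fits", "a.fits", "b.fits", "b.fits"],
    }
)
parts = split_by_group(df, "source", npartitions=8)

# At most two partitions can hold data, so at least six are empty.
n_empty = sum(part.empty for part in parts)
print(n_empty)
```

Running a pandas groupby-apply on each partition separately is roughly what the `_groupby_slice_apply` frame in the traceback does after the shuffle; the empty partitions contain no groups at all.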

I haven't been able to track down the upstream cause. Even though the failure occurs inside Dask, it only appears when pandas is upgraded; the Dask version was unchanged. The best solution for us is probably to skip empty groups in the apply function.
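A minimal sketch of skipping empty groups, assuming the apply function returns the group with extra columns added. `get_image_rms_measurements_safe` and the `rms_image` column are hypothetical stand-ins; only the `img_diff_rms_path` column and the `group.iloc[0]` lookup come from the traceback.

```python
import pandas as pd


def get_image_rms_measurements_safe(group: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for get_image_rms_measurements with an
    empty-group guard."""
    if group.empty:
        # Nothing to measure: return an empty frame that still carries the
        # output column, so it has the same columns as non-empty results.
        return group.assign(rms_image=pd.Series(dtype=object))
    # Safe now: there is at least one row to read the image path from.
    image = group.iloc[0]["img_diff_rms_path"]
    # Placeholder for the real work of reading RMS values from ``image``.
    return group.assign(rms_image=image)


df = pd.DataFrame({"source": [1, 1], "img_diff_rms_path": ["a.fits", "a.fits"]})
empty_result = get_image_rms_measurements_safe(df.iloc[:0])
full_result = get_image_rms_measurements_safe(df)
```

Returning an empty frame with the output columns, rather than None, keeps the result's shape consistent across empty and non-empty groups, which matters when Dask checks each partition's output against the declared `meta`.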

Note also that this doesn't cause any of our regression tests to fail in CI, because of the testing platform: GitHub Actions runs on VMs with 2 vCPUs, and the number of Dask DataFrame partitions is set by the number of available CPU cores. The tests do fail when run locally on a machine with more cores.
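The CI/local difference follows from a simple bound. Assuming one partition per core (as described above) and that each group lands in exactly one partition, at least `npartitions - n_groups` partitions must be empty. The helper `min_empty_partitions` is hypothetical, `os.cpu_count()` is an assumed stand-in for however the pipeline chooses `n_cpu`, and the group count of 3 is illustrative only.

```python
import os


def min_empty_partitions(npartitions: int, n_groups: int) -> int:
    """Lower bound on empty partitions when each group occupies exactly one
    partition (hash collisions can only make more partitions empty)."""
    return max(0, npartitions - n_groups)


# GitHub Actions runner: 2 vCPUs, so 2 partitions.
ci_empty = min_empty_partitions(2, n_groups=3)
# Local workstation: assume one partition per available core.
local_cores = os.cpu_count() or 1
local_empty = min_empty_partitions(local_cores, n_groups=3)
print(f"CI: >= {ci_empty} empty; local ({local_cores} cores): >= {local_empty} empty")
```

Because this is only a lower bound, a bound of 0 on CI does not rule out empty partitions; it just makes them much less likely than on a many-core machine.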

2022-02-23 04:38:27,783 utils INFO Creating ideal source coverage df...
2022-02-23 04:38:35,718 utils INFO Ideal source coverage time: 7.93 seconds
2022-02-23 04:38:35,719 new_sources INFO Starting new source analysis.
2022-02-23 04:38:55,259 runpipeline ERROR Processing error:
single positional indexer is out-of-bounds
Traceback (most recent call last):
  File "/usr/src/vast-pipeline/vast-pipeline-dev/vast_pipeline/management/commands/runpipeline.py", line 336, in run_pipe
    pipeline.process_pipeline(p_run)
  File "/usr/src/vast-pipeline/vast-pipeline-dev/vast_pipeline/pipeline/main.py", line 256, in process_pipeline
    new_sources_df = new_sources(
  File "/usr/src/vast-pipeline/vast-pipeline-dev/vast_pipeline/pipeline/new_sources.py", line 408, in new_sources
    new_sources_df = parallel_get_rms_measurements(
  File "/usr/src/vast-pipeline/vast-pipeline-dev/vast_pipeline/pipeline/new_sources.py", line 221, in parallel_get_rms_measurements
    dd.from_pandas(out, n_cpu)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/base.py", line 290, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/base.py", line 573, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/multiprocessing.py", line 220, in get
    result = get_async(
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/local.py", line 506, in get_async
    raise_exception(exc, tb)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/local.py", line 314, in reraise
    raise exc
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/local.py", line 219, in execute_task
    result = _execute_task(task, data)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/utils.py", line 39, in apply
    return func(*args, **kwargs)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/dataframe/core.py", line 6174, in apply_and_enforce
    df = func(*args, **kwargs)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/dask/dataframe/groupby.py", line 170, in _groupby_slice_apply
    return g.apply(func, *args, **kwargs)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/pandas/core/groupby/groupby.py", line 1414, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/pandas/core/groupby/groupby.py", line 1455, in _python_apply_general
    values, mutated = self.grouper.apply(f, data, self.axis)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/pandas/core/groupby/ops.py", line 776, in apply
    f(data.iloc[:0])
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/pandas/core/groupby/groupby.py", line 1388, in f
    return func(g, *args, **kwargs)
  File "/usr/src/vast-pipeline/vast-pipeline-dev/vast_pipeline/pipeline/new_sources.py", line 82, in get_image_rms_measurements
    image = group.iloc[0]['img_diff_rms_path']
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/pandas/core/indexing.py", line 967, in __getitem__
    return self._getitem_axis(maybe_callable, axis=axis)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/pandas/core/indexing.py", line 1520, in _getitem_axis
    self._validate_integer(key, axis)
  File "/usr/src/vast-pipeline/.local/lib/python3.8/site-packages/pandas/core/indexing.py", line 1452, in _validate_integer
    raise IndexError("single positional indexer is out-of-bounds")
IndexError: single positional indexer is out-of-bounds
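The f(data.iloc[:0]) frame in pandas/core/groupby/ops.py appears to be where the empty DataFrame comes from: in the pandas version shown, when the data handed to apply contains no groups, pandas makes one call to the function with a zero-row slice. Such a slice keeps every column but has no rows, so group.iloc[0] has nothing to return. The sketch below reproduces only that last step, using a hypothetical two-column frame.

```python
import pandas as pd

df = pd.DataFrame({"source": [1], "img_diff_rms_path": ["a.fits"]})

# The same kind of zero-row slice that pandas passes to the applied function.
empty = df.iloc[:0]
print(list(empty.columns), len(empty))  # prints ['source', 'img_diff_rms_path'] 0

message = None
try:
    # The lookup from get_image_rms_measurements, applied to the empty slice.
    empty.iloc[0]["img_diff_rms_path"]
except IndexError as exc:
    message = str(exc)
print(message)
```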