radio-astro-tools / spectral-cube

Library for reading and analyzing astrophysical spectral data cubes
http://spectral-cube.rtfd.org
BSD 3-Clause "New" or "Revised" License

DaskSpectralCube.spectral_interpolate array chunk error #815

Closed: e-koch closed this issue 2 years ago

e-koch commented 2 years ago

Spectral interpolation with this cube is failing ~90% of the way through with an odd array size error for a single chunk.

Here are the cube's properties:

In [9]: int_cube_smooth
Out[9]:
DaskSpectralCube with shape=(300, 1461, 1208) and unit=K and chunk size (300, 255, 151):
 n_x:   1208  type_x: RA---SIN  unit_x: deg    range:    23.265573 deg:   23.656151 deg
 n_y:   1461  type_y: DEC--SIN  unit_y: deg    range:    30.455238 deg:   30.860794 deg
 n_s:    300  type_s: VRAD      unit_s: m / s  range:  -280000.000 m / s:  -79670.000 m / s
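For reference, the chunk grid these properties imply is simple arithmetic, and the 186 that shows up in the error below matches the expected size of the last (partial) chunk along the y axis:

# Chunk grid implied by shape=(300, 1461, 1208) with chunk size (300, 255, 151)
ny, cy = 1461, 255
print(divmod(ny, cy))   # (5, 186): five full 255-row chunks plus one 186-row edge chunk
nx, cx = 1208, 151
print(divmod(nx, cx))   # (8, 0): the x axis divides evenly into 151-column chunks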

And the (long) error traceback:

In [8]:     int_cube_resamp = int_cube_smooth.spectral_interpolate(sd_cube.spectral_axis[low_chan:high_chan],
   ...:                                                            save_to_tmp_dir=True)
   ...:
WARNING: SmoothingWarning: Input grid has too small a spacing. The data should be smoothed prior to resampling. [spectral_cube.dask_spectral_cube]
[#################################       ] | 84% Completed |  1min 19.6s
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-8-91246032e38d> in <module>
----> 1 int_cube_resamp = int_cube_smooth.spectral_interpolate(sd_cube.spectral_axis[low_chan:high_chan],
      2                                                        save_to_tmp_dir=True)

~/ownCloud/code_development/radio_astro_tools/spectral-cube/spectral_cube/dask_spectral_cube.py in wrapper(self, *args, **kwargs)
     83             filename = tempfile.mktemp()
     84             with dask.config.set(**cube._scheduler_kwargs):
---> 85                 cube._data.to_zarr(filename)
     86             cube._data = da.from_zarr(filename)
     87         return cube

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in to_zarr(self, *args, **kwargs)
   2752         See function :func:`dask.array.to_zarr` for parameters.
   2753         """
-> 2754         return to_zarr(self, *args, **kwargs)
   2755
   2756     def to_tiledb(self, uri, *args, **kwargs):

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in to_zarr(arr, url, component, storage_options, overwrite, region, compute, return_stored, **kwargs)
   3510         **kwargs,
   3511     )
-> 3512     return arr.store(z, lock=False, compute=compute, return_stored=return_stored)
   3513
   3514

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in store(self, target, **kwargs)
   1687     @wraps(store)
   1688     def store(self, target, **kwargs):
-> 1689         r = store([self], [target], **kwargs)
   1690
   1691         if kwargs.get("return_stored", False):

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
   1161     elif compute:
   1162         store_dsk = HighLevelGraph(layers, dependencies)
-> 1163         compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1164         return None
   1165

~/anaconda3/lib/python3.8/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    315     schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    316     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 317     return schedule(dsk2, keys, **kwargs)
    318
    319

~/anaconda3/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    550     """
    551     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 552     return get_async(
    553         synchronous_executor.submit,
    554         synchronous_executor._max_workers,

~/anaconda3/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    493             while state["waiting"] or state["ready"] or state["running"]:
    494                 fire_tasks(chunksize)
--> 495                 for key, res_info, failed in queue_get(queue).result():
    496                     if failed:
    497                         exc, tb = loads(res_info)

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438
    439                 self._condition.wait(timeout)

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

~/anaconda3/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    535         fut = Future()
    536         try:
--> 537             fut.set_result(fn(*args, **kwargs))
    538         except BaseException as e:
    539             fut.set_exception(e)

~/anaconda3/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    231     Batch computing of multiple tasks with `execute_task`
    232     """
--> 233     return [execute_task(*a) for a in it]
    234
    235

~/anaconda3/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    231     Batch computing of multiple tasks with `execute_task`
    232     """
--> 233     return [execute_task(*a) for a in it]
    234
    235

~/anaconda3/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222         failed = False
    223     except BaseException as e:
--> 224         result = pack_exception(e, dumps)
    225         failed = True
    226     return key, result, failed

~/anaconda3/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    217     try:
    218         task, data = loads(task_info)
--> 219         result = _execute_task(task, data)
    220         id = get_id()
    221         result = dumps((result, id))

~/anaconda3/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in store_chunk(x, out, index, lock, return_stored)
   4162
   4163 def store_chunk(x, out, index, lock, return_stored):
-> 4164     return load_store_chunk(x, out, index, lock, return_stored, False)
   4165
   4166

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in load_store_chunk(x, out, index, lock, return_stored, load_stored)
   4149         if x is not None:
   4150             if is_arraylike(x):
-> 4151                 out[index] = x
   4152             else:
   4153                 out[index] = np.asanyarray(x)

~/anaconda3/lib/python3.8/site-packages/zarr/core.py in __setitem__(self, selection, value)
   1211
   1212         fields, selection = pop_fields(selection)
-> 1213         self.set_basic_selection(selection, value, fields=fields)
   1214
   1215     def set_basic_selection(self, selection, value, fields=None):

~/anaconda3/lib/python3.8/site-packages/zarr/core.py in set_basic_selection(self, selection, value, fields)
   1306             return self._set_basic_selection_zd(selection, value, fields=fields)
   1307         else:
-> 1308             return self._set_basic_selection_nd(selection, value, fields=fields)
   1309
   1310     def set_orthogonal_selection(self, selection, value, fields=None):

~/anaconda3/lib/python3.8/site-packages/zarr/core.py in _set_basic_selection_nd(self, selection, value, fields)
   1597         indexer = BasicIndexer(selection, self)
   1598
-> 1599         self._set_selection(indexer, value, fields=fields)
   1600
   1601     def _set_selection(self, indexer, value, fields=None):

~/anaconda3/lib/python3.8/site-packages/zarr/core.py in _set_selection(self, indexer, value, fields)
   1625             if not hasattr(value, 'shape'):
   1626                 value = np.asanyarray(value)
-> 1627             check_array_shape('value', value, sel_shape)
   1628
   1629         # iterate over chunks in range

~/anaconda3/lib/python3.8/site-packages/zarr/util.py in check_array_shape(param, array, shape)
    525                         .format(param, type(array)))
    526     if array.shape != shape:
--> 527         raise ValueError('parameter {!r}: expected array with shape {!r}, got {!r}'
    528                          .format(param, shape, array.shape))
    529

ValueError: parameter 'value': expected array with shape (77, 255, 151), got (77, 186, 151)

It's not clear to me where the actual error is coming from. A similar error occurs without saving to a temp zarr file, too. So far this only seems to happen with spectral interpolation, so I'm wondering if it's something in our wrapper that changes the spatial shape of the chunk.
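To illustrate the kind of mismatch I suspect, here is a minimal standalone sketch (hypothetical; this is not the spectral-cube code): if a dask array's declared shape and chunks claim a full 255-row chunk while the underlying task actually yields a 186-row block, to_zarr hits exactly this check_array_shape error, because the zarr store is laid out from the declared chunk grid:

import numpy as np
import dask
import dask.array as da

# Hypothetical reproduction of the suspected metadata/block mismatch:
# the task really yields a 186-row block, but the dask array's declared
# shape claims 255 rows for that chunk.
block = np.zeros((77, 186, 151))
arr = da.from_delayed(dask.delayed(block), shape=(77, 255, 151), dtype=float)

# to_zarr allocates the zarr store from the declared shape/chunks and then
# assigns each computed block into its slot; zarr's check_array_shape sees
# the undersized block and raises the same ValueError as above.
arr.to_zarr("/tmp/chunk_mismatch_demo.zarr", overwrite=True)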

keflavich commented 2 years ago

It might be valuable to have the traceback without writing to zarr, if you can reproduce it. This traceback seems to be dominated by zarr internals.

e-koch commented 2 years ago

It seems to be triggered only when writing to disk. Forcing the compute works:

In [11]:     int_cube_resamp = int_cube_smooth.spectral_interpolate(sd_cube.spectral_axis[low_chan:high_chan],
    ...:                                                            save_to_tmp_dir=False)
WARNING: SmoothingWarning: Input grid has too small a spacing. The data should be smoothed prior to resampling. [spectral_cube.dask_spectral_cube]

In [12]: int_cube_resamp._data
Out[12]: dask.array<getitem, shape=(77, 1530, 1208), dtype=>f8, chunksize=(77, 255, 151), chunktype=numpy.ndarray>

In [13]: out = int_cube_resamp._data.compute()
[########################################] | 100% Completed | 32.3s

But writing to disk while triggering the compute does not:

In [16]:     int_cube_resamp.write(f"{aca_path}/M33_ACA_12CO21_iram_spec_matched_{prefix}.fits", overwrite=True)
    ...:
[#################################       ] | 83% Completed |  1min 14.6s
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-16-30a5de7f0c27> in <module>
----> 1 int_cube_resamp.write(f"{aca_path}/M33_ACA_12CO21_iram_spec_matched_{prefix}.fits", overwrite=True)

~/ownCloud/code_development/radio_astro_tools/spectral-cube/spectral_cube/dask_spectral_cube.py in write(self, *args, **kwargs)
   1346     def write(self, *args, **kwargs):
   1347         with dask.config.set(**self._scheduler_kwargs):
-> 1348             super().write(*args, **kwargs)
   1349
   1350     @property

~/ownCloud/code_development/radio_astro_tools/spectral-cube/spectral_cube/io/core.py in __call__(self, serialize_method, *args, **kwargs)
    129
    130     def __call__(self, *args, serialize_method=None, **kwargs):
--> 131         registry.write(self._instance, *args, **kwargs)
    132
    133

~/anaconda3/lib/python3.8/site-packages/astropy/io/registry/compat.py in wrapper(registry, *args, **kwargs)
     41             registry = default_registry
     42         # get and call bound method from registry instance
---> 43         return getattr(registry, method_name)(*args, **kwargs)
     44
     45     return wrapper

~/anaconda3/lib/python3.8/site-packages/astropy/io/registry/core.py in write(self, data, format, *args, **kwargs)
    352
    353         writer = self.get_writer(format, data.__class__)
--> 354         return writer(data, *args, **kwargs)
    355
    356

~/ownCloud/code_development/radio_astro_tools/spectral-cube/spectral_cube/io/fits.py in write_fits_cube(cube, filename, overwrite, include_origin_notes)
    275                                                         date=now))
    276         try:
--> 277             hdulist.writeto(filename, overwrite=overwrite)
    278         except TypeError:
    279             hdulist.writeto(filename, clobber=overwrite)

~/anaconda3/lib/python3.8/site-packages/astropy/utils/decorators.py in wrapper(*args, **kwargs)
    544                     warnings.warn(msg, warning_type, stacklevel=2)
    545
--> 546             return function(*args, **kwargs)
    547
    548         return wrapper

~/anaconda3/lib/python3.8/site-packages/astropy/io/fits/hdu/hdulist.py in writeto(self, fileobj, output_verify, overwrite, checksum)
    958             for hdu in self:
    959                 hdu._prewriteto(checksum=checksum)
--> 960                 hdu._writeto(hdulist._file)
    961                 hdu._postwriteto()
    962         hdulist.close(output_verify=output_verify, closed=closed)

~/anaconda3/lib/python3.8/site-packages/astropy/io/fits/hdu/base.py in _writeto(self, fileobj, inplace, copy)
    688
    689         with _free_space_check(self, dirname):
--> 690             self._writeto_internal(fileobj, inplace, copy)
    691
    692     def _writeto_internal(self, fileobj, inplace, copy):

~/anaconda3/lib/python3.8/site-packages/astropy/io/fits/hdu/base.py in _writeto_internal(self, fileobj, inplace, copy)
    694         if not inplace or self._new:
    695             header_offset, _ = self._writeheader(fileobj)
--> 696             data_offset, data_size = self._writedata(fileobj)
    697
    698             # Set the various data location attributes on newly-written HDUs

~/anaconda3/lib/python3.8/site-packages/astropy/io/fits/hdu/base.py in _writedata(self, fileobj)
    624         if self._data_loaded or self._data_needs_rescale:
    625             if self.data is not None:
--> 626                 size += self._writedata_internal(fileobj)
    627             # pad the FITS data block
    628             # to avoid a bug in the lustre filesystem client, don't

~/anaconda3/lib/python3.8/site-packages/astropy/io/fits/hdu/image.py in _writedata_internal(self, fileobj)
    621             return size
    622         elif _is_dask_array(self.data):
--> 623             return self._writeinternal_dask(fileobj)
    624         else:
    625             # Based on the system type, determine the byteorders that

~/anaconda3/lib/python3.8/site-packages/astropy/io/fits/hdu/image.py in _writeinternal_dask(self, fileobj)
    707                                 buffer=outmmap)
    708
--> 709             output.store(outarr, lock=True, compute=True)
    710         finally:
    711             if should_close:

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in store(self, target, **kwargs)
   1687     @wraps(store)
   1688     def store(self, target, **kwargs):
-> 1689         r = store([self], [target], **kwargs)
   1690
   1691         if kwargs.get("return_stored", False):

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
   1161     elif compute:
   1162         store_dsk = HighLevelGraph(layers, dependencies)
-> 1163         compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1164         return None
   1165

~/anaconda3/lib/python3.8/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    315     schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    316     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 317     return schedule(dsk2, keys, **kwargs)
    318
    319

~/anaconda3/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    550     """
    551     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 552     return get_async(
    553         synchronous_executor.submit,
    554         synchronous_executor._max_workers,

~/anaconda3/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    493             while state["waiting"] or state["ready"] or state["running"]:
    494                 fire_tasks(chunksize)
--> 495                 for key, res_info, failed in queue_get(queue).result():
    496                     if failed:
    497                         exc, tb = loads(res_info)

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438
    439                 self._condition.wait(timeout)

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

~/anaconda3/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    535         fut = Future()
    536         try:
--> 537             fut.set_result(fn(*args, **kwargs))
    538         except BaseException as e:
    539             fut.set_exception(e)

~/anaconda3/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    231     Batch computing of multiple tasks with `execute_task`
    232     """
--> 233     return [execute_task(*a) for a in it]
    234
    235

~/anaconda3/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    231     Batch computing of multiple tasks with `execute_task`
    232     """
--> 233     return [execute_task(*a) for a in it]
    234
    235

~/anaconda3/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222         failed = False
    223     except BaseException as e:
--> 224         result = pack_exception(e, dumps)
    225         failed = True
    226     return key, result, failed

~/anaconda3/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    217     try:
    218         task, data = loads(task_info)
--> 219         result = _execute_task(task, data)
    220         id = get_id()
    221         result = dumps((result, id))

~/anaconda3/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in store_chunk(x, out, index, lock, return_stored)
   4162
   4163 def store_chunk(x, out, index, lock, return_stored):
-> 4164     return load_store_chunk(x, out, index, lock, return_stored, False)
   4165
   4166

~/anaconda3/lib/python3.8/site-packages/dask/array/core.py in load_store_chunk(x, out, index, lock, return_stored, load_stored)
   4149         if x is not None:
   4150             if is_arraylike(x):
-> 4151                 out[index] = x
   4152             else:
   4153                 out[index] = np.asanyarray(x)

ValueError: could not broadcast input array from shape (77,186,151) into shape (77,255,151)

It's a partial chunk on one of the edges of the array, so the shape being returned is "right", but an error is raised because it doesn't match the full chunk size.

I can't tell whether this is in the write-out or specifically in our compute function for spectral interpolation, but I haven't been able to trigger the issue with other functions (reproject, spectral smoothing, etc.).
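A rough way I may try to narrow this down (hypothetical helper, not part of spectral-cube) is to compare each computed block's shape against the shape implied by the dask array's declared chunk grid, e.g. on int_cube_resamp._data:

import numpy as np

def find_mismatched_blocks(arr):
    # Compare every computed dask block against the declared chunk grid.
    # Note: this computes each block individually, so it's only meant as
    # a one-off diagnostic.
    mismatches = []
    for idx in np.ndindex(*arr.numblocks):
        expected = tuple(c[i] for c, i in zip(arr.chunks, idx))
        actual = arr.blocks[idx].compute().shape
        if actual != expected:
            mismatches.append((idx, expected, actual))
    return mismatches

# e.g. find_mismatched_blocks(int_cube_resamp._data) should flag any block
# whose computed shape (like (77, 186, 151)) disagrees with its declared
# chunk size (like (77, 255, 151)).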

Will keep digging on this.