radio-astro-tools / spectral-cube

Library for reading and analyzing astrophysical spectral data cubes
http://spectral-cube.rtfd.org
BSD 3-Clause "New" or "Revised" License
95 stars 62 forks source link

Error writing to fits from dask #800

Closed keflavich closed 2 years ago

keflavich commented 2 years ago

code:

for fn in flist:
    for rr,regn in ((reg, 'co1'), (reg2, 'co2')):
        outfile = fn.replace("_small",f"_{regn}_smaller")
        if not os.path.exists(outfile):
            cube = SpectralCube.read(fn, use_dask=True).subcube_from_regions(rr)
            cube.allow_huge_operations=True
            with cube.use_dask_scheduler('threads', num_workers=32):  
                cb = cube.beams.common_beam(max_iter=20, max_epsilon=0.01)
                scube = cube.convolve_to(cb)
                scube.write(outfile)
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/fits.py in write_fits_cube(cube, filename, overwrite, include_origin_notes)
    276         try:
--> 277             hdulist.writeto(filename, overwrite=overwrite)
    278         except TypeError:

/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/hdulist.py in writeto(self, fileobj, output_verify, overwrite, checksum)
    948                 hdu._prewriteto(checksum=checksum)
--> 949                 hdu._writeto(hdulist._file)
    950                 hdu._postwriteto()

/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py in _writeto(self, fileobj, inplace, copy)
    680         with _free_space_check(self, dirname):
--> 681             self._writeto_internal(fileobj, inplace, copy)
    682 

/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py in _writeto_internal(self, fileobj, inplace, copy)
    686             header_offset, _ = self._writeheader(fileobj)
--> 687             data_offset, data_size = self._writedata(fileobj)
    688 

/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py in _writedata(self, fileobj)
    616             if self.data is not None:
--> 617                 size += self._writedata_internal(fileobj)
    618             # pad the FITS data block

/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/image.py in _writedata_internal(self, fileobj)
    622         elif _is_dask_array(self.data):
--> 623             return self._writeinternal_dask(fileobj)
    624         else:

/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/image.py in _writeinternal_dask(self, fileobj)
    708 
--> 709             output.store(outarr, lock=True, compute=True)
    710         finally:

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py in store(self, target, **kwargs)
   1596     def store(self, target, **kwargs):
-> 1597         r = store([self], [target], **kwargs)
   1598 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
   1075         store_dsk = HighLevelGraph(layers, dependencies)
-> 1076         compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1077         return None

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    314     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 315     return schedule(dsk2, keys, **kwargs)
    316 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     78 
---> 79     results = get_async(
     80         pool.submit,

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    506                         else:
--> 507                             raise_exception(exc, tb)
    508                     res, worker_id = loads(res_info)

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in reraise(exc, tb)
    314         raise exc.with_traceback(tb)
--> 315     raise exc
    316 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    219         task, data = loads(task_info)
--> 220         result = _execute_task(task, data)
    221         id = get_id()

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py in getter(a, b, asarray, lock)
    101     try:
--> 102         c = a[b]
    103         # Below we special-case `np.matrix` to force a conversion to

/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py in __getitem__(self, view)
    198         else:
--> 199             return self._mask._filled(data=self._data,
    200                                       view=view,

/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/masks.py in _filled(self, data, wcs, fill, view, use_memmap, **kwargs)
    239 
--> 240         return np.ma.masked_array(sliced_data, mask=ex).filled(fill)
    241 

~/.local/lib/python3.9/site-packages/numpy/ma/core.py in __new__(cls, data, mask, dtype, copy, subok, ndmin, fill_value, keep_mask, hard_mask, shrink, order)
   2828         # Process data.
-> 2829         _data = np.array(data, dtype=dtype, copy=copy,
   2830                          order=order, subok=True, ndmin=ndmin)

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
   1533     def __array__(self, dtype=None, **kwargs):
-> 1534         x = self.compute()
   1535         if dtype and x.dtype != dtype:

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
    287         """
--> 288         (result,) = compute(self, traverse=False, **kwargs)
    289         return result

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
    569 
--> 570     results = schedule(dsk, keys, **kwargs)
    571     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     78 
---> 79     results = get_async(
     80         pool.submit,

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    506                         else:
--> 507                             raise_exception(exc, tb)
    508                     res, worker_id = loads(res_info)

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in reraise(exc, tb)
    314         raise exc.with_traceback(tb)
--> 315     raise exc
    316 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    219         task, data = loads(task_info)
--> 220         result = _execute_task(task, data)
    221         id = get_id()

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/optimization.py in __call__(self, *args)
    968             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    970 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in get(dsk, out, cache)
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):

/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py in convfunc(img, beam, **kwargs)
   1547                         if needs_beam_ratio:
-> 1548                             out[index] *= beam_ratio_factors[index]
   1549 

TypeError: ufunc 'multiply' output (typecode 'O') could not be coerced to provided output parameter (typecode 'f') according to the casting rule ''same_kind''

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/scratch/local/20668739/ipykernel_5032/3967241154.py in <module>
      8                 cb = cube.beams.common_beam(max_iter=20, max_epsilon=0.01)
      9                 scube = cube.convolve_to(cb)
---> 10                 scube.write(outfile)

/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py in write(self, *args, **kwargs)
   1336     def write(self, *args, **kwargs):
   1337         with dask.config.set(**self._scheduler_kwargs):
-> 1338             super().write(*args, **kwargs)
   1339 
   1340     @property

/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/core.py in __call__(self, serialize_method, *args, **kwargs)
    129 
    130     def __call__(self, *args, serialize_method=None, **kwargs):
--> 131         registry.write(self._instance, *args, **kwargs)
    132 
    133 

/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/registry/compat.py in wrapper(registry, *args, **kwargs)
     41             registry = default_registry
     42         # get and call bound method from registry instance
---> 43         return getattr(registry, method_name)(*args, **kwargs)
     44 
     45     return wrapper

/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/registry/core.py in write(self, data, format, *args, **kwargs)
    352 
    353         writer = self.get_writer(format, data.__class__)
--> 354         return writer(data, *args, **kwargs)
    355 
    356 

/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/fits.py in write_fits_cube(cube, filename, overwrite, include_origin_notes)
    277             hdulist.writeto(filename, overwrite=overwrite)
    278         except TypeError:
--> 279             hdulist.writeto(filename, clobber=overwrite)
    280     else:
    281         raise NotImplementedError()

TypeError: writeto() got an unexpected keyword argument 'clobber'
keflavich commented 2 years ago

This went away when I stopped using the nthreads=32 setup. I only had 8 cores available so maybe that's related?

astrofrog commented 2 years ago

Can you try commenting out the try...except TypeError as I think this is hiding the real error?

keflavich commented 2 years ago

I'm sure you're right. Tough to do right now; I'm running stuff remotely while traveling.

I was also going to try save_to_tmpdir in the last step before writing to see if that helped.

(also, `num_workers=8 caused same problem)

keflavich commented 2 years ago

save_to_tmpdir does seem to suppress the error

keflavich commented 2 years ago

New set of data, similar but different error:

WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/scratch/local/20668739/ipykernel_6333/3761283239.py in <module>
      7             with cube.use_dask_scheduler('threads', num_workers=8):
      8                 cb = cube.beams.common_beam(max_iter=20, max_epsilon=0.01)
----> 9                 scube = cube.convolve_to(cb, save_to_tmp_dir=True)
     10             scube.write(outfile)

/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py in wrapper(self, *args, **kwargs)
     83             filename = tempfile.mktemp()
     84             with dask.config.set(**cube._scheduler_kwargs):
---> 85                 cube._data.to_zarr(filename)
     86             cube._data = da.from_zarr(filename)
     87         return cube

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py in to_zarr(self, *args, **kwargs)
   2675         See function :func:`dask.array.to_zarr` for parameters.
   2676         """
-> 2677         return to_zarr(self, *args, **kwargs)
   2678 
   2679     def to_tiledb(self, uri, *args, **kwargs):

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py in to_zarr(arr, url, component, storage_options, overwrite, compute, return_stored, **kwargs)
   3418         **kwargs,
   3419     )
-> 3420     return arr.store(z, lock=False, compute=compute, return_stored=return_stored)
   3421 
   3422 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py in store(self, target, **kwargs)
   1595     @wraps(store)
   1596     def store(self, target, **kwargs):
-> 1597         r = store([self], [target], **kwargs)
   1598 
   1599         if kwargs.get("return_stored", False):

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
   1074     elif compute:
   1075         store_dsk = HighLevelGraph(layers, dependencies)
-> 1076         compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1077         return None
   1078 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    313     schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    314     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 315     return schedule(dsk2, keys, **kwargs)
    316 
    317 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     77             pool = MultiprocessingPoolExecutor(pool)
     78 
---> 79     results = get_async(
     80         pool.submit,
     81         pool._max_workers,

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    505                             _execute_task(task, data)  # Re-execute locally
    506                         else:
--> 507                             raise_exception(exc, tb)
    508                     res, worker_id = loads(res_info)
    509                     state["cache"][key] = res

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in reraise(exc, tb)
    313     if exc.__traceback__ is not tb:
    314         raise exc.with_traceback(tb)
--> 315     raise exc
    316 
    317 

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218     try:
    219         task, data = loads(task_info)
--> 220         result = _execute_task(task, data)
    221         id = get_id()
    222         result = dumps((result, id))

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in <genexpr>(.0)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/optimization.py in __call__(self, *args)
    967         if not len(args) == len(self.inkeys):
    968             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    970 
    971     def __reduce__(self):

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in get(dsk, out, cache)
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py in convfunc(img, beam, **kwargs)
   1546 
   1547                         if needs_beam_ratio:
-> 1548                             out[index] *= beam_ratio_factors[index]
   1549 
   1550                 return out

TypeError: ufunc 'multiply' output (typecode 'O') could not be coerced to provided output parameter (typecode 'f') according to the casting rule ''same_kind''
keflavich commented 2 years ago

Is this maybe some sort of endianness issue?

e-koch commented 2 years ago

@keflavich -- is this related to #803 and solved in #804?

keflavich commented 2 years ago

probably yes!