radio-astro-tools / spectral-cube

Library for reading and analyzing astrophysical spectral data cubes
http://spectral-cube.rtfd.org
BSD 3-Clause "New" or "Revised" License

Gigantic cube operation failed a stress test #784

Open keflavich opened 2 years ago

keflavich commented 2 years ago

I tried stress-testing our dask stuff by doing some operations on a 680 GB cube.

The stress test is this: https://github.com/ALMA-IMF/reduction/blob/1fecfdceb94ab5a673cc4ce7bce9ee82224033f9/reduction/cube_finalizing.py

which loads 5 cubes: the image, residual, model, psf, and pb cubes. Four of those are combined in some way; the PSF cube is just measured on its own.

The stress-test results look like this:

In [1]: from spectral_cube import SpectralCube

In [2]: import dask.array as da
   ...: from dask.diagnostics import ProgressBar

In [3]: %run /orange/adamginsburg/ALMA_IMF/reduction/reduction/cube_finalizing.py

In [4]: with ProgressBar():
   ...:     beam_correct_cube('S255IR-SMA1_sci.spw3.cube.I.manual')
   ...:
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: BeamWarning: No beam information found in CASA image. [spectral_cube.io.casa_image]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: BeamWarning: No beam information found in CASA image. [spectral_cube.io.casa_image]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: BeamWarning: No beam information found in CASA image. [spectral_cube.io.casa_image]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
INFO: Completed reading. t=3.079984188079834 [unknown]
INFO: Starting minimize. t=3.315718650817871 [unknown]
[########################################] | 100% Completed | 19min  9.6s
[########################################] | 100% Completed | 18min 12.6s
[########################################] | 100% Completed | 17min 29.8s
INFO: Completed minimize. t=3300.095191717148.  minimizing took 3296.779483795166 [unknown]
INFO: Completed minslice. t=3300.7892508506775 [unknown]
INFO: Epsilon beginning. t=3300.7909491062164 [unknown]
[########################################] | 100% Completed |  2.2s
[########################################] | 100% Completed |  1.3s

******* SNIP : cut for brevity *********

[########################################] | 100% Completed |  1.3s
[########################################] | 100% Completed |  0.2s
INFO: Epsilon completed. t=11251.39980506897, eps took 7950.6088609695435 [unknown]
INFO: Convolving.  t=11251.443447828293 [unknown]
INFO: Done convolving, now rescaling.  t=11252.022993803024 [unknown]
creating restor
/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py:4458: PerformanceWarning: Increasing number of chunks by factor of 10
  result = blockwise(
done creating restor
INFO: Done rescaling.  t=11254.715781450272 [unknown]
WARNING: VerifyWarning: Keyword name 'JvM_epsilon_max' is greater than 8 characters or contains characters not allowed by the FITS standard; a HIERARCH card will be created. [astropy.io.fits.card]
WARNING: VerifyWarning: Keyword name 'JvM_epsilon_min' is greater than 8 characters or contains characters not allowed by the FITS standard; a HIERARCH card will be created. [astropy.io.fits.card]
WARNING: VerifyWarning: Keyword name 'JvM_epsilon_median' is greater than 8 characters or contains characters not allowed by the FITS standard; a HIERARCH card will be created. [astropy.io.fits.card]
INFO: Beginning JvM write.  t=11254.803322076797 [unknown]
[###                                     ] | 9% Completed |  1hr 20min 35.3s
Killed

the files:

$ du -csh S255IR-SMA1_sci.spw3.cube.I.manual*
680G    S255IR-SMA1_sci.spw3.cube.I.manual.image
375G    S255IR-SMA1_sci.spw3.cube.I.manual.image.pbcor
15G     S255IR-SMA1_sci.spw3.cube.I.manual.JvM.image.fits
660G    S255IR-SMA1_sci.spw3.cube.I.manual.mask
660G    S255IR-SMA1_sci.spw3.cube.I.manual.model
680G    S255IR-SMA1_sci.spw3.cube.I.manual.pb
660G    S255IR-SMA1_sci.spw3.cube.I.manual.psf
680G    S255IR-SMA1_sci.spw3.cube.I.manual.residual
84K     S255IR-SMA1_sci.spw3.cube.I.manual.sumwt
4.4T    total

It was running on a machine with 32 GB of memory allocated.

It looks like it died only a small fraction of the way through writing.

This probably indicates that somewhere along the path there is a compute step happening, or something is loading the result into memory instead of dumping it directly to disk.
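
For reference, a minimal sketch of the intended streaming behaviour, using a plain np.memmap as a stand-in for the FITS output (toy shapes; this is not the actual spectral-cube writer):

```python
# Streaming write: compute one spectral plane at a time and push it straight
# into a disk-backed array, so peak memory is one plane, never the full cube.
import os
import tempfile

import numpy as np

shape = (100, 256, 256)  # toy stand-in for the (1920, 9600, 9600) cube
path = os.path.join(tempfile.mkdtemp(), "cube.dat")

out = np.memmap(path, dtype="float32", mode="w+", shape=shape)
for i in range(shape[0]):
    # stand-in for "one computed chunk of the pipeline"
    out[i] = np.full(shape[1:], i, dtype="float32")
out.flush()

# the failure mode suspected above is the opposite: materializing the whole
# result as an in-memory ndarray first, then writing it out in one go
```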

keflavich commented 2 years ago

This was done based on #783

keflavich commented 2 years ago

I'm trying a simpler test:

In [1]: from spectral_cube import SpectralCube

In [2]: from dask.diagnostics import ProgressBar

In [3]: with ProgressBar():
   ...:     cube = SpectralCube.read('S255IR-SMA1_sci.spw3.cube.I.manual.image')
   ...:     print(f"Loaded cube {cube}")
   ...:     result = cube*cube
   ...:     print(f"Created square cube {result}")
   ...:     result.write("test.fits")
   ...:     print("Wrote test.fits")
   ...:
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
Loaded cube DaskVaryingResolutionSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (10, 100, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
Created square cube DaskVaryingResolutionSpectralCube with shape=(1920, 9600, 9600) and unit=Jy2 / beam2 and chunk size (10, 100, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
[                                        ] | 0% Completed | 10.7s
keflavich commented 2 years ago

hmm, looks like a success:

In [1]: from spectral_cube import SpectralCube

In [2]: from dask.diagnostics import ProgressBar

In [3]: with ProgressBar():
   ...:     cube = SpectralCube.read('S255IR-SMA1_sci.spw3.cube.I.manual.image')
   ...:     print(f"Loaded cube {cube}")
   ...:     result = cube*cube
   ...:     print(f"Created square cube {result}")
   ...:     result.write("test.fits")
   ...:     print("Wrote test.fits")
   ...:
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
Loaded cube DaskVaryingResolutionSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (10, 100, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
Created square cube DaskVaryingResolutionSpectralCube with shape=(1920, 9600, 9600) and unit=Jy2 / beam2 and chunk size (10, 100, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
[####################################    ] | 91% Completed |  3hr 42min 33.0s

so I suspect the convolution plays a role here... dang.

keflavich commented 2 years ago

This is an error that should truly inspire fear:

  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py", line 1545, in convfunc
    out[index] = convolve(img[index], kernel, normalize_kernel=True, **kwargs)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/nddata/decorators.py", line 246, in wrapper
    result = func(data, *args, **kwargs)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/convolution/convolve.py", line 650, in convolve_fft
    raise ValueError(f"Size Error: Arrays will be {human_file_size(array_size_B)}.  "
ValueError: Size Error: Arrays will be 1.4G.  Use allow_huge=True to override this exception.
keflavich commented 2 years ago

wow, convolution is not fast.

In [11]: with ProgressBar():
    ...:     cube = SpectralCube.read('S255IR-SMA1_sci.spw3.cube.I.manual.image')
    ...:     print(f"Loaded cube {cube}")
    ...:     result = cube.convolve_to(cube.beams.common_beam(), allow_huge=True)
    ...:     print(f"Created smooth cube {result}")
    ...:     result.write("test.fits")
    ...:     print("Wrote test.fits")
    ...:
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
Loaded cube DaskVaryingResolutionSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (10, 100, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
Created smooth cube DaskSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (1, 9600, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
[                                        ] | 1% Completed | 11hr  4min 46.9s
dhomeier commented 2 years ago

And that's with FFT? Though for a small kernel, the plain astropy.convolution.convolve might actually be faster.

keflavich commented 2 years ago

It will be fast-ish with fft:

In [12]: %timeit astropy.convolution.convolve_fft(img, astropy.convolution.Gaussian2DKernel(3), allow_huge=True)
21.3 s ± 1.58 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
keflavich commented 2 years ago
In [16]: with ProgressBar():
    ...:     cube = SpectralCube.read('S255IR-SMA1_sci.spw3.cube.I.manual.image')
    ...:     print(f"Loaded cube {cube}")
    ...:     result = cube.convolve_to(cube.beams.common_beam(), allow_huge=True, convolve=astropy.convolution.convolve_fft)
    ...:     print(f"Created smooth cube {result}")
    ...:     result.write("test.fits")
    ...:     print("Wrote test.fits")
    ...:
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
Loaded cube DaskVaryingResolutionSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (10, 100, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
Created smooth cube DaskSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (1, 9600, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
[                                        ] | 1% Completed | 25min 14.4s

21 s × 1920 planes ≈ 11 h; 100 × 25 min ≈ 42 h. So somewhere in that range is expected.
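
Spelled out, the two extrapolation endpoints (the per-plane timeit above, and the 25-min-per-percent progress rate):

```python
# lower bound: ~21.3 s per plane, 1920 planes
hours_per_plane_rate = 21.3 * 1920 / 3600   # about 11.4 h

# upper bound: ~25 min per 1% of the progress bar, times 100
hours_progress_rate = 25 * 100 / 60         # about 41.7 h
```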

dhomeier commented 2 years ago

I thought astropy.convolution.convolve_fft is the default already.

keflavich commented 2 years ago

no, for dask_spectral_cube, it's direct convolve

dhomeier commented 2 years ago

Ah, wrong subclass again!

keflavich commented 2 years ago

Looks like ~35h anticipated, so I'm gonna stop it:

In [16]: with ProgressBar():
    ...:     cube = SpectralCube.read('S255IR-SMA1_sci.spw3.cube.I.manual.image')
    ...:     print(f"Loaded cube {cube}")
    ...:     result = cube.convolve_to(cube.beams.common_beam(), allow_huge=True, convolve=astropy.convolution.convolve_fft)
    ...:     print(f"Created smooth cube {result}")
    ...:     result.write("test.fits")
    ...:     print("Wrote test.fits")
    ...:
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
Loaded cube DaskVaryingResolutionSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (10, 100, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
Created smooth cube DaskSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (1, 9600, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
[####                                    ] | 11% Completed |  3hr 51min 24.2s

I don't think this adds up as I want it to; it's too slow.

dhomeier commented 2 years ago

But you do need the frames convolved to a common resolution? It should be possible to get some speedup by passing the multi-threaded FFTs from SciPy or pyFFTW to astropy.convolution.convolve_fft, and in particular by trimming the memory footprint if you can go with single precision: convolve_fft(img, kernel, fftn=lambda a: scipy.fft.fft2(a, workers=-1), ifftn=lambda a: scipy.fft.ifft2(a, workers=-1), complex_dtype=np.complex64), and then handing that over to convolve_to. But it won't be orders of magnitude, I'm afraid.
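
The same trick, sketched as a standalone FFT convolution with scipy, mimicking what the fftn/ifftn/complex_dtype keywords do inside astropy's convolve_fft (the function name here is illustrative, not a library API):

```python
import numpy as np
import scipy.fft


def fft_convolve_same(img, kernel, workers=-1, complex_dtype=np.complex64):
    """Zero-padded FFT convolution: multi-threaded, single-precision."""
    # pad both transforms out to the full convolution size
    full = [i + k - 1 for i, k in zip(img.shape, kernel.shape)]
    f_img = scipy.fft.fft2(img.astype(complex_dtype), full, workers=workers)
    f_ker = scipy.fft.fft2(kernel.astype(complex_dtype), full, workers=workers)
    conv = scipy.fft.ifft2(f_img * f_ker, workers=workers).real
    # trim the full convolution back to 'same' alignment
    r0, c0 = (kernel.shape[0] - 1) // 2, (kernel.shape[1] - 1) // 2
    return conv[r0:r0 + img.shape[0], c0:c0 + img.shape[1]]


img = np.random.default_rng(0).normal(size=(256, 256)).astype(np.float32)
y, x = np.mgrid[-3:4, -3:4]
kernel = np.exp(-(x**2 + y**2) / 4.0)
kernel /= kernel.sum()  # normalized, like normalize_kernel=True
smoothed = fft_convolve_same(img, kernel)
```

The complex64 intermediate halves the scratch memory relative to astropy's default double precision, which matters as much as thread count for 9600×9600 planes.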

keflavich commented 2 years ago

Yeah, this isn't a case where I actually need this for science right now, but this is an incredibly common use case and one where CASA has optimized performance a lot.

I like the idea of switching to single-precision and multithreaded FFTs. Might also try using rfft.

I've also tried dask's multithreading/multiprocessing, but that had no effect (I tried with dask.config.set(scheduler='processes', num_workers=8):, with dask.config.set(scheduler='threads', pool=ThreadPool(20)):, and with dask.config.set(scheduler='threads'):, all with the same outcome).
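
For the record, the shape of those scheduler attempts on a toy array (they don't help here, presumably because the per-plane convolution, not graph execution, dominates):

```python
from multiprocessing.pool import ThreadPool

import dask
import dask.array as da

x = da.ones((400, 400), chunks=(100, 100))

with dask.config.set(scheduler='threads'):
    s1 = float(x.sum().compute())

with dask.config.set(scheduler='threads', pool=ThreadPool(4)):
    s2 = float(x.sum().compute())

# scheduler='processes' has the same interface, but pickles every chunk
# between workers, which rarely pays off for I/O-bound cube writes
```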

dhomeier commented 2 years ago

But not using dask.array.fft I assume? Seems nontrivial to get around casting to ndarray in astropy.convolution.

keflavich commented 2 years ago

No, hm, ok, maybe that's the problem. I'll try that too

dhomeier commented 2 years ago

The FFT itself seems unrealistically fast (1 ms), looks like %timeit is stumbling there over delayed execution.

keflavich commented 2 years ago

yeah for timeit tests, you may need to force a .compute

keflavich commented 2 years ago

ETA ~25h for this one:

In [46]: with ProgressBar():
    ...:     # skip with dask.config.set(scheduler='processes', num_workers=8):
    ...:         cube = SpectralCube.read('S255IR-SMA1_sci.spw3.cube.I.manual.image')
    ...:         print(f"Loaded cube {cube}")
    ...:         result = cube.convolve_to(cube.beams.common_beam(), allow_huge=True, convolve=astropy.convolution.convolve_fft, fftn=lambda x: scipy.fft.fftn(x, workers=8), ifftn=lambda x: scipy.fft.ifftn(x, workers=8))
    ...:         print(f"Created smooth cube {result}")
    ...:         result.write("test.fits")
    ...:         print("Wrote test.fits")
    ...:
/blue/adamginsburg/adamginsburg/repos/casa-formats-io/casa_formats_io/casa_dask.py:230: RuntimeWarning: divide by zero encountered in long_scalars
  factors = [f for f in range(stacks[dim] + 1) if stacks[dim] % f == 0]
WARNING: StokesWarning: Cube is a Stokes cube, returning spectral cube for I component [spectral_cube.io.core]
Loaded cube DaskVaryingResolutionSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (10, 100, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
Created smooth cube DaskSpectralCube with shape=(1920, 9600, 9600) and unit=Jy / beam and chunk size (1, 9600, 9600):
 n_x:   9600  type_x: RA---SIN  unit_x: deg    range:    93.219155 deg:   93.230930 deg
 n_y:   9600  type_y: DEC--SIN  unit_y: deg    range:    17.984139 deg:   17.995338 deg
 n_s:   1920  type_s: FREQ      unit_s: Hz     range: 219563051918.890 Hz:221436973845.577 Hz
[                                        ] | 2% Completed | 33min 24.3s

so <2x boost, if anything. Trying with dask fft now

keflavich commented 2 years ago

dang, astropy convolution doesn't play nice with dask.fft:

  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py", line 1545, in convfunc
    out[index] = convolve(img[index], kernel, normalize_kernel=True, **kwargs)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/nddata/decorators.py", line 246, in wrapper
    result = func(data, *args, **kwargs)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/convolution/convolve.py", line 782, in convolve_fft
    arrayfft = fftn(bigarray)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/fft.py", line 180, in func
    if len(a.chunks[each_axis]) != 1:
AttributeError: 'numpy.ndarray' object has no attribute 'chunks'

this might be an issue I need to tackle in astropy core
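
One possible stopgap until astropy accepts dask arrays is to wrap dask's FFT so it tolerates the plain ndarray that convolve_fft hands it; daskified_fftn/daskified_ifftn below are made-up names, sketched for illustration:

```python
import numpy as np
import dask.array as da
import dask.array.fft  # ensures the fft submodule is importable as da.fft


def daskified_fftn(arr):
    # convolve_fft passes a plain ndarray; wrap it in a single chunk,
    # since dask.array.fft requires one chunk along each transformed axis
    a = np.asarray(arr)
    return da.fft.fftn(da.from_array(a, chunks=a.shape)).compute()


def daskified_ifftn(arr):
    a = np.asarray(arr)
    return da.fft.ifftn(da.from_array(a, chunks=a.shape)).compute()
```

Passing these as fftn=/ifftn= sidesteps the AttributeError, but each call computes eagerly, so any win would come from dask's FFT execution rather than from deferred evaluation.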

dhomeier commented 2 years ago

That's what I meant. Or clone the entire convolve_fft to create your own, daskified version.

keflavich commented 2 years ago

yeah, this is really unfortunate; we'd like to be able to work with dask arrays, but np.array(daskarr) triggering a compute makes the code a total mess.

dhomeier commented 2 years ago

You want the convolved slice though, so at some point you'll have to do the computation? But multi-threaded scipy.fft might be the lower-hanging fruit for a first optimisation round. At least on my notebook, for the 9600x9600 images it already very much seems to come down to memory performance. Of course that might mean potentially very significant gains with dask, but then again, dask.array.fft.fft2 told me I needed to rechunk the input to a single chunk, so it may not be that much better at running things in cache.

keflavich commented 2 years ago

see https://github.com/astropy/astropy/pull/12596 as an attempt to enable this.

@dhomeier yes I will definitely do the computation at some point, but we want dask to handle the "do compute; write to disk; unload from memory; do next compute" stuff
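
That pattern can be sketched with da.store streaming into a disk-backed array, with np.memmap standing in for the FITS HDU that spectral-cube actually writes to:

```python
import os
import tempfile

import numpy as np
import dask.array as da

cube = da.ones((16, 64, 64), chunks=(4, 64, 64))  # toy stand-in for a 680 GB cube
path = os.path.join(tempfile.mkdtemp(), "out.dat")
out = np.memmap(path, dtype="float64", mode="w+", shape=cube.shape)

# da.store walks the graph block by block: each (4, 64, 64) chunk is
# computed, written to disk, then dropped from memory before the next one
da.store(cube * 2, out)
out.flush()
```

If anything in the pipeline forces a np.asarray first, this per-block bookkeeping is lost and the whole result lands in memory at once, which is consistent with the 32 GB machine getting OOM-killed during the write.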